diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3fcbbf34672d..e547dad01c14 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1717,6 +1717,21 @@ definitions: type: type: string enum: [JsonlDecoder] + CsvDecoder: + title: CSV Decoder + description: Use this if the response is text in CSV format. + type: object + required: + - type + - delimiter + properties: + type: + type: string + enum: [CsvDecoder] + delimiter: + type: string + description: The delimiter used in the CSV file. + default: "," KeysToLower: title: Keys to Lower Case description: A transformation that renames all keys to lower case. @@ -2408,6 +2423,7 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/CsvDecoder" $parameters: type: object additionalProperties: true @@ -2524,6 +2540,7 @@ definitions: - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/CsvDecoder" $parameters: type: object additionalProperties: true diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py index b67561e989ce..8bb0f988c079 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -7,5 +7,6 @@ from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator from airbyte_cdk.sources.declarative.decoders.xml_decoder 
@dataclass
class CsvDecoder(Decoder):
    """
    Decoder strategy that parses the CSV content of a response and converts it to a stream of dicts.

    The first non-blank row of the body is used as the header. Every following non-blank row is
    yielded as a mapping of header name to field value; values are the strings produced by
    `csv.reader`.

    The only entry read from `parameters` is `delimiter`, the field separator passed to
    `csv.reader`. It defaults to "," when the key is absent.
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self.delimiter = parameters.get("delimiter", ",")

    def is_stream_response(self) -> bool:
        """Report that this decoder does not treat the response as a stream (`decode` reads `response.text`)."""
        return False

    def decode(self, response: requests.Response) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield one dict per CSV data row found in `response.text`.

        Fields are paired with header names positionally. A row with more fields than the header
        has its extra fields dropped; a row with fewer fields produces a dict with fewer keys.
        Blank lines are skipped, mirroring what `csv.DictReader` does.

        If the body has no header row, or parsing raises, a warning is logged and a single empty
        dict is yielded. Records already yielded before a mid-stream failure are not retracted.
        """
        try:
            reader = csv.reader(io.StringIO(response.text), delimiter=self.delimiter)
            # csv.reader returns [] for a blank line. Filtering those out keeps them from being
            # emitted as phantom `{}` records and from being mistaken for the header.
            rows = (row for row in reader if row)

            header = next(rows, None)
            if header is None:
                # Raised here so that it is reported through the same warning and `{}` fallback
                # as every other parsing failure, with a message that says what went wrong.
                raise ValueError("CSV response has no header row")

            for row in rows:
                # zip() stops at the shorter sequence, so fields beyond the header are ignored.
                yield dict(zip(header, row))

        except Exception as exc:
            logger.warning(f"Response cannot be parsed from CSV: {response.status_code=}, {response.text=}, {exc=}")
            yield {}
JsonlDecoder, IterableDecoder, XmlDecoder, CsvDecoder] + ] = Field( + None, + description='Component decoding the response so records can be extracted.', + title='Decoder', ) parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') @@ -1700,12 +1705,12 @@ class AsyncRetriever(BaseModel): description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.', title='Partition Router', ) - decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = ( - Field( - None, - description='Component decoding the response so records can be extracted.', - title='Decoder', - ) + decoder: Optional[ + Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, CsvDecoder] + ] = Field( + None, + description='Component decoding the response so records can be extracted.', + title='Decoder', ) parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4ec85e1ee81f..db231516b883 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -32,6 +32,7 @@ from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( + CsvDecoder, Decoder, IterableDecoder, JsonDecoder, @@ -66,6 +67,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import CompositeErrorHandler as CompositeErrorHandlerModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import ConcurrencyLevel as ConcurrencyLevelModel from airbyte_cdk.sources.declarative.models.declarative_component_schema 
import ConstantBackoffStrategy as ConstantBackoffStrategyModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import CsvDecoder as CsvDecoderModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CursorPagination as CursorPaginationModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomAuthenticator as CustomAuthenticatorModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomBackoffStrategy as CustomBackoffStrategyModel @@ -236,6 +238,7 @@ def _init_mappings(self) -> None: KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, + CsvDecoderModel: self.create_csv_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, JwtAuthenticatorModel: self.create_jwt_authenticator, LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration, @@ -388,7 +391,9 @@ def create_legacy_to_per_partition_state_migration( if not hasattr(partition_router, "parent_stream_configs"): raise ValueError("LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration.") - return LegacyToPerPartitionStateMigration(declarative_stream.retriever.partition_router, declarative_stream.incremental_sync, config, declarative_stream.parameters) # type: ignore # The retriever type was already checked + return LegacyToPerPartitionStateMigration( + declarative_stream.retriever.partition_router, declarative_stream.incremental_sync, config, declarative_stream.parameters + ) # type: ignore # The retriever type was already checked def create_session_token_authenticator( self, model: SessionTokenAuthenticatorModel, config: Config, name: str, **kwargs: Any @@ -971,6 +976,10 @@ def create_iterable_decoder(model: IterableDecoderModel, config: Config, **kwarg def create_xml_decoder(model: XmlDecoderModel, config: 
@staticmethod
def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> CsvDecoder:
    """
    Build a CsvDecoder from its declarative model.

    Only `model.delimiter` is used; `config` and any extra keyword arguments are accepted but
    not read here.
    """
    # The delimiter is handed to CsvDecoder through its `parameters` mapping.
    decoder_parameters = {"delimiter": model.delimiter}
    return CsvDecoder(parameters=decoder_parameters)
@pytest.mark.parametrize(
    "response_body, parameters, expected",
    [
        (
            "col1,col2\nval1,val2",
            {},
            [{"col1": "val1", "col2": "val2"}],
        ),
        (
            "col1;col2\nval1;val2",
            {"delimiter": ";"},
            [{"col1": "val1", "col2": "val2"}],
        ),
        (
            "malformed csv",
            {},
            [],  # A single line is read as a header with no data rows, so no records are produced
        ),
        (
            "",
            {},
            [{}],  # No header row at all: the decoder logs a warning and yields one empty dict
        ),
    ],
    ids=["with_header", "custom_delimiter", "header_only", "empty_body"],
)
def test_csv_decoder(requests_mock, response_body, parameters, expected):
    """Decode a mocked HTTP response body with CsvDecoder and compare the full list of yielded records."""
    requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body)
    response = requests.get("https://airbyte.io/")
    results = list(CsvDecoder(parameters=parameters).decode(response))
    assert results == expected