Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): CSV decoder #47131

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,21 @@ definitions:
type:
type: string
enum: [JsonlDecoder]
CsvDecoder:
title: CSV Decoder
description: Use this if the response is text in CSV format.
type: object
required:
- type
- delimiter
properties:
type:
type: string
enum: [CsvDecoder]
delimiter:
type: string
description: The delimiter used in the CSV file.
default: ","
KeysToLower:
title: Keys to Lower Case
description: A transformation that renames all keys to lower case.
Expand Down Expand Up @@ -2408,6 +2423,7 @@ definitions:
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CsvDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2524,6 +2540,7 @@ definitions:
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CsvDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.csv_decoder import CsvDecoder

__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", "CsvDecoder"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import csv
import io
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")


@dataclass
class CsvDecoder(Decoder):
"""
CsvDecoder is a decoder strategy that parses the CSV content of the response, and converts it to a stream of dicts.
"""

parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]):
self.delimiter = parameters.get("delimiter", ",")

def is_stream_response(self) -> bool:
return False

def decode(self, response: requests.Response) -> Generator[MutableMapping[str, Any], None, None]:
try:
csv_content = io.StringIO(response.text)
reader = csv.reader(csv_content, delimiter=self.delimiter)
header = next(reader)

for row in reader:
record = {}
for i, value in enumerate(row):
if i < len(header):
record[header[i]] = value
yield record

except Exception as exc:
logger.warning(f"Response cannot be parsed from CSV: {response.status_code=}, {response.text=}, {exc=}")
yield {}
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,11 @@ class JsonlDecoder(BaseModel):
type: Literal['JsonlDecoder']


class CsvDecoder(BaseModel):
type: Literal['CsvDecoder']
delimiter: str = Field(..., description='The delimiter used in the CSV file.')


class KeysToLower(BaseModel):
type: Literal['KeysToLower']
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')
Expand Down Expand Up @@ -1634,12 +1639,12 @@ class SimpleRetriever(BaseModel):
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.',
title='Partition Router',
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = (
Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
decoder: Optional[
Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, CsvDecoder]
] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')

Expand Down Expand Up @@ -1700,12 +1705,12 @@ class AsyncRetriever(BaseModel):
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.',
title='Partition Router',
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = (
Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
decoder: Optional[
Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, CsvDecoder]
] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import (
CsvDecoder,
Decoder,
IterableDecoder,
JsonDecoder,
Expand Down Expand Up @@ -66,6 +67,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CompositeErrorHandler as CompositeErrorHandlerModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ConcurrencyLevel as ConcurrencyLevelModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ConstantBackoffStrategy as ConstantBackoffStrategyModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CsvDecoder as CsvDecoderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CursorPagination as CursorPaginationModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomAuthenticator as CustomAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomBackoffStrategy as CustomBackoffStrategyModel
Expand Down Expand Up @@ -236,6 +238,7 @@ def _init_mappings(self) -> None:
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
CsvDecoderModel: self.create_csv_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
Expand Down Expand Up @@ -388,7 +391,9 @@ def create_legacy_to_per_partition_state_migration(
if not hasattr(partition_router, "parent_stream_configs"):
raise ValueError("LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration.")

return LegacyToPerPartitionStateMigration(declarative_stream.retriever.partition_router, declarative_stream.incremental_sync, config, declarative_stream.parameters) # type: ignore # The retriever type was already checked
return LegacyToPerPartitionStateMigration(
declarative_stream.retriever.partition_router, declarative_stream.incremental_sync, config, declarative_stream.parameters
) # type: ignore # The retriever type was already checked

def create_session_token_authenticator(
self, model: SessionTokenAuthenticatorModel, config: Config, name: str, **kwargs: Any
Expand Down Expand Up @@ -971,6 +976,10 @@ def create_iterable_decoder(model: IterableDecoderModel, config: Config, **kwarg
def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder:
return XmlDecoder(parameters={})

@staticmethod
def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> CsvDecoder:
return CsvDecoder(parameters={"delimiter": model.delimiter})

@staticmethod
def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any) -> JsonFileSchemaLoader:
return JsonFileSchemaLoader(file_path=model.file_path or "", config=config, parameters=model.parameters or {})
Expand Down Expand Up @@ -1323,7 +1332,6 @@ def create_async_retriever(
transformations: List[RecordTransformation],
**kwargs: Any,
) -> AsyncRetriever:

decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={})
record_selector = self._create_component_from_model(
model=model.record_selector,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import pytest
import requests
from airbyte_cdk.sources.declarative.decoders import CsvDecoder


@pytest.mark.parametrize(
"response_body, parameters, expected",
[
(
"col1,col2\nval1,val2",
{},
[{"col1": "val1", "col2": "val2"}],
),
(
"col1;col2\nval1;val2",
{"delimiter": ";"},
[{"col1": "val1", "col2": "val2"}],
),
(
"malformed csv",
{},
[], # Expect an empty dict on error
),
],
ids=["with_header", "custom_delimiter", "malformed_csv"],
)
def test_csv_decoder(requests_mock, response_body, parameters, expected):
requests_mock.register_uri("GET", "https://airbyte.io/", text=response_body)
response = requests.get("https://airbyte.io/")
results = list(CsvDecoder(parameters=parameters).decode(response))
assert results == expected
Loading