diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index aafd9d33a8a73..5d7613f0a2091 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.67.1 +Fixes bug in parquet_parser.py triggered by null values in datetime columns. + ## 0.67.0 Low-code: Add CustomRecordFilter diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py index 00b78c489801b..4c7851828d8d7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py @@ -99,42 +99,45 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An """ Convert a pyarrow scalar to a value that can be output by the source. """ + # Dictionaries are stored as two columns: indices and values + # The indices column is an array of integers that maps to the values column + if pa.types.is_dictionary(parquet_value.type): + return { + "indices": parquet_value.indices.tolist(), + "values": parquet_value.dictionary.tolist(), + } + + if pa.types.is_null(parquet_value.type): + return None + + # All if statements below assume there is a valid py_value and test against it. + py_value = parquet_value.as_py() + if py_value is None: + return py_value + # Convert date and datetime objects to isoformat strings if pa.types.is_time(parquet_value.type) or pa.types.is_timestamp(parquet_value.type) or pa.types.is_date(parquet_value.type): - return parquet_value.as_py().isoformat() + return py_value.isoformat() # Convert month_day_nano_interval to array if parquet_value.type == pa.month_day_nano_interval(): - return json.loads(json.dumps(parquet_value.as_py())) + return json.loads(json.dumps(py_value)) # Decode binary strings to utf-8 if ParquetParser._is_binary(parquet_value.type): - py_value = parquet_value.as_py() - if py_value is None: - return py_value return py_value.decode("utf-8") if pa.types.is_decimal(parquet_value.type): if parquet_format.decimal_as_float: - return parquet_value.as_py() + return py_value else: - return str(parquet_value.as_py()) + return str(py_value) - # Dictionaries are stored as two columns: indices and values - # The indices column is an array of integers that maps to the values column - if pa.types.is_dictionary(parquet_value.type): - return { - "indices": parquet_value.indices.tolist(), - "values": parquet_value.dictionary.tolist(), - } if pa.types.is_map(parquet_value.type): - return {k: v for k, v in parquet_value.as_py()} - - if pa.types.is_null(parquet_value.type): - return None + return {k: v for k, v in py_value} # Convert duration to seconds, then convert to the appropriate unit if pa.types.is_duration(parquet_value.type): - duration = parquet_value.as_py() + duration = py_value duration_seconds = duration.total_seconds() if parquet_value.type.unit == "s": return duration_seconds @@ -147,7 +150,7 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An else: raise ValueError(f"Unknown duration unit: {parquet_value.type.unit}") else: - return parquet_value.as_py() + return py_value @staticmethod def parquet_type_to_schema_type(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> Mapping[str, str]: diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index b441974dc1bb9..2e7ff9cc45832 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -36,7 +36,7 @@ name="airbyte-cdk", # The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be # updated if our semver format changes such as using release candidate versions. - version="0.67.0", + version="0.67.1", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_parquet_parser.py b/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_parquet_parser.py index 984a782c5925a..b0da20f848539 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_parquet_parser.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/file_types/test_parquet_parser.py @@ -168,6 +168,7 @@ def test_type_mapping(parquet_type: pa.DataType, expected_type: Mapping[str, str pa.map_(pa.string(), pa.int32()), _default_parquet_format, {"hello": 1, "world": 2}, {"hello": 1, "world": 2}, id="test_map" ), pytest.param(pa.null(), _default_parquet_format, None, None, id="test_null"), + pytest.param(pa.date32(), _default_parquet_format, None, None, id="test_null_datetime"), ], ) def test_value_transformation( diff --git a/airbyte-integrations/connectors/source-s3/metadata.yaml b/airbyte-integrations/connectors/source-s3/metadata.yaml index 2b3b204f87bee..cd1690914b5ba 100644 --- a/airbyte-integrations/connectors/source-s3/metadata.yaml +++ b/airbyte-integrations/connectors/source-s3/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: file connectorType: source definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 - dockerImageTag: 4.5.7 + dockerImageTag: 4.5.8 dockerRepository: airbyte/source-s3 documentationUrl: https://docs.airbyte.com/integrations/sources/s3 githubIssueLabel: source-s3 diff --git a/airbyte-integrations/connectors/source-s3/pyproject.toml b/airbyte-integrations/connectors/source-s3/pyproject.toml index 3127d3b94c760..11331a7e084ef 100644 --- a/airbyte-integrations/connectors/source-s3/pyproject.toml +++ b/airbyte-integrations/connectors/source-s3/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "4.5.7" +version = "4.5.8" name = "source-s3" description = "Source implementation for S3." authors = [ "Airbyte ",] diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 1586348a10d89..0a92cbb1e33d9 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -264,6 +264,7 @@ To perform the text extraction from PDF and Docx files, the connector uses the [ | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------| +| 4.5.8 | 2024-02-27 | [35650](https://github.com/airbytehq/airbyte/pull/35650/) | Fixes bug in parquet_parser.py triggered by null values in datetime columns. | | 4.5.7 | 2024-02-23 | [34895](https://github.com/airbytehq/airbyte/pull/34895) | Run incremental syncs with concurrency | | 4.5.6 | 2024-02-21 | [35246](https://github.com/airbytehq/airbyte/pull/35246) | Fixes bug that occurred when creating CSV streams with tab delimiter. | | 4.5.5 | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date |