
🐛 Source S3: fix bug when importing parquet files with null datetimes #35650

Closed
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.67.1
+Fixes bug in parquet_parser.py triggered by null values in datetime columns.
+
 ## 0.67.0
 Low-code: Add CustomRecordFilter

Reviewer comment (Contributor) on the `## 0.67.1` entry: It would be best to remove this change from the PR. Since the CDK publication workflow automatically updates airbyte-cdk/python/CHANGELOG.md, it's best to rely on that rather than updating the file manually: that way there are no git conflicts between updates developed simultaneously, and the changelog and version tags are sorted correctly in order of publication.
@@ -99,42 +99,45 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An
         """
         Convert a pyarrow scalar to a value that can be output by the source.
         """
+        # Dictionaries are stored as two columns: indices and values
+        # The indices column is an array of integers that maps to the values column
+        if pa.types.is_dictionary(parquet_value.type):
+            return {
+                "indices": parquet_value.indices.tolist(),
+                "values": parquet_value.dictionary.tolist(),
+            }
+
+        if pa.types.is_null(parquet_value.type):
+            return None
+
+        # All if statements below assume there is a valid py_value and test against it.
+        py_value = parquet_value.as_py()
+        if py_value is None:
+            return py_value
+
         # Convert date and datetime objects to isoformat strings
         if pa.types.is_time(parquet_value.type) or pa.types.is_timestamp(parquet_value.type) or pa.types.is_date(parquet_value.type):
-            return parquet_value.as_py().isoformat()
+            return py_value.isoformat()
 
         # Convert month_day_nano_interval to array
         if parquet_value.type == pa.month_day_nano_interval():
-            return json.loads(json.dumps(parquet_value.as_py()))
+            return json.loads(json.dumps(py_value))
 
         # Decode binary strings to utf-8
         if ParquetParser._is_binary(parquet_value.type):
-            py_value = parquet_value.as_py()
-            if py_value is None:
-                return py_value
             return py_value.decode("utf-8")
+
         if pa.types.is_decimal(parquet_value.type):
             if parquet_format.decimal_as_float:
-                return parquet_value.as_py()
+                return py_value
             else:
-                return str(parquet_value.as_py())
+                return str(py_value)
 
-        # Dictionaries are stored as two columns: indices and values
-        # The indices column is an array of integers that maps to the values column
-        if pa.types.is_dictionary(parquet_value.type):
-            return {
-                "indices": parquet_value.indices.tolist(),
-                "values": parquet_value.dictionary.tolist(),
-            }
         if pa.types.is_map(parquet_value.type):
-            return {k: v for k, v in parquet_value.as_py()}
-
-        if pa.types.is_null(parquet_value.type):
-            return None
+            return {k: v for k, v in py_value}
 
         # Convert duration to seconds, then convert to the appropriate unit
         if pa.types.is_duration(parquet_value.type):
-            duration = parquet_value.as_py()
+            duration = py_value
             duration_seconds = duration.total_seconds()
             if parquet_value.type.unit == "s":
                 return duration_seconds
@@ -147,7 +150,7 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An
             else:
                 raise ValueError(f"Unknown duration unit: {parquet_value.type.unit}")
         else:
-            return parquet_value.as_py()
+            return py_value
 
     @staticmethod
     def parquet_type_to_schema_type(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> Mapping[str, str]:
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
@@ -36,7 +36,7 @@
     name="airbyte-cdk",
     # The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
     # updated if our semver format changes such as using release candidate versions.
-    version="0.67.0",
+    version="0.67.1",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",
@@ -168,6 +168,7 @@ def test_type_mapping(parquet_type: pa.DataType, expected_type: Mapping[str, str
             pa.map_(pa.string(), pa.int32()), _default_parquet_format, {"hello": 1, "world": 2}, {"hello": 1, "world": 2}, id="test_map"
         ),
         pytest.param(pa.null(), _default_parquet_format, None, None, id="test_null"),
+        pytest.param(pa.date32(), _default_parquet_format, None, None, id="test_null_datetime"),
     ],
 )
 def test_value_transformation(
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: file
   connectorType: source
   definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
-  dockerImageTag: 4.5.7
+  dockerImageTag: 4.5.8
   dockerRepository: airbyte/source-s3
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   githubIssueLabel: source-s3
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
-version = "4.5.7"
+version = "4.5.8"
 name = "source-s3"
 description = "Source implementation for S3."
 authors = [ "Airbyte <[email protected]>",]
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
@@ -264,6 +264,7 @@ To perform the text extraction from PDF and Docx files, the connector uses the [
 
 | Version | Date       | Pull Request | Subject |
 |:--------|:-----------|:-------------|:--------|
+| 4.5.8   | 2024-02-27 | [35650](https://github.com/airbytehq/airbyte/pull/35650/) | Fixes bug in parquet_parser.py triggered by null values in datetime columns. |
 | 4.5.7   | 2024-02-23 | [34895](https://github.com/airbytehq/airbyte/pull/34895) | Run incremental syncs with concurrency |
 | 4.5.6   | 2024-02-21 | [35246](https://github.com/airbytehq/airbyte/pull/35246) | Fixes bug that occurred when creating CSV streams with tab delimiter. |
 | 4.5.5   | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date |