Skip to content

Commit 4dc394c

Browse files
Source S3: fix reading jsonl files with nested data (#16607)
* #531 source s3: fix reading nested jsonl files * #531 source s3: upd changelog * oncall #531 source s3: fix sample file * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 73ba7b6 commit 4dc394c

File tree

10 files changed

+87
-34
lines changed

10 files changed

+87
-34
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -876,7 +876,7 @@
876876
- name: S3
877877
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
878878
dockerRepository: airbyte/source-s3
879-
dockerImageTag: 0.1.18
879+
dockerImageTag: 0.1.20
880880
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
881881
icon: s3.svg
882882
sourceType: file

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8868,7 +8868,7 @@
88688868
supportsNormalization: false
88698869
supportsDBT: false
88708870
supported_destination_sync_modes: []
8871-
- dockerImage: "airbyte/source-s3:0.1.18"
8871+
- dockerImage: "airbyte/source-s3:0.1.20"
88728872
spec:
88738873
documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
88748874
changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"

airbyte-integrations/connectors/source-s3/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
1717
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1818
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1919

20-
LABEL io.airbyte.version=0.1.19
20+
LABEL io.airbyte.version=0.1.20
2121
LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"aws_access_key_id": "123456",
77
"aws_secret_access_key": "123456key",
88
"path_prefix": "",
9-
"endpoint": "http://10.0.3.185:9000"
9+
"endpoint": "http://10.0.252.11:9000"
1010
},
1111
"format": {
1212
"filetype": "csv"

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,19 @@
1313
class AbstractFileParser(ABC):
1414
logger = AirbyteLogger()
1515

16+
NON_SCALAR_TYPES = {"struct": "struct"}
17+
TYPE_MAP = {
18+
"boolean": ("bool_", "bool"),
19+
"integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
20+
"number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
21+
"string": ("large_string", "string"),
22+
# TODO: support object type rather than coercing to string
23+
"object": ("large_string",),
24+
# TODO: support array type rather than coercing to string
25+
"array": ("large_string",),
26+
"null": ("large_string",),
27+
}
28+
1629
def __init__(self, format: dict, master_schema: dict = None):
1730
"""
1831
:param format: file format specific mapping as described in spec.json
@@ -52,8 +65,8 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
5265
:yield: data record as a mapping of {columns:values}
5366
"""
5467

55-
@staticmethod
56-
def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
68+
@classmethod
69+
def json_type_to_pyarrow_type(cls, typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
5770
"""
5871
Converts Json Type to PyArrow types to (or the other way around if reverse=True)
5972
@@ -63,35 +76,28 @@ def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLo
6376
:return: PyArrow type if reverse is False, else Json type
6477
"""
6578
str_typ = str(typ)
66-
# this is a map of airbyte types to pyarrow types. The first list element of the pyarrow types should be the one to use where required.
67-
map = {
68-
"boolean": ("bool_", "bool"),
69-
"integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
70-
"number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
71-
"string": ("large_string", "string"),
72-
# TODO: support object type rather than coercing to string
73-
"object": ("large_string",),
74-
# TODO: support array type rather than coercing to string
75-
"array": ("large_string",),
76-
"null": ("large_string",),
77-
}
79+
# This is a map of airbyte types to pyarrow types.
80+
# The first list element of the pyarrow types should be the one to use where required.
81+
7882
if not reverse:
79-
for json_type, pyarrow_types in map.items():
83+
for json_type, pyarrow_types in cls.TYPE_MAP.items():
8084
if str_typ.lower() == json_type:
81-
return str(
82-
getattr(pa, pyarrow_types[0]).__call__()
83-
) # better way might be necessary when we decide to handle more type complexity
85+
type_ = next(iter(pyarrow_types))
86+
if type_ in cls.NON_SCALAR_TYPES:
87+
return cls.NON_SCALAR_TYPES[type_]
88+
# better way might be necessary when we decide to handle more type complexity
89+
return str(getattr(pa, type_).__call__())
8490
logger.debug(f"JSON type '{str_typ}' is not mapped, falling back to default conversion to large_string")
8591
return str(pa.large_string())
8692
else:
87-
for json_type, pyarrow_types in map.items():
93+
for json_type, pyarrow_types in cls.TYPE_MAP.items():
8894
if any(str_typ.startswith(pa_type) for pa_type in pyarrow_types):
8995
return json_type
9096
logger.debug(f"PyArrow type '{str_typ}' is not mapped, falling back to default conversion to string")
9197
return "string" # default type if unspecified in map
9298

93-
@staticmethod
94-
def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
99+
@classmethod
100+
def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
95101
"""
96102
Converts a schema with JsonSchema datatypes to one with PyArrow types (or the other way if reverse=True)
97103
This utilises json_type_to_pyarrow_type() to convert each datatype
@@ -100,4 +106,4 @@ def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = Fal
100106
:param reverse: switch to True for PyArrow schema -> Json schema, defaults to False
101107
:return: converted schema dict
102108
"""
103-
return {column: AbstractFileParser.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}
109+
return {column: cls.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@
1313

1414

1515
class JsonlParser(AbstractFileParser):
16+
TYPE_MAP = {
17+
"boolean": ("bool_", "bool"),
18+
"integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
19+
"number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
20+
"string": ("large_string", "string"),
21+
# TODO: support object type rather than coercing to string
22+
"object": (
23+
"struct",
24+
"large_string",
25+
),
26+
# TODO: support array type rather than coercing to string
27+
"array": ("large_string",),
28+
"null": ("large_string",),
29+
}
30+
1631
def __init__(self, *args: Any, **kwargs: Any) -> None:
1732
super().__init__(*args, **kwargs)
1833
self.format_model = None
@@ -45,8 +60,9 @@ def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str,
4560
"unexpected_field_behavior": self.format.unexpected_field_behavior,
4661
}
4762
if json_schema:
48-
parse_options["explicit_schema"] = pa.schema(self.json_schema_to_pyarrow_schema(json_schema))
49-
63+
schema = self.json_schema_to_pyarrow_schema(json_schema)
64+
schema = pa.schema({field: type_ for field, type_ in schema.items() if type_ not in self.NON_SCALAR_TYPES.values()})
65+
parse_options["explicit_schema"] = schema
5066
return parse_options
5167

5268
def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table:
@@ -60,8 +76,16 @@ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any
6076
Json reader supports multi thread hence, do not need to add external process
6177
https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
6278
"""
79+
80+
def field_type_to_str(type_: Any) -> str:
81+
if isinstance(type_, pa.lib.StructType):
82+
return "struct"
83+
if isinstance(type_, pa.lib.DataType):
84+
return str(type_)
85+
raise Exception(f"Unknown PyArrow Type: {type_}")
86+
6387
table = self._read_table(file)
64-
schema_dict = {field.name: field.type for field in table.schema}
88+
schema_dict = {field.name: field_type_to_str(field.type) for field in table.schema}
6589
return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)
6690

6791
def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"meta":{"entity": "accounts","url": "https://example.com/api/accounts/123","changed_on": "2022/04/20T16:20Z"},"payload": {"accounts": ["data", "goes","here"]}}
2+
{"meta":{"entity": "line_items","url": "https://example.com/api/accounts/123/line_items/42","changed_on": "2022/04/20T16:20Z"},"payload": {"line_items": ["data", "goes","here"]}}

airbyte-integrations/connectors/source-s3/unit_tests/test_jsonl_parser.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,12 @@ def cases(cls) -> Mapping[str, Any]:
160160
"line_checks": {},
161161
"fails": [],
162162
},
163+
"nested_json_test": {
164+
"AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}),
165+
"filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_10_nested_structure.jsonl"),
166+
"num_records": 2,
167+
"inferred_schema": {"meta": "object", "payload": "object"},
168+
"line_checks": {},
169+
"fails": [],
170+
},
163171
}

airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -601,18 +601,30 @@ def test_filepath_iterator(self, bucket, path_prefix, list_v2_objects, expected_
601601
FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128),
602602
],
603603
[
604-
{"pk": "string", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
605-
"dob": "string"},
606-
{"pk": "integer", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
607-
"dob": "string"}
604+
{
605+
"pk": "string",
606+
"full_name": "string",
607+
"street_address": "string",
608+
"customer_code": "integer",
609+
"email": "string",
610+
"dob": "string",
611+
},
612+
{
613+
"pk": "integer",
614+
"full_name": "string",
615+
"street_address": "string",
616+
"customer_code": "integer",
617+
"email": "string",
618+
"dob": "string",
619+
},
608620
],
609621
{
610622
"pk": "string",
611623
"full_name": "string",
612624
"street_address": "string",
613625
"customer_code": "integer",
614626
"email": "string",
615-
"dob": "string"
627+
"dob": "string",
616628
},
617629
True,
618630
False,

docs/integrations/sources/s3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ The Jsonl parser uses pyarrow hence, only the line-delimited JSON format is suppo
205205
206206
| Version | Date | Pull Request | Subject |
207207
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
208+
| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures |
208209
| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas |
209210
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
210211
| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet |

0 commit comments

Comments
 (0)