
Source S3: fix reading jsonl files with nested data #16607

Merged · 6 commits · Sep 19, 2022
@@ -876,7 +876,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.18
dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
icon: s3.svg
sourceType: file
@@ -8868,7 +8868,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.18"
- dockerImage: "airbyte/source-s3:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-s3
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.3.185:9000"
"endpoint": "http://10.0.252.11:9000"
},
"format": {
"filetype": "csv"
@@ -13,6 +13,19 @@
class AbstractFileParser(ABC):
logger = AirbyteLogger()

NON_SCALAR_TYPES = {"struct": "struct"}
TYPE_MAP = {
"boolean": ("bool_", "bool"),
"integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
"number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
"string": ("large_string", "string"),
# TODO: support object type rather than coercing to string
"object": ("large_string",),
# TODO: support array type rather than coercing to string
"array": ("large_string",),
"null": ("large_string",),
}

def __init__(self, format: dict, master_schema: dict = None):
"""
:param format: file format specific mapping as described in spec.json
@@ -52,8 +65,8 @@ def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) ->
:yield: data record as a mapping of {columns:values}
"""

@staticmethod
def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
@classmethod
def json_type_to_pyarrow_type(cls, typ: str, reverse: bool = False, logger: AirbyteLogger = AirbyteLogger()) -> str:
"""
Converts a Json type to a PyArrow type (or the other way around if reverse=True)

@@ -63,35 +76,28 @@ def json_type_to_pyarrow_type(typ: str, reverse: bool = False, logger: AirbyteLo
:return: PyArrow type if reverse is False, else Json type
"""
str_typ = str(typ)
# this is a map of airbyte types to pyarrow types. The first list element of the pyarrow types should be the one to use where required.
map = {
"boolean": ("bool_", "bool"),
"integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
"number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
"string": ("large_string", "string"),
# TODO: support object type rather than coercing to string
"object": ("large_string",),
# TODO: support array type rather than coercing to string
"array": ("large_string",),
"null": ("large_string",),
}
# This is a map of airbyte types to pyarrow types.
# The first list element of the pyarrow types should be the one to use where required.

if not reverse:
for json_type, pyarrow_types in map.items():
for json_type, pyarrow_types in cls.TYPE_MAP.items():
if str_typ.lower() == json_type:
return str(
getattr(pa, pyarrow_types[0]).__call__()
) # better way might be necessary when we decide to handle more type complexity
type_ = next(iter(pyarrow_types))
if type_ in cls.NON_SCALAR_TYPES:
return cls.NON_SCALAR_TYPES[type_]
# better way might be necessary when we decide to handle more type complexity
return str(getattr(pa, type_).__call__())
logger.debug(f"JSON type '{str_typ}' is not mapped, falling back to default conversion to large_string")
return str(pa.large_string())
else:
for json_type, pyarrow_types in map.items():
for json_type, pyarrow_types in cls.TYPE_MAP.items():
if any(str_typ.startswith(pa_type) for pa_type in pyarrow_types):
return json_type
logger.debug(f"PyArrow type '{str_typ}' is not mapped, falling back to default conversion to string")
return "string" # default type if unspecified in map

@staticmethod
def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
@classmethod
def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool = False) -> Mapping[str, Any]:
"""
Converts a schema with JsonSchema datatypes to one with PyArrow types (or the other way if reverse=True)
This utilises json_type_to_pyarrow_type() to convert each datatype
@@ -100,4 +106,4 @@ def json_schema_to_pyarrow_schema(schema: Mapping[str, Any], reverse: bool = Fal
:param reverse: switch to True for PyArrow schema -> Json schema, defaults to False
:return: converted schema dict
"""
return {column: AbstractFileParser.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}
return {column: cls.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}
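
As context for the refactor above, a minimal standalone sketch (not the connector code; the trimmed map and prints are illustrative only) of how the classmethod resolves a JSON type: the first pyarrow name listed per JSON type wins, and names in NON_SCALAR_TYPES are returned verbatim because, unlike scalar factories such as pa.int64(), pa.struct() cannot be called without field arguments.

import pyarrow as pa

NON_SCALAR_TYPES = {"struct": "struct"}
TYPE_MAP = {
    "boolean": ("bool_", "bool"),
    "integer": ("int64", "int8", "int16", "int32"),
    "object": ("struct", "large_string"),  # JsonlParser's override; the base class maps object to large_string
}

def json_type_to_pyarrow_type(typ: str) -> str:
    for json_type, pyarrow_types in TYPE_MAP.items():
        if typ.lower() == json_type:
            type_ = next(iter(pyarrow_types))
            if type_ in NON_SCALAR_TYPES:
                return NON_SCALAR_TYPES[type_]  # plain label, no pa call
            return str(getattr(pa, type_)())
    return str(pa.large_string())  # fallback, as in the base class

print(json_type_to_pyarrow_type("integer"))  # int64
print(json_type_to_pyarrow_type("object"))   # struct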
@@ -13,6 +13,21 @@


class JsonlParser(AbstractFileParser):
TYPE_MAP = {
"boolean": ("bool_", "bool"),
"integer": ("int64", "int8", "int16", "int32", "uint8", "uint16", "uint32", "uint64"),
"number": ("float64", "float16", "float32", "decimal128", "decimal256", "halffloat", "float", "double"),
"string": ("large_string", "string"),
# TODO: support object type rather than coercing to string
"object": (
"struct",
"large_string",
),
# TODO: support array type rather than coercing to string
"array": ("large_string",),
"null": ("large_string",),
}

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.format_model = None
@@ -45,8 +60,9 @@ def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str,
"unexpected_field_behavior": self.format.unexpected_field_behavior,
}
if json_schema:
parse_options["explicit_schema"] = pa.schema(self.json_schema_to_pyarrow_schema(json_schema))

schema = self.json_schema_to_pyarrow_schema(json_schema)
schema = pa.schema({field: type_ for field, type_ in schema.items() if type_ not in self.NON_SCALAR_TYPES.values()})
parse_options["explicit_schema"] = schema
return parse_options

def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table:
@@ -60,8 +76,16 @@ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any
The Json reader supports multi-threading, hence we do not need to add an external process
https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
"""

def field_type_to_str(type_: Any) -> str:
if isinstance(type_, pa.lib.StructType):
return "struct"
if isinstance(type_, pa.lib.DataType):
return str(type_)
raise Exception(f"Unknown PyArrow Type: {type_}")

table = self._read_table(file)
schema_dict = {field.name: field.type for field in table.schema}
schema_dict = {field.name: field_type_to_str(field.type) for field in table.schema}
return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)

def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
@@ -0,0 +1,2 @@
{"meta":{"entity": "accounts","url": "https://example.com/api/accounts/123","changed_on": "2022/04/20T16:20Z"},"payload": {"accounts": ["data", "goes","here"]}}
{"meta":{"entity": "line_items","url": "https://example.com/api/accounts/123/line_items/42","changed_on": "2022/04/20T16:20Z"},"payload": {"line_items": ["data", "goes","here"]}}
@@ -160,4 +160,12 @@ def cases(cls) -> Mapping[str, Any]:
"line_checks": {},
"fails": [],
},
"nested_json_test": {
"AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}),
"filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_10_nested_structure.jsonl"),
"num_records": 2,
"inferred_schema": {"meta": "object", "payload": "object"},
"line_checks": {},
"fails": [],
},
}
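
The expected inferred_schema of {"meta": "object", "payload": "object"} follows from the reverse (pyarrow-to-JSON) prefix matching. A sketch with a trimmed copy of JsonlParser.TYPE_MAP in its original key order (order matters: "large_string" must hit "string" before "object"):

TYPE_MAP = {
    "string": ("large_string", "string"),
    "object": ("struct", "large_string"),
}

def pyarrow_type_to_json_type(str_typ: str) -> str:
    for json_type, pyarrow_types in TYPE_MAP.items():
        if any(str_typ.startswith(pa_type) for pa_type in pyarrow_types):
            return json_type
    return "string"  # default when unmapped

print(pyarrow_type_to_json_type("struct"))        # object
print(pyarrow_type_to_json_type("large_string"))  # string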
@@ -601,18 +601,30 @@ def test_filepath_iterator(self, bucket, path_prefix, list_v2_objects, expected_
FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128),
],
[
{"pk": "string", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
"dob": "string"},
{"pk": "integer", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
"dob": "string"}
{
"pk": "string",
"full_name": "string",
"street_address": "string",
"customer_code": "integer",
"email": "string",
"dob": "string",
},
{
"pk": "integer",
"full_name": "string",
"street_address": "string",
"customer_code": "integer",
"email": "string",
"dob": "string",
},
],
{
"pk": "string",
"full_name": "string",
"street_address": "string",
"customer_code": "integer",
"email": "string",
"dob": "string"
"dob": "string",
},
True,
False,
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
@@ -205,6 +205,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.20 | 2022-09-12 | [16607](https://github.com/airbytehq/airbyte/pull/16607) | Fix for reading jsonl files containing nested structures |
| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas |
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet |