Skip to content

Commit a47a172

Browse files
davydov-d authored and jhammarstedt committed
Source S3: choose between data types when merging master schema (airbytehq#16631)
* airbytehq#422 source s3: choose broadest data type when there is a mismatch during merging json schemas
* airbytehq#422 source s3: upd changelog
1 parent 16252b3 commit a47a172

File tree

4 files changed

+58
-13
lines changed

4 files changed

+58
-13
lines changed

airbyte-integrations/connectors/source-s3/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
1717
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1818
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1919

20-
LABEL io.airbyte.version=0.1.18
20+
LABEL io.airbyte.version=0.1.19
2121
LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py

Lines changed: 31 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -185,6 +185,21 @@ def get_json_schema(self) -> Mapping[str, Any]:
185185
properties[self.ab_last_mod_col]["format"] = "date-time"
186186
return {"type": "object", "properties": properties}
187187

188+
@staticmethod
189+
def _broadest_type(type_1: str, type_2: str) -> Optional[str]:
190+
non_comparable_types = ["object", "array", "null"]
191+
if type_1 in non_comparable_types or type_2 in non_comparable_types:
192+
return None
193+
types = {type_1, type_2}
194+
if types == {"boolean", "string"}:
195+
return "string"
196+
if types == {"integer", "number"}:
197+
return "number"
198+
if types == {"integer", "string"}:
199+
return "string"
200+
if types == {"number", "string"}:
201+
return "string"
202+
188203
def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
189204
"""
190205
In order to auto-infer a schema across many files and/or allow for additional properties (columns),
@@ -223,22 +238,27 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
223238
# this compares datatype of every column that the two schemas have in common
224239
for col in column_superset:
225240
if (col in master_schema.keys()) and (col in this_schema.keys()) and (master_schema[col] != this_schema[col]):
226-
# If this column exists in a provided schema or schema state, we'll WARN here rather than throw an error
227-
# this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to
228-
# provided schema state. If not, then the read will error anyway
229-
if col in self._schema.keys():
241+
# If this column exists in a provided schema or schema state, we'll WARN here rather than throw an error.
242+
# This is to allow more leniency as we may be able to coerce this datatype mismatch on read according to
243+
# provided schema state. Else we're inferring the schema (or at least this column) from scratch, and therefore
244+
# we try to choose the broadest type among two if possible
245+
broadest_of_types = self._broadest_type(master_schema[col], this_schema[col])
246+
type_explicitly_defined = col in self._schema.keys()
247+
override_type = broadest_of_types and not type_explicitly_defined
248+
if override_type:
249+
master_schema[col] = broadest_of_types
250+
if override_type or type_explicitly_defined:
230251
LOGGER.warn(
231252
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
232253
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. "
233254
+ f"Airbyte will attempt to coerce this to {master_schema[col]} on read."
234255
)
235-
# else we're inferring the schema (or at least this column) from scratch and therefore
236-
# throw an error on mismatching datatypes
237-
else:
238-
raise RuntimeError(
239-
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
240-
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
241-
)
256+
continue
257+
# otherwise throw an error on mismatching datatypes
258+
raise RuntimeError(
259+
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
260+
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
261+
)
242262

243263
# missing columns in this_schema doesn't affect our master_schema, so we don't check for it here
244264

airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -593,6 +593,30 @@ def test_filepath_iterator(self, bucket, path_prefix, list_v2_objects, expected_
593593
False,
594594
False,
595595
),
596+
( # int becomes str in case of type mismatch in different files
597+
"{}",
598+
datetime(2020, 5, 5, 13, 5, 5),
599+
[
600+
FileInfo(last_modified=datetime(2022, 1, 1, 13, 5, 5), key="first", size=128),
601+
FileInfo(last_modified=datetime(2022, 6, 7, 8, 9, 10), key="second", size=128),
602+
],
603+
[
604+
{"pk": "string", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
605+
"dob": "string"},
606+
{"pk": "integer", "full_name": "string", "street_address": "string", "customer_code": "integer", "email": "string",
607+
"dob": "string"}
608+
],
609+
{
610+
"pk": "string",
611+
"full_name": "string",
612+
"street_address": "string",
613+
"customer_code": "integer",
614+
"email": "string",
615+
"dob": "string"
616+
},
617+
True,
618+
False,
619+
),
596620
),
597621
)
598622
@patch("source_s3.stream.IncrementalFileStreamS3.storagefile_class", MagicMock())

docs/integrations/sources/s3.md

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -205,7 +205,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
205205
206206
| Version | Date | Pull Request | Subject |
207207
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
208-
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
208+
| 0.1.19 | 2022-09-13 | [16631](https://github.com/airbytehq/airbyte/pull/16631) | Adjust column type to a broadest one when merging two or more json schemas |
209+
| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. |
209210
| 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet |
210211
| 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema |
211212
| 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |

0 commit comments

Comments
 (0)