
Commit b34d328

davydov-d authored and octavia-squidington-iii committed
Source S3: use AirbyteTracedException (#18602)
* #750 #837 #904 Source S3: use AirbyteTracedException
* source s3: upd changelog
* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 9d94a06 commit b34d328

File tree

16 files changed, +105 -30 lines changed


airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
@@ -1050,7 +1050,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.24
+  dockerImageTag: 0.1.25
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
@@ -10310,7 +10310,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.24"
+- dockerImage: "airbyte/source-s3:0.1.25"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"

airbyte-integrations/connectors/source-s3/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.24
+LABEL io.airbyte.version=0.1.25
 LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.92.4:9000"
+    "endpoint": "http://10.0.56.135:9000"
   },
   "format": {
     "filetype": "csv"

airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py

Lines changed: 3 additions & 2 deletions
@@ -128,8 +128,9 @@ def _stream_records_test_logic(
         for file_dict in stream_slice["files"]:
             # TODO: if we ever test other filetypes in these tests this will need fixing
             file_reader = CsvParser(format)
-            with file_dict["storage_file"].open(file_reader.is_binary) as f:
-                expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
+            storage_file = file_dict["storage_file"]
+            with storage_file.open(file_reader.is_binary) as f:
+                expected_columns.extend(list(file_reader.get_inferred_schema(f, storage_file.file_info).keys()))
         expected_columns = set(expected_columns)  # de-dupe

         for record in fs.read_records(sync_mode, stream_slice=stream_slice):

airbyte-integrations/connectors/source-s3/source_s3/exceptions.py

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+from typing import List, Optional, Union
+
+from airbyte_cdk.models import FailureType
+from airbyte_cdk.utils.traced_exception import AirbyteTracedException
+
+from .source_files_abstract.file_info import FileInfo
+
+
+class S3Exception(AirbyteTracedException):
+    def __init__(
+        self,
+        file_info: Union[List[FileInfo], FileInfo],
+        internal_message: Optional[str] = None,
+        message: Optional[str] = None,
+        failure_type: FailureType = FailureType.system_error,
+        exception: BaseException = None,
+    ):
+        file_info = (
+            file_info
+            if isinstance(file_info, (list, tuple))
+            else [
+                file_info,
+            ]
+        )
+        file_names = ", ".join([file.key for file in file_info])
+        user_friendly_message = f"""
+            The connector encountered an error while processing the file(s): {file_names}.
+            {message}
+            This can be an input configuration error as well, please double check your connection settings.
+        """
+        super().__init__(internal_message=internal_message, message=user_friendly_message, failure_type=failure_type, exception=exception)
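
For orientation, a minimal sketch (not part of the commit) of raising the new exception directly. It assumes the source-s3 package and airbyte-cdk are importable and that AirbyteTracedException keeps the user-facing text on a .message attribute; the file key below is made up.

from datetime import datetime

from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo

# Hypothetical file metadata; FileInfo takes key/size/last_modified, as in the test diffs below.
bad_file = FileInfo(key="landing/2022-10/events.csv", size=2048, last_modified=datetime.now())

err = S3Exception(bad_file, internal_message="CSV parse error", message="could not parse the CSV header")
print(err.failure_type)  # FailureType.system_error, the default
print(err.message)       # user-facing text that names landing/2022-10/events.csv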

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py

Lines changed: 2 additions & 1 deletion
@@ -45,12 +45,13 @@ def is_binary(self) -> bool:
         """

     @abstractmethod
-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
         """
         Override this with format-specifc logic to infer the schema of file
         Note: needs to return inferred schema with JsonSchema datatypes

         :param file: file-like object (opened via StorageFile)
+        :param file_info: file metadata
         :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
         """


airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/avro_parser.py

Lines changed: 5 additions & 2 deletions
@@ -6,6 +6,7 @@

 import fastavro
 from fastavro import reader
+from source_s3.source_files_abstract.file_info import FileInfo

 from .abstract_file_parser import AbstractFileParser

@@ -69,18 +70,20 @@ def _get_avro_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
         else:
             return schema

-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
         """Return schema
         :param file: file-like object (opened via StorageFile)
+        :param file_info: file metadata
         :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
         """
         avro_schema = self._get_avro_schema(file)
         schema_dict = self._parse_data_type(data_type_mapping, avro_schema)
         return schema_dict

-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """Stream the data using a generator
         :param file: file-like object (opened via StorageFile)
+        :param file_info: file metadata
         :yield: data record as a mapping of {columns:values}
         """
         avro_reader = reader(file)

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py

Lines changed: 19 additions & 2 deletions
@@ -11,6 +11,8 @@
 import pyarrow as pa
 import six  # type: ignore[import]
 from pyarrow import csv as pa_csv
+from source_s3.exceptions import S3Exception
+from source_s3.source_files_abstract.file_info import FileInfo
 from source_s3.utils import get_value_or_json_if_empty_string, run_in_external_process

 from .abstract_file_parser import AbstractFileParser
@@ -20,6 +22,19 @@
 TMP_FOLDER = tempfile.mkdtemp()


+def wrap_exception(exceptions: Tuple[type, ...]):
+    def wrapper(fn: callable):
+        def inner(self, file: Union[TextIO, BinaryIO], file_info: FileInfo):
+            try:
+                return fn(self, file, file_info)
+            except exceptions as e:
+                raise S3Exception(file_info, str(e), str(e), exception=e)
+
+        return inner
+
+    return wrapper
+
+
 class CsvParser(AbstractFileParser):
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
@@ -74,7 +89,8 @@ def _convert_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str
             **json.loads(additional_reader_options),
         }

-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
+    @wrap_exception((ValueError,))
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
         This now uses multiprocessing in order to timeout the schema inference as it can hang.
@@ -146,7 +162,8 @@ def _get_schema_dict_without_inference(self, file: Union[TextIO, BinaryIO]) -> M
         field_names = next(reader)
         return {field_name.strip(): pyarrow.string() for field_name in field_names}

-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    @wrap_exception((ValueError,))
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
         PyArrow returns lists of values for each column so we zip() these up into records which we then yield
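
The wrap_exception decorator introduced above re-raises low-level parser errors as S3Exception so the failing file travels with the error. Below is a standalone sketch of the same pattern with hypothetical stand-in names and no Airbyte imports; it is illustrative only, not part of the commit.

from typing import Tuple


class FileProcessingError(Exception):
    """Stand-in for S3Exception: carries the offending file's key in its message."""

    def __init__(self, file_key: str, message: str):
        super().__init__(f"Error while processing '{file_key}': {message}")


def wrap_exception(exceptions: Tuple[type, ...]):
    """Re-raise the listed exception types as FileProcessingError, keeping the file context."""

    def wrapper(fn):
        def inner(self, file, file_info):
            try:
                return fn(self, file, file_info)
            except exceptions as err:
                raise FileProcessingError(file_info, str(err)) from err

        return inner

    return wrapper


class ToyCsvParser:
    @wrap_exception((ValueError,))
    def stream_records(self, file, file_info):
        # Simulate the kind of failure pyarrow raises on malformed input.
        raise ValueError("invalid CSV row")


try:
    ToyCsvParser().stream_records(None, "demo/file.csv")
except FileProcessingError as err:
    print(err)  # Error while processing 'demo/file.csv': invalid CSV row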

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py

Lines changed: 3 additions & 2 deletions
@@ -7,6 +7,7 @@

 import pyarrow as pa
 from pyarrow import json as pa_json
+from source_s3.source_files_abstract.file_info import FileInfo

 from .abstract_file_parser import AbstractFileParser
 from .jsonl_spec import JsonlFormat
@@ -73,7 +74,7 @@ def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, A
             file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
         )

-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
         Json reader support multi thread hence, donot need to add external process
@@ -93,7 +94,7 @@ def field_type_to_str(type_: Any) -> str:
         schema_dict = {field.name: field_type_to_str(field.type) for field in table.schema}
         return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)

-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html


airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py

Lines changed: 7 additions & 4 deletions
@@ -5,7 +5,10 @@
 from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Tuple, Union

 import pyarrow.parquet as pq
+from airbyte_cdk.models import FailureType
 from pyarrow.parquet import ParquetFile
+from source_s3.exceptions import S3Exception
+from source_s3.source_files_abstract.file_info import FileInfo

 from .abstract_file_parser import AbstractFileParser
 from .parquet_spec import ParquetFormat
@@ -85,7 +88,7 @@ def convert_field_data(logical_type: str, field_value: Any) -> Any:
             return func(field_value) if func else field_value
         raise TypeError(f"unsupported field type: {logical_type}, value: {field_value}")

-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
         """
         https://arrow.apache.org/docs/python/parquet.html#finer-grained-reading-and-writing

@@ -97,10 +100,10 @@ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
         }
         if not schema_dict:
             # pyarrow can parse empty parquet files but a connector can't generate dynamic schema
-            raise OSError("empty Parquet file")
+            raise S3Exception(file_info, "empty Parquet file", "The .parquet file is empty!", FailureType.config_error)
         return schema_dict

-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
         PyArrow reads streaming batches from a Parquet file
@@ -116,7 +119,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str,
         }
         if not reader.schema:
             # pyarrow can parse empty parquet files but a connector can't generate dynamic schema
-            raise OSError("empty Parquet file")
+            raise S3Exception(file_info, "empty Parquet file", "The .parquet file is empty!", FailureType.config_error)

         args = self._select_options("columns", "batch_size")  # type: ignore[arg-type]
         self.logger.debug(f"Found the {reader.num_row_groups} Parquet groups")

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py

Lines changed: 13 additions & 6 deletions
@@ -12,10 +12,12 @@
 from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

 from airbyte_cdk.logger import AirbyteLogger
+from airbyte_cdk.models import FailureType
 from airbyte_cdk.models.airbyte_protocol import SyncMode
 from airbyte_cdk.sources.streams import Stream
 from wcmatch.glob import GLOBSTAR, SPLIT, globmatch

+from ..exceptions import S3Exception
 from .file_info import FileInfo
 from .formats.abstract_file_parser import AbstractFileParser
 from .formats.avro_parser import AvroParser
@@ -221,14 +223,16 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:

         file_reader = self.fileformatparser_class(self._format)

+        processed_files = []
         for file_info in self.get_time_ordered_file_infos():
             # skip this file if it's earlier than min_datetime
             if (min_datetime is not None) and (file_info.last_modified < min_datetime):
                 continue

             storagefile = self.storagefile_class(file_info, self._provider)
             with storagefile.open(file_reader.is_binary) as f:
-                this_schema = file_reader.get_inferred_schema(f)
+                this_schema = file_reader.get_inferred_schema(f, file_info)
+                processed_files.append(file_info)

             if this_schema == master_schema:
                 continue  # exact schema match so go to next file
@@ -249,15 +253,18 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
                         master_schema[col] = broadest_of_types
                     if override_type or type_explicitly_defined:
                         LOGGER.warn(
-                            f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+                            f"Detected mismatched datatype on column '{col}', in file '{file_info}'. "
                             + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. "
                             + f"Airbyte will attempt to coerce this to {master_schema[col]} on read."
                         )
                         continue
                     # otherwise throw an error on mismatching datatypes
-                    raise RuntimeError(
-                        f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
-                        + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
+                    raise S3Exception(
+                        processed_files,
+                        "Column type mismatch",
+                        f"Detected mismatched datatype on column '{col}', in file '{file_info}'. "
+                        + f"Should be '{master_schema[col]}', but found '{this_schema[col]}'.",
+                        failure_type=FailureType.config_error,
                     )

             # missing columns in this_schema doesn't affect our master_schema, so we don't check for it here
@@ -343,7 +350,7 @@ def _read_from_slice(
             storage_file: StorageFile = file_item["storage_file"]
             with storage_file.open(file_reader.is_binary) as f:
                 # TODO: make this more efficient than mutating every record one-by-one as they stream
-                for record in file_reader.stream_records(f):
+                for record in file_reader.stream_records(f, storage_file.file_info):
                     schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys()))
                     complete_record = self._add_extra_fields_from_map(
                         schema_matched_record,
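
When the master-schema check finds a datatype mismatch, the stream now raises S3Exception over every file processed so far and marks it as a config error rather than a generic RuntimeError. A minimal sketch of the resulting error object (illustrative only; file keys are made up, and it assumes source-s3 and airbyte-cdk are importable):

from datetime import datetime

from airbyte_cdk.models import FailureType
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo

# Hypothetical files that had already been scanned when the mismatch was detected.
processed_files = [
    FileInfo(key="data/2022-10-01.csv", size=1024, last_modified=datetime.now()),
    FileInfo(key="data/2022-10-02.csv", size=4096, last_modified=datetime.now()),
]

err = S3Exception(
    processed_files,
    "Column type mismatch",
    "Detected mismatched datatype on column 'id'. Should be 'integer', but found 'string'.",
    failure_type=FailureType.config_error,
)
print(err.failure_type)  # FailureType.config_error, attributed to user input rather than a system fault
print(err.message)       # user-facing text listing both file keys, comma-separated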

airbyte-integrations/connectors/source-s3/unit_tests/abstract_test_parser.py

Lines changed: 8 additions & 5 deletions
@@ -132,25 +132,28 @@ def _get_readmode(self, file_info: Mapping[str, Any]) -> str:

     @memory_limit(1024)
     def test_suite_inferred_schema(self, file_info: Mapping[str, Any]) -> None:
+        file_info_instance = FileInfo(key=file_info["filepath"], size=os.stat(file_info["filepath"]).st_size, last_modified=datetime.now())
         with smart_open(file_info["filepath"], self._get_readmode(file_info)) as f:
             if "test_get_inferred_schema" in file_info["fails"]:
                 with pytest.raises(Exception) as e_info:
-                    file_info["AbstractFileParser"].get_inferred_schema(f)
+                    file_info["AbstractFileParser"].get_inferred_schema(f), file_info_instance
                     self.logger.debug(str(e_info))
             else:
-                assert file_info["AbstractFileParser"].get_inferred_schema(f) == file_info["inferred_schema"]
+                assert file_info["AbstractFileParser"].get_inferred_schema(f, file_info_instance) == file_info["inferred_schema"]

     @memory_limit(1024)
     def test_stream_suite_records(self, file_info: Mapping[str, Any]) -> None:
         filepath = file_info["filepath"]
-        self.logger.info(f"read the file: {filepath}, size: {os.stat(filepath).st_size / (1024 ** 2)}Mb")
+        file_size = os.stat(filepath).st_size
+        file_info_instance = FileInfo(key=filepath, size=file_size, last_modified=datetime.now())
+        self.logger.info(f"read the file: {filepath}, size: {file_size / (1024 ** 2)}Mb")
         with smart_open(filepath, self._get_readmode(file_info)) as f:
             if "test_stream_records" in file_info["fails"]:
                 with pytest.raises(Exception) as e_info:
-                    [print(r) for r in file_info["AbstractFileParser"].stream_records(f)]
+                    [print(r) for r in file_info["AbstractFileParser"].stream_records(f, file_info_instance)]
                     self.logger.debug(str(e_info))
             else:
-                records = [r for r in file_info["AbstractFileParser"].stream_records(f)]
+                records = [r for r in file_info["AbstractFileParser"].stream_records(f, file_info_instance)]

         assert len(records) == file_info["num_records"]
         for index, expected_record in file_info["line_checks"].items():

airbyte-integrations/connectors/source-s3/unit_tests/test_csv_parser.py

Lines changed: 3 additions & 1 deletion
@@ -10,8 +10,10 @@
 from pathlib import Path
 from typing import Any, List, Mapping, Tuple

+import pendulum
 import pytest
 from smart_open import open as smart_open
+from source_s3.source_files_abstract.file_info import FileInfo
 from source_s3.source_files_abstract.formats.csv_parser import CsvParser

 from .abstract_test_parser import AbstractTestParser, memory_limit
@@ -403,7 +405,7 @@ def test_big_file(self) -> None:
             next(expected_file)
             read_count = 0
             with smart_open(filepath, self._get_readmode({"AbstractFileParser": parser})) as f:
-                for record in parser.stream_records(f):
+                for record in parser.stream_records(f, FileInfo(key=filepath, size=file_size, last_modified=pendulum.now())):
                     record_line = ",".join("" if v is None else str(v) for v in record.values())
                     expected_line = next(expected_file).strip("\n")
                     assert record_line == expected_line

airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py

Lines changed: 2 additions & 1 deletion
@@ -9,6 +9,7 @@
 import pytest
 from airbyte_cdk import AirbyteLogger
 from airbyte_cdk.models import SyncMode
+from source_s3.exceptions import S3Exception
 from source_s3.source_files_abstract.file_info import FileInfo
 from source_s3.source_files_abstract.storagefile import StorageFile
 from source_s3.source_files_abstract.stream import IncrementalFileStream
@@ -642,7 +643,7 @@ def test_master_schema(
         dataset="dummy", provider={}, format={"filetype": "csv"}, schema=user_schema, path_pattern="**/prefix*.csv"
     )
     if error_expected:
-        with pytest.raises(RuntimeError):
+        with pytest.raises(S3Exception):
            stream_instance._get_master_schema(min_datetime=min_datetime)
     else:
         assert stream_instance._get_master_schema(min_datetime=min_datetime) == expected_schema
