Commit 6a40ac5

Source S3: use AirbyteTracedException (#18602)

* #750 #837 #904 Source S3: use AirbyteTracedException
* source s3: upd changelog
* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>

1 parent cd03c04 commit 6a40ac5
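For orientation, a minimal sketch (not part of this commit) of the pattern it adopts: instead of letting a raw parser error bubble up, the connector raises an AirbyteTracedException that carries both an internal log message and a user-facing one. The sample file name and error text below are invented; the constructor keywords mirror those used by S3Exception in this diff.

from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

try:
    raise ValueError("CSV parse error: Empty CSV file or block")  # simulated parser failure
except ValueError as e:
    raise AirbyteTracedException(
        internal_message=str(e),  # detailed message kept for the logs
        message="The connector encountered an error while processing the file(s): demo/data.csv",  # shown to the user
        failure_type=FailureType.system_error,
        exception=e,
    )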

16 files changed: +105 −30 lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion

@@ -1050,7 +1050,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.24
+  dockerImageTag: 0.1.25
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion

@@ -10310,7 +10310,7 @@
     supportsNormalization: false
     supportsDBT: false
     supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.24"
+- dockerImage: "airbyte/source-s3:0.1.25"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"

airbyte-integrations/connectors/source-s3/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.24
+LABEL io.airbyte.version=0.1.25
 LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json

Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.92.4:9000"
+    "endpoint": "http://10.0.56.135:9000"
   },
   "format": {
     "filetype": "csv"

airbyte-integrations/connectors/source-s3/integration_tests/integration_test_abstract.py

Lines changed: 3 additions & 2 deletions

@@ -128,8 +128,9 @@ def _stream_records_test_logic(
         for file_dict in stream_slice["files"]:
             # TODO: if we ever test other filetypes in these tests this will need fixing
             file_reader = CsvParser(format)
-            with file_dict["storage_file"].open(file_reader.is_binary) as f:
-                expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
+            storage_file = file_dict["storage_file"]
+            with storage_file.open(file_reader.is_binary) as f:
+                expected_columns.extend(list(file_reader.get_inferred_schema(f, storage_file.file_info).keys()))
         expected_columns = set(expected_columns)  # de-dupe
 
         for record in fs.read_records(sync_mode, stream_slice=stream_slice):
airbyte-integrations/connectors/source-s3/source_s3/exceptions.py

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+from typing import List, Optional, Union
+
+from airbyte_cdk.models import FailureType
+from airbyte_cdk.utils.traced_exception import AirbyteTracedException
+
+from .source_files_abstract.file_info import FileInfo
+
+
+class S3Exception(AirbyteTracedException):
+    def __init__(
+        self,
+        file_info: Union[List[FileInfo], FileInfo],
+        internal_message: Optional[str] = None,
+        message: Optional[str] = None,
+        failure_type: FailureType = FailureType.system_error,
+        exception: BaseException = None,
+    ):
+        file_info = (
+            file_info
+            if isinstance(file_info, (list, tuple))
+            else [
+                file_info,
+            ]
+        )
+        file_names = ", ".join([file.key for file in file_info])
+        user_friendly_message = f"""
+            The connector encountered an error while processing the file(s): {file_names}.
+            {message}
+            This can be an input configuration error as well, please double check your connection settings.
+        """
+        super().__init__(internal_message=internal_message, message=user_friendly_message, failure_type=failure_type, exception=exception)
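
A minimal usage sketch for the new exception class (not part of the diff): infer_schema_or_fail and do_inference are hypothetical helpers, and file_info stands for whatever FileInfo object describes the file being processed.

from typing import Any, BinaryIO, Mapping

from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo


def do_inference(file: BinaryIO) -> Mapping[str, Any]:
    # hypothetical stand-in for pyarrow-based schema inference
    raise ValueError("Empty CSV file or block: cannot infer")


def infer_schema_or_fail(file: BinaryIO, file_info: FileInfo) -> Mapping[str, Any]:
    try:
        return do_inference(file)
    except ValueError as e:
        # internal_message is logged, message is shown to the user, and file_info
        # lets S3Exception name the offending file(s) via their .key attribute
        raise S3Exception(file_info, internal_message=str(e), message="Schema inference failed", exception=e)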

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py

Lines changed: 2 additions & 1 deletion

@@ -45,12 +45,13 @@ def is_binary(self) -> bool:
         """
 
     @abstractmethod
-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
         """
         Override this with format-specifc logic to infer the schema of file
         Note: needs to return inferred schema with JsonSchema datatypes
 
         :param file: file-like object (opened via StorageFile)
+        :param file_info: file metadata
         :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
         """
 
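
To illustrate the updated contract, a hypothetical parser subclass (not in the connector) implementing only the revised method; the remaining abstract members are omitted for brevity.

from typing import BinaryIO, TextIO, Union

from source_s3.source_files_abstract.file_info import FileInfo
from source_s3.source_files_abstract.formats.abstract_file_parser import AbstractFileParser


class HeaderOnlyParser(AbstractFileParser):  # hypothetical example parser
    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
        # file_info is the new argument: it identifies the file (e.g. file_info.key)
        # so errors can be reported against a concrete object in the bucket.
        header = file.readline()
        if isinstance(header, bytes):
            header = header.decode("utf-8")
        return {column.strip(): "string" for column in header.split(",")}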

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/avro_parser.py

Lines changed: 5 additions & 2 deletions

@@ -6,6 +6,7 @@
 
 import fastavro
 from fastavro import reader
+from source_s3.source_files_abstract.file_info import FileInfo
 
 from .abstract_file_parser import AbstractFileParser
 
@@ -69,18 +70,20 @@ def _get_avro_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
         else:
             return schema
 
-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
         """Return schema
         :param file: file-like object (opened via StorageFile)
+        :param file_info: file metadata
        :return: mapping of {columns:datatypes} where datatypes are JsonSchema types
         """
         avro_schema = self._get_avro_schema(file)
         schema_dict = self._parse_data_type(data_type_mapping, avro_schema)
         return schema_dict
 
-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """Stream the data using a generator
         :param file: file-like object (opened via StorageFile)
+        :param file_info: file metadata
         :yield: data record as a mapping of {columns:values}
         """
         avro_reader = reader(file)

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py

Lines changed: 19 additions & 2 deletions

@@ -11,6 +11,8 @@
 import pyarrow as pa
 import six  # type: ignore[import]
 from pyarrow import csv as pa_csv
+from source_s3.exceptions import S3Exception
+from source_s3.source_files_abstract.file_info import FileInfo
 from source_s3.utils import get_value_or_json_if_empty_string, run_in_external_process
 
 from .abstract_file_parser import AbstractFileParser
@@ -20,6 +22,19 @@
 TMP_FOLDER = tempfile.mkdtemp()
 
 
+def wrap_exception(exceptions: Tuple[type, ...]):
+    def wrapper(fn: callable):
+        def inner(self, file: Union[TextIO, BinaryIO], file_info: FileInfo):
+            try:
+                return fn(self, file, file_info)
+            except exceptions as e:
+                raise S3Exception(file_info, str(e), str(e), exception=e)
+
+        return inner
+
+    return wrapper
+
+
 class CsvParser(AbstractFileParser):
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
@@ -74,7 +89,8 @@ def _convert_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str
             **json.loads(additional_reader_options),
         }
 
-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
+    @wrap_exception((ValueError,))
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
         This now uses multiprocessing in order to timeout the schema inference as it can hang.
@@ -146,7 +162,8 @@ def _get_schema_dict_without_inference(self, file: Union[TextIO, BinaryIO]) -> M
         field_names = next(reader)
         return {field_name.strip(): pyarrow.string() for field_name in field_names}
 
-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    @wrap_exception((ValueError,))
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
         PyArrow returns lists of values for each column so we zip() these up into records which we then yield
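
A self-contained illustration of the decorator pattern introduced above (stand-in classes only, not the connector's code): low-level parse errors are re-raised as one traced exception type that names the file being processed.

from typing import Tuple


class TracedError(Exception):
    """Stand-in for S3Exception in this sketch."""

    def __init__(self, file_name: str, detail: str):
        super().__init__(f"Error while processing {file_name}: {detail}")


def wrap_exception(exceptions: Tuple[type, ...]):
    def wrapper(fn):
        def inner(self, file, file_info):
            try:
                return fn(self, file, file_info)
            except exceptions as e:
                raise TracedError(file_info, str(e)) from e

        return inner

    return wrapper


class Parser:
    @wrap_exception((ValueError,))
    def get_inferred_schema(self, file, file_info):
        raise ValueError("Empty CSV file")  # simulate a pyarrow parse failure


try:
    Parser().get_inferred_schema(None, "demo/data.csv")
except TracedError as err:
    print(err)  # Error while processing demo/data.csv: Empty CSV file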

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py

Lines changed: 3 additions & 2 deletions

@@ -7,6 +7,7 @@
 
 import pyarrow as pa
 from pyarrow import json as pa_json
+from source_s3.source_files_abstract.file_info import FileInfo
 
 from .abstract_file_parser import AbstractFileParser
 from .jsonl_spec import JsonlFormat
@@ -73,7 +74,7 @@ def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, A
             file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
         )
 
-    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
         Json reader support multi thread hence, donot need to add external process
@@ -93,7 +94,7 @@ def field_type_to_str(type_: Any) -> str:
         schema_dict = {field.name: field_type_to_str(field.type) for field in table.schema}
         return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)
 
-    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+    def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
