
Commit 3dc79f5

Source S3: speed up discovery (#22500)
* #1470 source S3: speed up discovery
* #1470 source s3: update changelog
* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent d82e01a commit 3dc79f5

File tree

5 files changed: +38 −16 lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion

@@ -1549,7 +1549,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.31
+  dockerImageTag: 0.1.32
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion

@@ -12775,7 +12775,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.31"
+- dockerImage: "airbyte/source-s3:0.1.32"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"

airbyte-integrations/connectors/source-s3/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.31
+LABEL io.airbyte.version=0.1.32
 LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py

Lines changed: 34 additions & 13 deletions

@@ -3,7 +3,9 @@
 #

+import concurrent.futures
 import json
+import threading
 from abc import ABC, abstractmethod
 from copy import deepcopy
 from datetime import datetime, timedelta

@@ -29,6 +31,7 @@
 JSON_TYPES = ["string", "number", "integer", "object", "array", "boolean", "null"]

 LOGGER = AirbyteLogger()
+LOCK = threading.Lock()


 class ConfigurationError(Exception):

@@ -52,6 +55,7 @@ def fileformatparser_map(self) -> Mapping[str, type]:
     ab_file_name_col = "_ab_source_file_url"
     airbyte_columns = [ab_additional_col, ab_last_mod_col, ab_file_name_col]
     datetime_format_string = "%Y-%m-%dT%H:%M:%S%z"
+    parallel_tasks_size = 256

     def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None):
         """

@@ -202,6 +206,17 @@ def _broadest_type(type_1: str, type_2: str) -> Optional[str]:
         if types == {"number", "string"}:
             return "string"

+    @staticmethod
+    def guess_file_schema(storage_file, file_reader, file_info, processed_files, schemas):
+        try:
+            with storage_file.open(file_reader.is_binary) as f:
+                this_schema = file_reader.get_inferred_schema(f, file_info)
+            with LOCK:
+                schemas[file_info] = this_schema
+                processed_files.append(file_info)
+        except OSError:
+            pass
+
     def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
         """
         In order to auto-infer a schema across many files and/or allow for additional properties (columns),

@@ -224,19 +239,25 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
         file_reader = self.fileformatparser_class(self._format)

         processed_files = []
-        for file_info in self.get_time_ordered_file_infos():
-            # skip this file if it's earlier than min_datetime
-            if (min_datetime is not None) and (file_info.last_modified < min_datetime):
-                continue
-
-            storagefile = self.storagefile_class(file_info, self._provider)
-            try:
-                with storagefile.open(file_reader.is_binary) as f:
-                    this_schema = file_reader.get_inferred_schema(f, file_info)
-                processed_files.append(file_info)
-            except OSError:
-                continue
-
+        schemas = {}
+
+        file_infos = list(self.get_time_ordered_file_infos())
+        if min_datetime is not None:
+            file_infos = [info for info in file_infos if info.last_modified >= min_datetime]
+
+        for i in range(0, len(file_infos), self.parallel_tasks_size):
+            chunk_infos = file_infos[i : i + self.parallel_tasks_size]
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                executor.map(
+                    lambda args: self.guess_file_schema(*args),
+                    [
+                        (self.storagefile_class(file_info, self._provider), file_reader, file_info, processed_files, schemas)
+                        for file_info in chunk_infos
+                    ],
+                )
+
+        for file_info in file_infos:
+            this_schema = schemas[file_info]
             if this_schema == master_schema:
                 continue  # exact schema match so go to next file

docs/integrations/sources/s3.md

Lines changed: 1 addition & 0 deletions

@@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence, only the line-delimited JSON format is supported
 | Version | Date       | Pull Request                                             | Subject                                                                                    |
 |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------|
+| 0.1.32  | 2023-02-07 | [22500](https://github.com/airbytehq/airbyte/pull/22500) | Speed up discovery                                                                         |
 | 0.1.31  | 2023-02-08 | [22550](https://github.com/airbytehq/airbyte/pull/22550) | Validate CSV read options and convert options                                              |
 | 0.1.30  | 2023-01-25 | [21587](https://github.com/airbytehq/airbyte/pull/21587) | Make sure spec works as expected in UI                                                     |
 | 0.1.29  | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. Warn a customer |
