Skip to content

Commit c4ae774

Browse files
committed
Source S3: run full refresh syncs concurrently
1 parent d05f473 commit c4ae774

File tree

8 files changed

+60
-31
lines changed

8 files changed

+60
-31
lines changed

airbyte-integrations/connectors/source-s3/acceptance-test-config.yml

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,95 +4,95 @@ acceptance_tests:
44
- config_path: secrets/config.json
55
expect_records:
66
path: integration_tests/expected_records/csv.jsonl
7-
exact_order: true
7+
exact_order: false
88
timeout_seconds: 1800
99
file_types:
1010
skip_test: true
1111
bypass_reason: "To be testes with the last config"
1212
- config_path: secrets/config_iam_role.json
1313
expect_records:
1414
path: integration_tests/expected_records/csv.jsonl
15-
exact_order: true
15+
exact_order: false
1616
timeout_seconds: 1800
1717
file_types:
1818
skip_test: true
1919
bypass_reason: "To be testes with the last config"
2020
- config_path: secrets/v4_csv_custom_encoding_config.json
2121
expect_records:
2222
path: integration_tests/expected_records/legacy_csv_custom_encoding.jsonl
23-
exact_order: true
23+
exact_order: false
2424
timeout_seconds: 1800
2525
file_types:
2626
skip_test: true
2727
bypass_reason: "To be testes with the last config"
2828
- config_path: secrets/v4_csv_custom_format_config.json
2929
expect_records:
3030
path: integration_tests/expected_records/legacy_csv_custom_format.jsonl
31-
exact_order: true
31+
exact_order: false
3232
timeout_seconds: 1800
3333
file_types:
3434
skip_test: true
3535
bypass_reason: "To be testes with the last config"
3636
- config_path: secrets/v4_csv_user_schema_config.json
3737
expect_records:
3838
path: integration_tests/expected_records/legacy_csv_user_schema.jsonl
39-
exact_order: true
39+
exact_order: false
4040
timeout_seconds: 1800
4141
file_types:
4242
skip_test: true
4343
bypass_reason: "To be testes with the last config"
4444
- config_path: secrets/v4_csv_no_header_config.json
4545
expect_records:
4646
path: integration_tests/expected_records/legacy_csv_no_header.jsonl
47-
exact_order: true
47+
exact_order: false
4848
timeout_seconds: 1800
4949
file_types:
5050
skip_test: true
5151
bypass_reason: "To be testes with the last config"
5252
- config_path: secrets/v4_csv_skip_rows_config.json
5353
expect_records:
5454
path: integration_tests/expected_records/legacy_csv_skip_rows.jsonl
55-
exact_order: true
55+
exact_order: false
5656
timeout_seconds: 1800
5757
file_types:
5858
skip_test: true
5959
bypass_reason: "To be testes with the last config"
6060
- config_path: secrets/v4_csv_skip_rows_no_header_config.json
6161
expect_records:
6262
path: integration_tests/expected_records/legacy_csv_skip_rows_no_header.jsonl
63-
exact_order: true
63+
exact_order: false
6464
timeout_seconds: 1800
6565
file_types:
6666
skip_test: true
6767
bypass_reason: "To be testes with the last config"
6868
- config_path: secrets/v4_csv_with_nulls_config.json
6969
expect_records:
7070
path: integration_tests/expected_records/legacy_csv_with_nulls.jsonl
71-
exact_order: true
71+
exact_order: false
7272
timeout_seconds: 1800
7373
file_types:
7474
skip_test: true
7575
bypass_reason: "To be testes with the last config"
7676
- config_path: secrets/v4_csv_with_null_bools_config.json
7777
expect_records:
7878
path: integration_tests/expected_records/legacy_csv_with_null_bools.jsonl
79-
exact_order: true
79+
exact_order: false
8080
timeout_seconds: 1800
8181
file_types:
8282
skip_test: true
8383
bypass_reason: "To be testes with the last config"
8484
- config_path: secrets/v4_parquet_config.json
8585
expect_records:
8686
path: integration_tests/expected_records/parquet.jsonl
87-
exact_order: true
87+
exact_order: false
8888
timeout_seconds: 1800
8989
file_types:
9090
skip_test: true
9191
bypass_reason: "To be testes with the last config"
9292
- config_path: secrets/parquet_dataset_config.json
9393
expect_records:
9494
path: integration_tests/expected_records/parquet_dataset.jsonl
95-
exact_order: true
95+
exact_order: false
9696
timeout_seconds: 1800
9797
file_types:
9898
skip_test: true
@@ -107,71 +107,71 @@ acceptance_tests:
107107
- config_path: secrets/v4_avro_config.json
108108
expect_records:
109109
path: integration_tests/expected_records/avro.jsonl
110-
exact_order: true
110+
exact_order: false
111111
timeout_seconds: 1800
112112
file_types:
113113
skip_test: true
114114
bypass_reason: "To be testes with the last config"
115115
- config_path: secrets/v4_jsonl_config.json
116116
expect_records:
117117
path: integration_tests/expected_records/jsonl.jsonl
118-
exact_order: true
118+
exact_order: false
119119
timeout_seconds: 1800
120120
file_types:
121121
skip_test: true
122122
bypass_reason: "To be testes with the last config"
123123
- config_path: secrets/v4_jsonl_newlines_config.json
124124
expect_records:
125125
path: integration_tests/expected_records/jsonl_newlines.jsonl
126-
exact_order: true
126+
exact_order: false
127127
timeout_seconds: 1800
128128
file_types:
129129
skip_test: true
130130
bypass_reason: "To be testes with the last config"
131131
- config_path: secrets/zip_config_csv.json
132132
expect_records:
133133
path: integration_tests/expected_records/zip_csv.jsonl
134-
exact_order: true
134+
exact_order: false
135135
timeout_seconds: 1800
136136
file_types:
137137
skip_test: true
138138
bypass_reason: "To be testes with the last config"
139139
- config_path: secrets/zip_config_csv_custom_encoding.json
140140
expect_records:
141141
path: integration_tests/expected_records/zip_csv_custom_encoding.jsonl
142-
exact_order: true
142+
exact_order: false
143143
timeout_seconds: 1800
144144
file_types:
145145
skip_test: true
146146
bypass_reason: "To be testes with the last config"
147147
- config_path: secrets/zip_config_jsonl.json
148148
expect_records:
149149
path: integration_tests/expected_records/zip_jsonl.jsonl
150-
exact_order: true
150+
exact_order: false
151151
timeout_seconds: 1800
152152
file_types:
153153
skip_test: true
154154
bypass_reason: "To be testes with the last config"
155155
- config_path: secrets/zip_config_avro.json
156156
expect_records:
157157
path: integration_tests/expected_records/zip_avro.jsonl
158-
exact_order: true
158+
exact_order: false
159159
timeout_seconds: 1800
160160
file_types:
161161
skip_test: true
162162
bypass_reason: "To be testes with the last config"
163163
- config_path: secrets/zip_config_parquet.json
164164
expect_records:
165165
path: integration_tests/expected_records/zip_parquet.jsonl
166-
exact_order: true
166+
exact_order: false
167167
timeout_seconds: 1800
168168
file_types:
169169
skip_test: true
170170
bypass_reason: "To be testes with the last config"
171171
- config_path: secrets/unstructured_config.json
172172
expect_records:
173173
path: integration_tests/expected_records/unstructured.jsonl
174-
exact_order: true
174+
exact_order: false
175175
timeout_seconds: 1800
176176

177177
connection:

airbyte-integrations/connectors/source-s3/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: file
1111
connectorType: source
1212
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
13-
dockerImageTag: 4.4.1
13+
dockerImageTag: 4.5.0
1414
dockerRepository: airbyte/source-s3
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
1616
githubIssueLabel: source-s3

airbyte-integrations/connectors/source-s3/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from setuptools import find_packages, setup
77

88
MAIN_REQUIREMENTS = [
9-
"airbyte-cdk[file-based]==0.59.2", # temporarily pin until concurrency can be released
9+
"airbyte-cdk[file-based]>=0.60.1",
1010
"smart-open[s3]==5.1.0",
1111
"wcmatch==8.4",
1212
"dill==0.3.4",

airbyte-integrations/connectors/source-s3/source_s3/run.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,17 @@
1515

1616
def get_source(args: List[str]):
1717
catalog_path = AirbyteEntrypoint.extract_catalog(args)
18+
config_path = AirbyteEntrypoint.extract_config(args)
19+
state_path = AirbyteEntrypoint.extract_state(args)
1820
try:
19-
return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
21+
return SourceS3(
22+
SourceS3StreamReader(),
23+
Config,
24+
SourceS3.read_catalog(catalog_path) if catalog_path else None,
25+
SourceS3.read_config(config_path) if config_path else None,
26+
SourceS3.read_state(state_path) if state_path else None,
27+
cursor_cls=Cursor,
28+
)
2029
except Exception:
2130
print(
2231
AirbyteMessage(

airbyte-integrations/connectors/source-s3/source_s3/v4/source.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from airbyte_cdk.config_observation import emit_configuration_as_airbyte_control_message
88
from airbyte_cdk.models import ConnectorSpecification
9-
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
9+
from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
1010
from airbyte_cdk.utils import is_cloud_environment
1111
from source_s3.source import SourceS3Spec
1212
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer
@@ -21,14 +21,17 @@
2121

2222

2323
class SourceS3(FileBasedSource):
24-
def read_config(self, config_path: str) -> Mapping[str, Any]:
24+
_concurrency_level = DEFAULT_CONCURRENCY
25+
26+
@classmethod
27+
def read_config(cls, config_path: str) -> Mapping[str, Any]:
2528
"""
2629
Used to override the default read_config so that when the new file-based S3 connector processes a config
2730
in the legacy format, it can be transformed into the new config. This happens in entrypoint before we
2831
validate the config against the new spec.
2932
"""
3033
config = super().read_config(config_path)
31-
if not self._is_v4_config(config):
34+
if not SourceS3._is_v4_config(config):
3235
parsed_legacy_config = SourceS3Spec(**config)
3336
converted_config = LegacyConfigTransformer.convert(parsed_legacy_config)
3437
emit_configuration_as_airbyte_control_message(converted_config)
@@ -66,7 +69,8 @@ def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
6669
connectionSpecification=s4_spec,
6770
)
6871

69-
def _is_v4_config(self, config: Mapping[str, Any]) -> bool:
72+
@staticmethod
73+
def _is_v4_config(config: Mapping[str, Any]) -> bool:
7074
return "streams" in config
7175

7276
@staticmethod

airbyte-integrations/connectors/source-s3/unit_tests/v4/test_source.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
class SourceTest(unittest.TestCase):
1616
def setUp(self) -> None:
1717
self._stream_reader = Mock(spec=SourceS3StreamReader)
18-
self._source = SourceS3(self._stream_reader, Config, str(TEST_FILES_FOLDER.joinpath("catalog.json")))
18+
self._source = SourceS3(
19+
self._stream_reader,
20+
Config,
21+
SourceS3.read_catalog(str(TEST_FILES_FOLDER.joinpath("catalog.json"))),
22+
SourceS3.read_config(str(TEST_FILES_FOLDER.joinpath("v3_config.json"))),
23+
None,
24+
)
1925

2026
@patch("source_s3.v4.source.emit_configuration_as_airbyte_control_message")
2127
def test_given_config_is_v3_when_read_config_then_emit_new_config(self, emit_config_mock) -> None:

airbyte-integrations/connectors/source-s3/v4_main.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,17 @@
1515

1616
def get_source(args: List[str]):
1717
catalog_path = AirbyteEntrypoint.extract_catalog(args)
18+
config_path = AirbyteEntrypoint.extract_config(args)
19+
state_path = AirbyteEntrypoint.extract_state(args)
1820
try:
19-
return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
21+
return SourceS3(
22+
SourceS3StreamReader(),
23+
Config,
24+
SourceS3.read_catalog(catalog_path) if catalog_path else None,
25+
SourceS3.read_config(config_path) if config_path else None,
26+
SourceS3.read_state(state_path) if state_path else None,
27+
cursor_cls=Cursor,
28+
)
2029
except Exception:
2130
print(
2231
AirbyteMessage(

docs/integrations/sources/s3.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,9 @@ To perform the text extraction from PDF and Docx files, the connector uses the [
321321

322322
| Version | Date | Pull Request | Subject |
323323
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
324+
| 4.5.0 | 2024-02-01 | [34591](https://github.com/airbytehq/airbyte/pull/34591) | Run full refresh syncs concurrently |
324325
| 4.4.1 | 2024-01-30 | [34665](https://github.com/airbytehq/airbyte/pull/34665) | Pin moto & CDK version |
325-
| 4.4.0 | 2024-01-12 | [33818](https://github.com/airbytehq/airbyte/pull/33818) | Add IAM Role Authentication |
326+
| 4.4.0 | 2023-01-12 | [33818](https://github.com/airbytehq/airbyte/pull/33818) | Add IAM Role Authentication |
326327
| 4.3.1 | 2024-01-04 | [33937](https://github.com/airbytehq/airbyte/pull/33937) | Prepare for airbyte-lib |
327328
| 4.3.0 | 2023-12-14 | [33411](https://github.com/airbytehq/airbyte/pull/33411) | Bump CDK version to auto-set primary key for document file streams and support raw txt files |
328329
| 4.2.4 | 2023-12-06 | [33187](https://github.com/airbytehq/airbyte/pull/33187) | Bump CDK version to hide source-defined primary key |

0 commit comments

Comments
 (0)