Skip to content

Commit 6ed63f5

Browse files
authored
Source S3: run incremental syncs with concurrency (#34895)
1 parent a6b3f0c commit 6ed63f5

File tree

7 files changed, with 129 additions and 109 deletions.

airbyte-integrations/connectors/source-s3/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: file
1111
connectorType: source
1212
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
13-
dockerImageTag: 4.5.6
13+
dockerImageTag: 4.5.7
1414
dockerRepository: airbyte/source-s3
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
1616
githubIssueLabel: source-s3

airbyte-integrations/connectors/source-s3/poetry.lock

Lines changed: 58 additions & 58 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-integrations/connectors/source-s3/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.5.6"
6+
version = "4.5.7"
77
name = "source-s3"
88
description = "Source implementation for S3."
99
authors = [ "Airbyte <contact@airbyte.io>",]

airbyte-integrations/connectors/source-s3/source_s3/v4/source.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222

2323
class SourceS3(FileBasedSource):
24+
_concurrency_level = DEFAULT_CONCURRENCY
25+
2426
@classmethod
2527
def read_config(cls, config_path: str) -> Mapping[str, Any]:
2628
"""

airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
class SourceS3StreamReader(AbstractFileBasedStreamReader):
3131
def __init__(self):
3232
super().__init__()
33-
self._s3_client = None
3433

3534
@property
3635
def config(self) -> Config:
@@ -56,24 +55,24 @@ def s3_client(self) -> BaseClient:
5655
# We shouldn't hit this; config should always get set before attempting to
5756
# list or read files.
5857
raise ValueError("Source config is missing; cannot create the S3 client.")
59-
if self._s3_client is None:
60-
client_kv_args = _get_s3_compatible_client_args(self.config) if self.config.endpoint else {}
6158

62-
# Set the region_name if it's provided in the config
63-
if self.config.region_name:
64-
client_kv_args["region_name"] = self.config.region_name
59+
client_kv_args = _get_s3_compatible_client_args(self.config) if self.config.endpoint else {}
6560

66-
if self.config.role_arn:
67-
self._s3_client = self._get_iam_s3_client(client_kv_args)
68-
else:
69-
self._s3_client = boto3.client(
70-
"s3",
71-
aws_access_key_id=self.config.aws_access_key_id,
72-
aws_secret_access_key=self.config.aws_secret_access_key,
73-
**client_kv_args,
74-
)
61+
# Set the region_name if it's provided in the config
62+
if self.config.region_name:
63+
client_kv_args["region_name"] = self.config.region_name
64+
65+
if self.config.role_arn:
66+
_s3_client = self._get_iam_s3_client(client_kv_args)
67+
else:
68+
_s3_client = boto3.client(
69+
"s3",
70+
aws_access_key_id=self.config.aws_access_key_id,
71+
aws_secret_access_key=self.config.aws_secret_access_key,
72+
**client_kv_args,
73+
)
7574

76-
return self._s3_client
75+
return _s3_client
7776

7877
def _get_iam_s3_client(self, client_kv_args: dict) -> BaseClient:
7978
"""

airbyte-integrations/connectors/source-s3/unit_tests/v4/test_stream_reader.py

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
from datetime import datetime, timedelta
99
from itertools import product
1010
from typing import Any, Dict, List, Optional, Set
11-
from unittest.mock import patch
11+
from unittest.mock import MagicMock, patch
1212

1313
import pytest
1414
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
1515
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
1616
from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
1717
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
1818
from botocore.stub import Stubber
19-
from moto import mock_sts
19+
from moto import mock_s3, mock_sts
2020
from pydantic import AnyUrl
2121
from source_s3.v4.config import Config
2222
from source_s3.v4.stream_reader import SourceS3StreamReader
@@ -124,10 +124,51 @@ def test_get_matching_files(
124124
except Exception as exc:
125125
raise exc
126126

127-
stub = set_stub(reader, mocked_response, multiple_pages)
128-
files = list(reader.get_matching_files(globs, None, logger))
129-
stub.deactivate()
130-
assert set(f.uri for f in files) == expected_uris
127+
with patch.object(SourceS3StreamReader, 's3_client', new_callable=MagicMock) as mock_s3_client:
128+
_setup_mock_s3_client(mock_s3_client, mocked_response, multiple_pages)
129+
files = list(reader.get_matching_files(globs, None, logger))
130+
assert set(f.uri for f in files) == expected_uris
131+
132+
133+
def _setup_mock_s3_client(mock_s3_client, mocked_response, multiple_pages):
134+
responses = []
135+
if multiple_pages and len(mocked_response) > 1:
136+
# Split the mocked_response for pagination simulation
137+
first_half = mocked_response[:len(mocked_response) // 2]
138+
second_half = mocked_response[len(mocked_response) // 2:]
139+
140+
responses.append({
141+
"IsTruncated": True,
142+
"Contents": first_half,
143+
"KeyCount": len(first_half),
144+
"NextContinuationToken": "token",
145+
})
146+
147+
responses.append({
148+
"IsTruncated": False,
149+
"Contents": second_half,
150+
"KeyCount": len(second_half),
151+
})
152+
else:
153+
responses.append({
154+
"IsTruncated": False,
155+
"Contents": mocked_response,
156+
"KeyCount": len(mocked_response),
157+
})
158+
159+
def list_objects_v2_side_effect(Bucket, Prefix=None, ContinuationToken=None, **kwargs):
160+
if ContinuationToken == "token":
161+
return responses[1]
162+
return responses[0]
163+
164+
mock_s3_client.list_objects_v2 = MagicMock(side_effect=list_objects_v2_side_effect)
165+
166+
167+
def _split_mocked_response(mocked_response, multiple_pages):
168+
if not multiple_pages:
169+
return mocked_response, []
170+
split_index = len(mocked_response) // 2
171+
return mocked_response[:split_index], mocked_response[split_index:]
131172

132173

133174
@patch("boto3.client")
@@ -196,9 +237,9 @@ def test_open_file_calls_any_open_with_the_right_encoding(smart_open_mock):
196237
with reader.open_file(RemoteFile(uri="", last_modified=datetime.now()), FileReadMode.READ, encoding, logger) as fp:
197238
fp.read()
198239

199-
smart_open_mock.assert_called_once_with(
200-
"s3://test/", transport_params={"client": reader.s3_client}, mode=FileReadMode.READ.value, encoding=encoding
201-
)
240+
assert smart_open_mock.call_args.args == ("s3://test/",)
241+
assert smart_open_mock.call_args.kwargs["mode"] == FileReadMode.READ.value
242+
assert smart_open_mock.call_args.kwargs["encoding"] == encoding
202243

203244

204245
def test_get_s3_client_without_config_raises_exception():
@@ -218,29 +259,6 @@ def documentation_url(cls) -> AnyUrl:
218259
stream_reader.config = other_config
219260

220261

221-
def set_stub(reader: SourceS3StreamReader, contents: List[Dict[str, Any]], multiple_pages: bool) -> Stubber:
222-
s3_stub = Stubber(reader.s3_client)
223-
split_contents_idx = int(len(contents) / 2) if multiple_pages else -1
224-
page1, page2 = contents[:split_contents_idx], contents[split_contents_idx:]
225-
resp = {
226-
"KeyCount": len(page1),
227-
"Contents": page1,
228-
}
229-
if page2:
230-
resp["NextContinuationToken"] = "token"
231-
s3_stub.add_response("list_objects_v2", resp)
232-
if page2:
233-
s3_stub.add_response(
234-
"list_objects_v2",
235-
{
236-
"KeyCount": len(page2),
237-
"Contents": page2,
238-
},
239-
)
240-
s3_stub.activate()
241-
return s3_stub
242-
243-
244262
@mock_sts
245263
@patch("source_s3.v4.stream_reader.boto3.client")
246264
def test_get_iam_s3_client(boto3_client_mock):
@@ -303,4 +321,4 @@ def test_filter_file_by_start_date(start_date: datetime, last_modified_date: dat
303321
start_date=start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
304322
)
305323

306-
assert expected_result == reader.is_modified_after_start_date(last_modified_date)
324+
assert expected_result == reader.is_modified_after_start_date(last_modified_date)

docs/integrations/sources/s3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ To perform the text extraction from PDF and Docx files, the connector uses the [
264264

265265
| Version | Date | Pull Request | Subject |
266266
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
267+
| 4.5.7 | 2024-02-23 | [34895](https://github.com/airbytehq/airbyte/pull/34895) | Run incremental syncs with concurrency |
267268
| 4.5.6 | 2024-02-21 | [35246](https://github.com/airbytehq/airbyte/pull/35246) | Fixes bug that occurred when creating CSV streams with tab delimiter. |
268269
| 4.5.5 | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date |
269270
| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency |

0 commit comments

Comments
 (0)