Skip to content

Commit d9885eb

Browse files
roman-yermilov-glxiaohansong
authored andcommitted
Source S3: add filter by start date (#35392)
1 parent c979762 commit d9885eb

File tree

5 files changed

+57
-6
lines changed

5 files changed

+57
-6
lines changed

airbyte-integrations/connectors/source-s3/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: file
1111
connectorType: source
1212
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
13-
dockerImageTag: 4.5.4
13+
dockerImageTag: 4.5.5
1414
dockerRepository: airbyte/source-s3
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
1616
githubIssueLabel: source-s3

airbyte-integrations/connectors/source-s3/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.5.4"
6+
version = "4.5.5"
77
name = "source-s3"
88
description = "Source implementation for S3."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import Iterable, List, Optional, Set
1010

1111
import boto3.session
12+
import pendulum
1213
import pytz
1314
import smart_open
1415
from airbyte_cdk.models import FailureType
@@ -205,7 +206,11 @@ def _page(
205206
continue
206207

207208
for remote_file in self._handle_file(file):
208-
if self.file_matches_globs(remote_file, globs) and remote_file.uri not in seen:
209+
if (
210+
self.file_matches_globs(remote_file, globs)
211+
and self.is_modified_after_start_date(remote_file.last_modified)
212+
and remote_file.uri not in seen
213+
):
209214
seen.add(remote_file.uri)
210215
yield remote_file
211216
else:
@@ -217,6 +222,12 @@ def _page(
217222
logger.info(f"Finished listing objects from S3 for prefix={prefix}. Found {total_n_keys_for_prefix} objects.")
218223
break
219224

225+
def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool:
226+
"""Returns True if given date higher or equal than start date or something is missing"""
227+
if not (self.config.start_date and last_modified_date):
228+
return True
229+
return last_modified_date >= pendulum.parse(self.config.start_date).naive()
230+
220231
def _handle_file(self, file):
221232
if file["Key"].endswith(".zip"):
222233
yield from self._handle_zip_file(file)

airbyte-integrations/connectors/source-s3/unit_tests/v4/test_stream_reader.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import io
77
import logging
8-
from datetime import datetime
8+
from datetime import datetime, timedelta
99
from itertools import product
1010
from typing import Any, Dict, List, Optional, Set
1111
from unittest.mock import patch
@@ -269,3 +269,38 @@ def test_get_iam_s3_client(boto3_client_mock):
269269

270270
# Assertions to validate the s3 client
271271
assert s3_client is not None
272+
273+
@pytest.mark.parametrize(
274+
"start_date, last_modified_date, expected_result",
275+
(
276+
# True when file is new or modified after given start_date
277+
(
278+
datetime.now() - timedelta(days=180),
279+
datetime.now(),
280+
True
281+
),
282+
(
283+
datetime.strptime("2024-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
284+
datetime.strptime("2024-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
285+
True
286+
),
287+
# False when file is older than given start_date
288+
(
289+
datetime.now(),
290+
datetime.now() - timedelta(days=180),
291+
False
292+
)
293+
)
294+
)
295+
def test_filter_file_by_start_date(start_date: datetime, last_modified_date: datetime, expected_result: bool) -> None:
296+
reader = SourceS3StreamReader()
297+
298+
reader.config = Config(
299+
bucket="test",
300+
aws_access_key_id="test",
301+
aws_secret_access_key="test",
302+
streams=[],
303+
start_date=start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
304+
)
305+
306+
assert expected_result == reader.is_modified_after_start_date(last_modified_date)

docs/integrations/sources/s3.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ The Amazon S3 source connector supports the following [sync modes](https://docs.
8888
| Replicate Multiple Streams \(distinct tables\) | Yes |
8989
| Namespaces | No |
9090

91+
## Supported streams
92+
93+
There is no predefined streams. The streams are based on content of your bucket.
94+
9195
## File Compressions
9296

9397
| Compression | Supported? |
@@ -260,8 +264,9 @@ To perform the text extraction from PDF and Docx files, the connector uses the [
260264

261265
| Version | Date | Pull Request | Subject |
262266
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
263-
| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency |
264-
| 4.5.3 | 2024-02-12 | [35164](https://github.com/airbytehq/airbyte/pull/35164) | Manage dependencies with Poetry. |
267+
| 4.5.5 | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date |
268+
| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency |
269+
| 4.5.3 | 2024-02-12 | [35164](https://github.com/airbytehq/airbyte/pull/35164) | Manage dependencies with Poetry. |
265270
| 4.5.2 | 2024-02-06 | [34930](https://github.com/airbytehq/airbyte/pull/34930) | Bump CDK version to fix issue when SyncMode is missing from catalog |
266271
| 4.5.1 | 2024-02-02 | [31701](https://github.com/airbytehq/airbyte/pull/31701) | Add `region` support |
267272
| 4.5.0 | 2024-02-01 | [34591](https://github.com/airbytehq/airbyte/pull/34591) | Run full refresh syncs concurrently |

0 commit comments

Comments
 (0)