
Commit 91eff1d

🐛 Source S3: Loading of files' metadata (#8252)
1 parent c3e9ef2 commit 91eff1d

35 files changed: +1161 −835 lines

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
   "sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
   "name": "S3",
   "dockerRepository": "airbyte/source-s3",
-  "dockerImageTag": "0.1.9",
+  "dockerImageTag": "0.1.10",
   "documentationUrl": "https://docs.airbyte.io/integrations/sources/s3",
   "icon": "s3.svg"
 }

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
@@ -620,7 +620,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.9
+  dockerImageTag: 0.1.10
   documentationUrl: https://docs.airbyte.io/integrations/sources/s3
   icon: s3.svg
   sourceType: file

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
@@ -6454,7 +6454,7 @@
           path_in_connector_config:
             - "credentials"
             - "client_secret"
-- dockerImage: "airbyte/source-s3:0.1.9"
+- dockerImage: "airbyte/source-s3:0.1.10"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"

airbyte-integrations/connectors/source-s3/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.9
+LABEL io.airbyte.version=0.1.10
 LABEL io.airbyte.name=airbyte/source-s3

airbyte-integrations/connectors/source-s3/acceptance-test-config.yml

Lines changed: 9 additions & 0 deletions
@@ -26,16 +26,19 @@ tests:
   basic_read:
     # for CSV format
     - config_path: "secrets/config.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/configured_catalog.json"
       expect_records:
         path: "integration_tests/expected_records.txt"
     # for Parquet format
     - config_path: "secrets/parquet_config.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
       expect_records:
         path: "integration_tests/parquet_expected_records.txt"
     # for custom server
     - config_path: "integration_tests/config_minio.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/configured_catalog.json"
       # expected records contains _ab_source_file_last_modified property which
       # is modified all the time s3 file changed and for custom server it is
@@ -46,18 +49,21 @@ tests:
   incremental:
     # for CSV format
     - config_path: "secrets/config.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/configured_catalog.json"
       cursor_paths:
         test: ["_ab_source_file_last_modified"]
       future_state_path: "integration_tests/abnormal_state.json"
     # for Parquet format
     - config_path: "secrets/parquet_config.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
       cursor_paths:
         test: ["_ab_source_file_last_modified"]
       future_state_path: "integration_tests/abnormal_state.json"
     # for custom server
     - config_path: "integration_tests/config_minio.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/configured_catalog.json"
       cursor_paths:
         test: ["_ab_source_file_last_modified"]
@@ -66,10 +72,13 @@ tests:
   full_refresh:
     # for CSV format
     - config_path: "secrets/config.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/configured_catalog.json"
     # for Parquet format
     - config_path: "secrets/parquet_config.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
     # for custom server
     - config_path: "integration_tests/config_minio.json"
+      timeout_seconds: 1800
       configured_catalog_path: "integration_tests/configured_catalog.json"

airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py

Lines changed: 2 additions & 24 deletions
@@ -3,36 +3,14 @@
 #
 
 
-import shutil
-import tempfile
-from zipfile import ZipFile
+from typing import Iterable
 
-import docker
 import pytest
 
 pytest_plugins = ("source_acceptance_test.plugin",)
 
 
 @pytest.fixture(scope="session", autouse=True)
-def connector_setup():
+def connector_setup() -> Iterable[None]:
     """This fixture is a placeholder for external resources that acceptance test might require."""
     yield
-
-
-@pytest.fixture(scope="session", autouse=True)
-def minio_setup():
-    client = docker.from_env()
-    tmp_dir = tempfile.mkdtemp()
-    with ZipFile("./integration_tests/minio_data.zip") as archive:
-        archive.extractall(tmp_dir)
-
-    container = client.containers.run(
-        "minio/minio",
-        f"server {tmp_dir}/minio_data",
-        network_mode="host",
-        volumes=["/tmp:/tmp", "/var/run/docker.sock:/var/run/docker.sock"],
-        detach=True,
-    )
-    yield
-    shutil.rmtree(tmp_dir)
-    container.stop()

airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json renamed to airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://localhost:9000"
+    "endpoint": "http://<local_ip>:9000"
   },
   "format": {
     "filetype": "csv"
airbyte-integrations/connectors/source-s3/integration_tests/conftest.py

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import json
+import time
+from pathlib import Path
+from typing import Any, Iterable, List, Mapping
+from zipfile import ZipFile
+
+import docker
+import pytest
+import requests  # type: ignore[import]
+from airbyte_cdk import AirbyteLogger
+from docker.errors import APIError
+from netifaces import AF_INET, ifaddresses, interfaces
+from requests.exceptions import ConnectionError  # type: ignore[import]
+
+from .integration_test import TMP_FOLDER, TestIncrementalFileStreamS3
+
+LOGGER = AirbyteLogger()
+
+
+def get_local_ip() -> str:
+    all_interface_ips: List[str] = []
+    for iface_name in interfaces():
+        all_interface_ips += [i["addr"] for i in ifaddresses(iface_name).setdefault(AF_INET, [{"addr": None}]) if i["addr"]]
+    LOGGER.info(f"detected interface IPs: {all_interface_ips}")
+    for ip in sorted(all_interface_ips):
+        if not ip.startswith("127."):
+            return ip
+
+    assert False, "no non-localhost interface was found"
+
+
+@pytest.fixture(scope="session")
+def minio_credentials() -> Mapping[str, Any]:
+    config_template = Path(__file__).parent / "config_minio.template.json"
+    assert config_template.is_file(), f"not found {config_template}"
+    config_file = Path(__file__).parent / "config_minio.json"
+    config_file.write_text(config_template.read_text().replace("<local_ip>", get_local_ip()))
+    with open(str(config_file)) as f:
+        credentials = json.load(f)
+    return credentials
+
+
+@pytest.fixture(scope="session", autouse=True)
+def minio_setup(minio_credentials: Mapping[str, Any]) -> Iterable[None]:
+    with ZipFile("./integration_tests/minio_data.zip") as archive:
+        archive.extractall(TMP_FOLDER)
+    client = docker.from_env()
+    # Minio should be attached to a non-localhost interface,
+    # because another test container needs a direct connection to it.
+    local_ip = get_local_ip()
+    LOGGER.debug(f"minio settings: {minio_credentials}")
+    try:
+        container = client.containers.run(
+            image="minio/minio:RELEASE.2021-10-06T23-36-31Z",
+            command=f"server {TMP_FOLDER}",
+            name="ci_test_minio",
+            auto_remove=True,
+            volumes=[f"/{TMP_FOLDER}/minio_data:/{TMP_FOLDER}"],
+            detach=True,
+            ports={"9000/tcp": (local_ip, 9000)},
+        )
+    except APIError as err:
+        if err.status_code == 409:
+            for container in client.containers.list():
+                if container.name == "ci_test_minio":
+                    LOGGER.info("minio was started before")
+                    break
+        else:
+            raise
+
+    check_url = f"http://{local_ip}:9000/minio/health/live"
+    checked = False
+    for _ in range(120):  # wait up to 1 minute (120 x 0.5 s)
+        time.sleep(0.5)
+        LOGGER.info(f"trying to connect to {check_url}")
+        try:
+            data = requests.get(check_url)
+        except ConnectionError as err:
+            LOGGER.warn(f"minio error: {err}")
+            continue
+        if data.status_code == 200:
+            checked = True
+            LOGGER.info("the minio/minio container is running...")
+            break
+        else:
+            LOGGER.info(f"minio error: {data.text}")
+    if not checked:
+        assert False, "couldn't connect to minio!"
+
+    yield
+    # the minio container is not stopped here because it is needed for all integration and acceptance tests
+
+
+def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
+    """Tries to find and remove all temp buckets."""
+    instance = TestIncrementalFileStreamS3()
+    instance._s3_connect(instance.credentials)
+    temp_buckets = []
+    for bucket in instance.s3_resource.buckets.all():
+        if bucket.name.startswith(instance.temp_bucket_prefix):
+            temp_buckets.append(bucket.name)
+    for bucket_name in temp_buckets:
+        bucket = instance.s3_resource.Bucket(bucket_name)
+        bucket.objects.all().delete()
+        bucket.delete()
+        LOGGER.info(f"S3 Bucket {bucket_name} is now deleted")
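
Because minio_setup is session-scoped and autouse, every test in the package runs against an already-serving MinIO container with no extra wiring. A hypothetical consumer (not part of the commit; the provider/endpoint key layout is assumed from config_minio.template.json):

# Hypothetical test sketch: pytest injects the rendered credentials, and the
# autouse minio_setup fixture guarantees the container already passed its health check.
from typing import Any, Mapping

import requests


def test_minio_health(minio_credentials: Mapping[str, Any]) -> None:
    endpoint = minio_credentials["provider"]["endpoint"]  # assumed key layout, e.g. "http://192.168.0.10:9000"
    assert requests.get(f"{endpoint}/minio/health/live").status_code == 200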

airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py

Lines changed: 47 additions & 4 deletions
@@ -4,16 +4,26 @@
 
 
 import json
+import os
+import shutil
 import time
-from typing import Iterator, List, Mapping
+from typing import Any, Dict, Iterator, List, Mapping
 
 import boto3
-from airbyte_cdk.logger import AirbyteLogger
+import pytest
+from airbyte_cdk import AirbyteLogger
 from botocore.errorfactory import ClientError
+from source_s3.source import SourceS3
 from source_s3.stream import IncrementalFileStreamS3
+from unit_tests.abstract_test_parser import memory_limit
+from unit_tests.test_csv_parser import generate_big_file
 
 from .integration_test_abstract import HERE, SAMPLE_DIR, AbstractTestIncrementalFileStream
 
+TMP_FOLDER = "/tmp/test_minio_source_s3"
+if not os.path.exists(TMP_FOLDER):
+    os.makedirs(TMP_FOLDER)
+
 LOGGER = AirbyteLogger()
 
 
@@ -35,7 +45,7 @@ def credentials(self) -> Mapping:
     def provider(self, bucket_name: str) -> Mapping:
         return {"storage": "S3", "bucket": bucket_name}
 
-    def _s3_connect(self, credentials: Mapping):
+    def _s3_connect(self, credentials: Mapping) -> None:
         region = "eu-west-3"
         self.s3_client = boto3.client(
             "s3",
@@ -85,9 +95,42 @@ def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upl
         self.s3_client.upload_file(str(filepath), bucket_name, upload_path, ExtraArgs=extra_args)
         yield f"{bucket_name}/{upload_path}"
 
-    def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping):
+    def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping) -> None:
         self._s3_connect(credentials)
         bucket = self.s3_resource.Bucket(cloud_bucket_name)
         bucket.objects.all().delete()
         bucket.delete()
         LOGGER.info(f"S3 Bucket {cloud_bucket_name} is now deleted")
+
+
+class TestIntegrationCsvFiles:
+    logger = AirbyteLogger()
+
+    @memory_limit(150)  # max used memory should be less than 150 MB
+    def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int:
+        read_count = 0
+        for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog):
+            if msg.record:
+                read_count += 1
+        return read_count
+
+    @pytest.mark.order(1)
+    def test_big_file(self, minio_credentials: Dict[str, Any]) -> None:
+        """Tests reading a big CSV file (size configurable below, 0.5 GB by default)."""
+        # generate the big CSV file separately
+        big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files")
+        shutil.rmtree(big_file_folder, ignore_errors=True)
+        os.makedirs(big_file_folder)
+        filepath = os.path.join(big_file_folder, "file.csv")
+
+        # change this value if you need to test another file size
+        future_file_size = 0.5  # in gigabytes
+        _, file_size = generate_big_file(filepath, future_file_size, 500)
+        expected_count = sum(1 for _ in open(filepath)) - 1
+        self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}")
+
+        minio_credentials["path_pattern"] = "big_files/file.csv"
+        minio_credentials["format"]["block_size"] = 5 * 1024 ** 2
+        source = SourceS3()
+        catalog = source.read_catalog(HERE / "configured_catalog.json")
+        assert self.read_source(minio_credentials, catalog) == expected_count
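
The memory_limit decorator used above is imported from unit_tests/abstract_test_parser.py, which this commit view does not include. A rough sketch of what such a guard could look like, built on tracemalloc (an assumption for illustration; the real decorator may measure memory differently):

# Sketch of a memory-cap decorator: run the wrapped function, track peak
# allocations with tracemalloc, and fail if the peak exceeds the given limit.
import functools
import tracemalloc
from typing import Any, Callable


def memory_limit(max_memory_mb: int) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracemalloc.start()
            try:
                result = func(*args, **kwargs)
                _, peak_bytes = tracemalloc.get_traced_memory()  # (current, peak) since start()
            finally:
                tracemalloc.stop()
            peak_mb = peak_bytes / 1024 ** 2
            assert peak_mb < max_memory_mb, f"{func.__name__} used {peak_mb:.1f} MB, limit is {max_memory_mb} MB"
            return result

        return wrapper

    return decorator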
