
🐛 Source S3: Loading of files' metadata #8252


Merged · 59 commits · Feb 1, 2022

Commits (59)

3f5fe85
update unit tests
antixar Nov 23, 2021
a4ce4dd
update tests
antixar Nov 25, 2021
00cd291
update version
antixar Nov 25, 2021
75fb824
fix conftest
antixar Nov 25, 2021
5749ce9
correction connection_setup
antixar Nov 25, 2021
a84d5bb
Update airbyte-integrations/connectors/source-s3/source_s3/source_fil…
antixar Dec 8, 2021
7a9d3a0
merge with master
antixar Dec 15, 2021
1aface7
restore slice logic
antixar Dec 16, 2021
fb5daa0
Merge remote-tracking branch 'origin' into antixar/6870-source-s3-csv…
antixar Dec 20, 2021
c221569
test correction
antixar Dec 20, 2021
86b8640
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Dec 20, 2021
88b153c
test logs
antixar Dec 20, 2021
1f28602
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Dec 21, 2021
4819e23
correction after review
antixar Dec 21, 2021
ac1f33f
reset failed files
antixar Dec 21, 2021
1bc0c3b
test without custom chunking
antixar Dec 21, 2021
247eb1d
correction after flake8
antixar Dec 21, 2021
4aa028d
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Dec 21, 2021
150f98e
correction of tests
antixar Dec 21, 2021
9f39aa2
merge with master
antixar Jan 10, 2022
05e6641
bix a test bug
antixar Jan 10, 2022
4a26bde
fix flake8 comment
antixar Jan 10, 2022
1e2efaf
fix generation of minio config file
antixar Jan 10, 2022
c9e4d3a
fix generation of minio config file
antixar Jan 10, 2022
29d9dc3
fix generation of minio config file2
antixar Jan 10, 2022
6f07a58
fix aceptance test
antixar Jan 10, 2022
80ff530
fix aceptance test3
antixar Jan 10, 2022
a1108e7
fix aceptance test4
antixar Jan 10, 2022
e20d0cd
fix aceptance test4
antixar Jan 10, 2022
17542b1
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 24, 2022
7779233
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 25, 2022
7f8b106
fix mypy comments
antixar Jan 25, 2022
6d2af0c
fix mypy comments
antixar Jan 25, 2022
a4dc642
improve tracemalloc.take_snapshot
grubberr Jan 26, 2022
c98c46c
fix tmp folder for minio
antixar Jan 27, 2022
9858cc0
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 27, 2022
bc1dae9
fix flake8 comments
antixar Jan 27, 2022
7873cd6
fix unit tests
antixar Jan 27, 2022
4a08a2b
bump version
antixar Jan 27, 2022
df836a9
raise test timeouts
antixar Jan 28, 2022
7782cba
debug minio tests
antixar Jan 28, 2022
e4b0f3d
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 31, 2022
89e80e3
debug test issues
antixar Jan 31, 2022
2a42bf2
corrected by isort
antixar Jan 31, 2022
6a7b01d
clean temp test buckets
antixar Jan 31, 2022
9e6d5b0
test locale issue
antixar Feb 1, 2022
c321fe4
move new tests to single test file
antixar Feb 1, 2022
abb6044
correction after flake8
antixar Feb 1, 2022
a07ee66
debug reports
antixar Feb 1, 2022
a4cc4c8
test covarage logic
antixar Feb 1, 2022
ababf7d
test covarage logic
antixar Feb 1, 2022
c93f1d3
test covarage logic
antixar Feb 1, 2022
5b48a94
test covarage logic
antixar Feb 1, 2022
8c933d6
test covarage logic
antixar Feb 1, 2022
c7f227f
test failed unittests
antixar Feb 1, 2022
2efb604
test failed integration tests
antixar Feb 1, 2022
f64d8e3
test failed SAT
antixar Feb 1, 2022
e2ecb0c
successful tests
antixar Feb 1, 2022
b753296
update spec file
antixar Feb 1, 2022

Files changed

@@ -2,6 +2,6 @@
"sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
"name": "S3",
"dockerRepository": "airbyte/source-s3",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/s3"
}
@@ -486,7 +486,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
sourceType: file
- name: SalesLoft
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-s3
@@ -9,30 +9,38 @@

import docker
import pytest
from airbyte_cdk import AirbyteLogger

pytest_plugins = ("source_acceptance_test.plugin",)
logger = AirbyteLogger()
TMP_FOLDER = tempfile.mkdtemp()


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
yield


@pytest.fixture(scope="session", autouse=True)
def minio_setup():
client = docker.from_env()
tmp_dir = tempfile.mkdtemp()
with ZipFile("./integration_tests/minio_data.zip") as archive:
archive.extractall(tmp_dir)
archive.extractall(TMP_FOLDER)
client = docker.from_env()
for container in client.containers.list():
if container.name == "ci_test_minio":
container.stop()
break

container = client.containers.run(
"minio/minio",
f"server {tmp_dir}/minio_data",
network_mode="host",
volumes=["/tmp:/tmp", "/var/run/docker.sock:/var/run/docker.sock"],
f"server {TMP_FOLDER}",
name="ci_test_minio",
auto_remove=True,
# network_mode="host",
volumes=[f"/{TMP_FOLDER}/minio_data:/{TMP_FOLDER}"],
detach=True,
ports={"9000/tcp": ("127.0.0.1", 9000)},
)
logger.info("Run a minio/minio container")
yield
shutil.rmtree(tmp_dir)
shutil.rmtree(TMP_FOLDER)
container.stop()


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
yield from minio_setup()
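
Note: the fixture above extracts the seed data into a module-level TMP_FOLDER, names the MinIO container ci_test_minio so a stale instance can be stopped first, and publishes it on 127.0.0.1:9000. A minimal sketch, not part of the PR, of how a test could confirm the container is reachable, assuming the endpoint and test credentials from integration_tests/config_minio.json:

```python
# Sketch only: verify the MinIO container started by minio_setup() is reachable.
# Endpoint and credentials are assumptions taken from config_minio.json.
import boto3


def minio_is_ready(endpoint: str = "http://127.0.0.1:9000") -> bool:
    client = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id="123456",
        aws_secret_access_key="123456key",
    )
    try:
        client.list_buckets()  # any authenticated call proves the server is up
        return True
    except Exception:
        return False
```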
@@ -0,0 +1,58 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
import os
from pathlib import Path
from typing import Mapping

import pytest
from airbyte_cdk import AirbyteLogger
from source_s3.source import SourceS3
from unit_tests.abstract_test_parser import memory_limit
from unit_tests.test_csv_parser import generate_big_file

from .acceptance import TMP_FOLDER

HERE = Path(__file__).resolve().parent


@pytest.fixture(scope="module")
def credentials() -> Mapping:
filename = HERE / "config_minio.json"
with open(filename) as f:
return json.load(f)


class TestIntegrationCsvFiles:
logger = AirbyteLogger()

@memory_limit(20) # max used memory should be less than 20Mb
def read_source(self, credentials, catalog):
read_count = 0
for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog):
if msg.record:
read_count += 1
return read_count

@pytest.mark.order(1)
def test_big_file(self, credentials):
"""tests a big csv file (>= 1.0G records)"""
# generates a big CSV files separately
big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files")
os.makedirs(big_file_folder)
filepath = os.path.join(big_file_folder, "file.csv")

# please change this value if you need to test another file size
future_file_size = 1.0 # in gigabytes
_, file_size = generate_big_file(filepath, future_file_size, 500)
expected_count = sum(1 for _ in open(filepath)) - 1
self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}")

credentials["path_pattern"] = "big_files/*.csv"
credentials["format"]["block_size"] = 5 * 1024 ** 2
source = SourceS3()
catalog = source.read_catalog(HERE / "configured_catalog.json")

assert self.read_source(credentials, catalog) == expected_count
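
memory_limit and generate_big_file are imported from the unit-test helpers and their implementations are not part of this hunk. A hedged sketch of what a tracemalloc-based memory_limit decorator could look like (an assumption suggested by the tracemalloc commit in this PR, not the actual helper):

```python
# Hypothetical sketch of a memory_limit decorator; the real helper lives in
# unit_tests/abstract_test_parser.py and may differ.
import functools
import tracemalloc


def memory_limit(max_memory_in_mb: int):
    """Fail the wrapped test if its peak traced allocation exceeds the limit."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            tracemalloc.start()
            try:
                result = func(*args, **kwargs)
            finally:
                _, peak = tracemalloc.get_traced_memory()  # peak is in bytes
                tracemalloc.stop()
            assert peak / 1024 ** 2 < max_memory_in_mb, (
                f"peak memory {peak / 1024 ** 2:.1f} MB exceeded the {max_memory_in_mb} MB limit"
            )
            return result
        return wrapper
    return decorator
```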
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://localhost:9000"
"endpoint": "http://127.0.0.1:9000"
},
"format": {
"filetype": "csv"
@@ -0,0 +1,12 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import pytest

from .acceptance import minio_setup


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
yield from minio_setup()
@@ -121,25 +121,23 @@ def _stream_records_test_logic(

records = []
for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state):
if stream_slice is not None:
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
for file_dict in stream_slice:
# TODO: if we ever test other filetypes in these tests this will need fixing
file_reader = CsvParser(format)
with file_dict["storagefile"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe

for record in fs.read_records(sync_mode, stream_slice=stream_slice):
# check actual record values match expected schema
assert all(
[
isinstance(record[col], JSONTYPE_TO_PYTHONTYPE[typ]) or record[col] is None
for col, typ in full_expected_schema.items()
]
)
records.append(record)
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
# TODO: if we ever test other filetypes in these tests this will need fixing
file_reader = CsvParser(format)
with stream_slice["storage_file"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe

for record in fs.read_records(sync_mode, stream_slice=stream_slice):
# check actual record values match expected schema
assert all(
[
isinstance(record[col], JSONTYPE_TO_PYTHONTYPE[typ]) or record[col] is None
for col, typ in full_expected_schema.items()
]
)
records.append(record)

assert all([len(r.keys()) == total_num_columns for r in records])
assert len(records) == num_records
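
The reworked test treats each stream_slice as a single mapping that holds the StorageFile under the "storage_file" key, instead of iterating a list of {"storagefile": ...} dicts per slice. A tiny sketch of the assumed slice shape, inferred from how the test accesses it rather than taken from the connector code:

```python
# Sketch only: the stream_slice shape assumed by the updated test, i.e. a single
# mapping holding one StorageFile under the "storage_file" key.
from typing import Any, Mapping


def make_stream_slice(storage_file: "StorageFile") -> Mapping[str, Any]:  # StorageFile is the connector's class
    return {"storage_file": storage_file}
```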
6 changes: 1 addition & 5 deletions airbyte-integrations/connectors/source-s3/setup.py
@@ -14,11 +14,7 @@
"pytz",
]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
"pandas==1.3.1",
]
TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "pandas==1.3.1", "psutil", "pytest-order"]

setup(
name="source_s3",
29 changes: 2 additions & 27 deletions airbyte-integrations/connectors/source-s3/source_s3/s3file.py
@@ -4,15 +4,13 @@


from contextlib import contextmanager
from datetime import datetime
from typing import BinaryIO, Iterator, TextIO, Union

import smart_open
from boto3 import session as boto3session
from botocore import UNSIGNED
from botocore.client import Config as ClientConfig
from botocore.config import Config
from botocore.exceptions import NoCredentialsError
from source_s3.s3_utils import make_s3_client, make_s3_resource

from .source_files_abstract.storagefile import StorageFile
@@ -39,28 +37,6 @@ def _setup_boto_session(self):
self._boto_session = boto3session.Session()
self._boto_s3_resource = make_s3_resource(self._provider, config=Config(signature_version=UNSIGNED), session=self._boto_session)

@property
def last_modified(self) -> datetime:
"""
Using decorator set up boto3 session & s3 resource.
Note: slight nuance for grabbing this when we have no credentials.

:return: last_modified property of the blob/file
"""
bucket = self._provider.get("bucket")
try:
obj = self._boto_s3_resource.Object(bucket, self.url)
return obj.last_modified
# For some reason, this standard method above doesn't work for public files with no credentials so fall back on below
except NoCredentialsError as nce:
# we don't expect this error if using credentials so throw it
if self.use_aws_account(self._provider):
raise nce
else:
return make_s3_client(self._provider, config=ClientConfig(signature_version=UNSIGNED)).head_object(
Bucket=bucket, Key=self.url
)["LastModified"]

@staticmethod
def use_aws_account(provider: dict) -> bool:
aws_access_key_id = provider.get("aws_access_key_id")
@@ -77,14 +53,13 @@ def open(self, binary: bool) -> Iterator[Union[TextIO, BinaryIO]]:
"""
mode = "rb" if binary else "r"
bucket = self._provider.get("bucket")

if self.use_aws_account(self._provider):
params = {"client": make_s3_client(self._provider, session=self._boto_session)}
result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
else:
config = ClientConfig(signature_version=UNSIGNED)
params = {"client": make_s3_client(self._provider, config=config)}
result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
self.logger.debug(f"try to open {self.file_info}")
result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)

# see https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager for why we do this
try:
@@ -0,0 +1,48 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os
from dataclasses import dataclass
from datetime import datetime
from functools import total_ordering


@total_ordering
@dataclass
class FileInfo:
"""Class for sharing of metadate"""

key: str
size: int
last_modified: datetime

@property
def size_in_megabytes(self):
return self.size / 1024 ** 2

def __str__(self):
return "Key: %s, LastModified: %s, Size: %.4fMb" % (self.key, self.last_modified.isoformat(), self.size_in_megabytes)

def __repr__(self):
return self.__str__()

@classmethod
def create_by_local_file(cls, filepath: str) -> "FileInfo":
Review comment (Contributor): Looks like we're only using this method in testing... doesn't make sense as a method of the class imo, should rather be a separate private testing method to instantiate an instance of FileInfo if we need it.

Reply (antixar, author): moved to test folders

"Generates a FileInfo instance. This function can be used for tests"
if not os.path.exists(filepath):
return cls(key=filepath, size=0, last_modified=datetime.now())
return cls(key=filepath, size=os.stat(filepath).st_size, last_modified=datetime.fromtimestamp(os.path.getmtime(filepath)))

def __eq__(self, other):
if isinstance(other, str):
return self.key == other
return self.key == other.key

def __lt__(self, other):
if isinstance(other, str):
return self.key < other
return self.key < other.key

def __hash__(self):
return self.key.__hash__()
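
FileInfo compares, orders, and hashes by key alone, and the comparison operators also accept plain strings. A short illustration with hypothetical values; the import path is an assumption based on the relative import used elsewhere in this PR:

```python
# Illustration of FileInfo's comparison semantics with hypothetical values.
from datetime import datetime

from source_s3.source_files_abstract.file_info import FileInfo  # import path assumed

a = FileInfo(key="data/2021-01.csv", size=1024, last_modified=datetime(2021, 1, 31))
b = FileInfo(key="data/2021-02.csv", size=2048, last_modified=datetime(2021, 2, 28))

assert a == "data/2021-01.csv"   # equality against a plain string compares keys
assert a < b                     # ordering is lexicographic on key
assert sorted([b, a])[0] is a    # @total_ordering fills in the remaining operators
assert len({a, FileInfo(key="data/2021-01.csv", size=0, last_modified=datetime.now())}) == 1  # hash uses key only
```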
@@ -8,8 +8,12 @@
import pyarrow as pa
from airbyte_cdk.logger import AirbyteLogger

from ..file_info import FileInfo


class AbstractFileParser(ABC):
logger = AirbyteLogger()

def __init__(self, format: dict, master_schema: dict = None):
"""
:param format: file format specific mapping as described in spec.json
@@ -19,7 +23,6 @@ def __init__(self, format: dict, master_schema: dict = None):
self._master_schema = (
master_schema # this may need to be used differently by some formats, pyarrow allows extra columns in csv schema
)
self.logger = AirbyteLogger()

@property
@abstractmethod
@@ -39,12 +42,13 @@ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
"""

@abstractmethod
def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
"""
Override this with format-specific logic to stream each data row from the file as a mapping of {columns:values}
Note: avoid loading the whole file into memory to avoid OOM breakages

:param file: file-like object (opened via StorageFile)
:param file_info: file metadata
:yield: data record as a mapping of {columns:values}
"""

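
The new file_info parameter threads file metadata through to each parser. A minimal sketch of how a stream might combine StorageFile.open() with the new stream_records signature; the connector's actual call site is not shown in this diff, and the import paths are assumptions:

```python
# Sketch only: wiring StorageFile and a concrete AbstractFileParser (e.g. CsvParser)
# together under the new signature. Import paths are assumed from this PR's diffs.
from typing import Any, Iterator, Mapping

from source_s3.source_files_abstract.formats.abstract_file_parser import AbstractFileParser
from source_s3.source_files_abstract.storagefile import StorageFile


def read_one_file(storage_file: StorageFile, parser: AbstractFileParser) -> Iterator[Mapping[str, Any]]:
    # StorageFile.open() is a context manager; file_info carries key, size and last_modified
    with storage_file.open(parser.is_binary) as f:
        yield from parser.stream_records(f, storage_file.file_info)
```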