
🐛 Source S3: Loading of files' metadata #8252


Merged: 59 commits, merged on Feb 1, 2022
Changes from 33 commits

Commits (59)
3f5fe85
update unit tests
antixar Nov 23, 2021
a4ce4dd
update tests
antixar Nov 25, 2021
00cd291
update version
antixar Nov 25, 2021
75fb824
fix conftest
antixar Nov 25, 2021
5749ce9
correction connection_setup
antixar Nov 25, 2021
a84d5bb
Update airbyte-integrations/connectors/source-s3/source_s3/source_fil…
antixar Dec 8, 2021
7a9d3a0
merge with master
antixar Dec 15, 2021
1aface7
restore slice logic
antixar Dec 16, 2021
fb5daa0
Merge remote-tracking branch 'origin' into antixar/6870-source-s3-csv…
antixar Dec 20, 2021
c221569
test correction
antixar Dec 20, 2021
86b8640
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Dec 20, 2021
88b153c
test logs
antixar Dec 20, 2021
1f28602
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Dec 21, 2021
4819e23
correction after review
antixar Dec 21, 2021
ac1f33f
reset failed files
antixar Dec 21, 2021
1bc0c3b
test without custom chunking
antixar Dec 21, 2021
247eb1d
correction after flake8
antixar Dec 21, 2021
4aa028d
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Dec 21, 2021
150f98e
correction of tests
antixar Dec 21, 2021
9f39aa2
merge with master
antixar Jan 10, 2022
05e6641
fix a test bug
antixar Jan 10, 2022
4a26bde
fix flake8 comment
antixar Jan 10, 2022
1e2efaf
fix generation of minio config file
antixar Jan 10, 2022
c9e4d3a
fix generation of minio config file
antixar Jan 10, 2022
29d9dc3
fix generation of minio config file2
antixar Jan 10, 2022
6f07a58
fix acceptance test
antixar Jan 10, 2022
80ff530
fix acceptance test3
antixar Jan 10, 2022
a1108e7
fix acceptance test4
antixar Jan 10, 2022
e20d0cd
fix acceptance test4
antixar Jan 10, 2022
17542b1
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 24, 2022
7779233
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 25, 2022
7f8b106
fix mypy comments
antixar Jan 25, 2022
6d2af0c
fix mypy comments
antixar Jan 25, 2022
a4dc642
improve tracemalloc.take_snapshot
grubberr Jan 26, 2022
c98c46c
fix tmp folder for minio
antixar Jan 27, 2022
9858cc0
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 27, 2022
bc1dae9
fix flake8 comments
antixar Jan 27, 2022
7873cd6
fix unit tests
antixar Jan 27, 2022
4a08a2b
bump version
antixar Jan 27, 2022
df836a9
raise test timeouts
antixar Jan 28, 2022
7782cba
debug minio tests
antixar Jan 28, 2022
e4b0f3d
Merge remote-tracking branch 'origin/master' into antixar/6870-source…
antixar Jan 31, 2022
89e80e3
debug test issues
antixar Jan 31, 2022
2a42bf2
corrected by isort
antixar Jan 31, 2022
6a7b01d
clean temp test buckets
antixar Jan 31, 2022
9e6d5b0
test locale issue
antixar Feb 1, 2022
c321fe4
move new tests to single test file
antixar Feb 1, 2022
abb6044
correction after flake8
antixar Feb 1, 2022
a07ee66
debug reports
antixar Feb 1, 2022
a4cc4c8
test coverage logic
antixar Feb 1, 2022
ababf7d
test coverage logic
antixar Feb 1, 2022
c93f1d3
test coverage logic
antixar Feb 1, 2022
5b48a94
test coverage logic
antixar Feb 1, 2022
8c933d6
test coverage logic
antixar Feb 1, 2022
c7f227f
test failed unittests
antixar Feb 1, 2022
2efb604
test failed integration tests
antixar Feb 1, 2022
f64d8e3
test failed SAT
antixar Feb 1, 2022
e2ecb0c
successful tests
antixar Feb 1, 2022
b753296
update spec file
antixar Feb 1, 2022
@@ -3,36 +3,13 @@
#


import shutil
import tempfile
from zipfile import ZipFile

import docker
import pytest
from typing import Iterable

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
def connector_setup() -> Iterable[None]:
"""This fixture is a placeholder for external resources that acceptance test might require."""
yield


@pytest.fixture(scope="session", autouse=True)
def minio_setup():
client = docker.from_env()
tmp_dir = tempfile.mkdtemp()
with ZipFile("./integration_tests/minio_data.zip") as archive:
archive.extractall(tmp_dir)

container = client.containers.run(
"minio/minio",
f"server {tmp_dir}/minio_data",
network_mode="host",
volumes=["/tmp:/tmp", "/var/run/docker.sock:/var/run/docker.sock"],
detach=True,
)
yield
shutil.rmtree(tmp_dir)
container.stop()
@@ -0,0 +1,50 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import os
from pathlib import Path
from typing import Any, Dict
import pytest
from airbyte_cdk import AirbyteLogger
from source_s3.source import SourceS3
from unit_tests.abstract_test_parser import memory_limit
from unit_tests.test_csv_parser import generate_big_file

from .conftest import TMP_FOLDER

HERE = Path(__file__).resolve().parent


class TestIntegrationCsvFiles:
logger = AirbyteLogger()

@memory_limit(150) # max used memory should be less than 150Mb
def read_source(self, credentials: Dict[str, Any], catalog: Dict[str, Any]) -> int:
read_count = 0
for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog):
if msg.record:
read_count += 1
return read_count

@pytest.mark.order(1)
def test_big_file(self, minio_credentials: Dict[str, Any]) -> None:
"""tests a big csv file (>= 1.0G records)"""
# generates a big CSV files separately
big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files")
os.makedirs(big_file_folder)
filepath = os.path.join(big_file_folder, "file.csv")

# please change this value if you need to test another file size
future_file_size = 0.5 # in gigabytes
_, file_size = generate_big_file(filepath, future_file_size, 500)
expected_count = sum(1 for _ in open(filepath)) - 1
self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}")

minio_credentials["path_pattern"] = "big_files/*.csv"
minio_credentials["format"]["block_size"] = 5 * 1024 ** 2
source = SourceS3()
catalog = source.read_catalog(HERE / "configured_catalog.json")

assert self.read_source(minio_credentials, catalog) == expected_count
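Note: the @memory_limit(150) guard applied to read_source above is imported from unit_tests.abstract_test_parser, which is not part of this diff. A minimal sketch of what such a decorator could look like, assuming a tracemalloc-based implementation (the real helper may differ):

```python
import functools
import tracemalloc
from typing import Any, Callable


def memory_limit(max_memory_mb: int) -> Callable:
    """Fail the wrapped callable if its peak traced memory exceeds max_memory_mb (illustrative sketch)."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracemalloc.start()
            try:
                result = func(*args, **kwargs)
                _, peak = tracemalloc.get_traced_memory()  # (current, peak) in bytes
            finally:
                tracemalloc.stop()
            peak_mb = peak / 1024 ** 2
            assert peak_mb < max_memory_mb, f"peak memory usage {peak_mb:.1f} MB exceeded the {max_memory_mb} MB limit"
            return result

        return wrapper

    return decorator
```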
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://localhost:9000"
"endpoint": "http://<local_ip>:9000"
},
"format": {
"filetype": "csv"
@@ -0,0 +1,88 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
import tempfile
import time
from pathlib import Path
from typing import Mapping, Any, Iterable, List
from zipfile import ZipFile

import docker
import pytest
import requests # type: ignore[import]
from airbyte_cdk import AirbyteLogger
from docker.errors import APIError
from netifaces import AF_INET, ifaddresses, interfaces
from requests.exceptions import ConnectionError # type: ignore[import]

logger = AirbyteLogger()
TMP_FOLDER = tempfile.mkdtemp()


def get_local_ip() -> str:
all_interface_ips: List[str] = []
for iface_name in interfaces():
all_interface_ips += [i["addr"] for i in ifaddresses(iface_name).setdefault(AF_INET, [{"addr": None}]) if i["addr"]]
logger.info(f"detected interface IPs: {all_interface_ips}")
for ip in sorted(all_interface_ips):
if not ip.startswith("127."):
return ip

assert False, "not found an non-localhost interface"


@pytest.fixture(scope="session")
def minio_credentials() -> Mapping[str, Any]:
config_template = Path(__file__).parent / "config_minio.template.json"
assert config_template.is_file(), f"not found {config_template}"
config_file = Path(__file__).parent / "config_minio.json"
config_file.write_text(config_template.read_text().replace("<local_ip>", get_local_ip()))
credentials = {}
with open(str(config_file)) as f:
credentials = json.load(f)
return credentials


@pytest.fixture(scope="session", autouse=True)
def minio_setup(minio_credentials: Mapping[str, Any]) -> Iterable[None]:
with ZipFile("./integration_tests/minio_data.zip") as archive:
archive.extractall(TMP_FOLDER)
client = docker.from_env()
# Minio should be attached to a non-localhost interface,
# because other test containers need a direct connection to it.
local_ip = get_local_ip()
logger.debug(f"minio settings: {minio_credentials}")
try:
container = client.containers.run(
image="minio/minio:RELEASE.2021-10-06T23-36-31Z",
command=f"server {TMP_FOLDER}",
name="ci_test_minio",
auto_remove=True,
volumes=[f"/{TMP_FOLDER}/minio_data:/{TMP_FOLDER}"],
detach=True,
ports={"9000/tcp": (local_ip, 9000)},
)
except APIError as err:
if err.status_code == 409:
for container in client.containers.list():
if container.name == "ci_test_minio":
logger.info("minio was started before")
break
else:
raise

check_url = f"http://{local_ip}:9000/minio/health/live"
while True:
try:
data = requests.get(check_url)
except ConnectionError as err:
logger.warn(f"minio error: {err}")
time.sleep(0.5)
continue
if data.status_code == 200:
break
logger.info("Run a minio/minio container...")
yield
# the minio container is not stopped here because it is needed for all integration and acceptance tests
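A hypothetical usage sketch (not part of this PR) of how a test could talk to the MinIO container started by minio_setup, using the credentials produced by minio_credentials. It assumes the credentials block sits under a "provider" key, as config_minio.template.json and the connector spec suggest, and uses the test-bucket shipped in minio_data.zip:

```python
from typing import Any, List, Mapping

import boto3  # assumed to be available in the test environment


def list_test_bucket(minio_credentials: Mapping[str, Any]) -> List[str]:
    """List object keys in the MinIO-backed test bucket (illustrative only)."""
    provider = minio_credentials["provider"]
    client = boto3.client(
        "s3",
        endpoint_url=provider["endpoint"],  # e.g. http://<local_ip>:9000
        aws_access_key_id=provider["aws_access_key_id"],
        aws_secret_access_key=provider["aws_secret_access_key"],
    )
    response = client.list_objects_v2(Bucket="test-bucket")
    return [obj["Key"] for obj in response.get("Contents", [])]
```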
@@ -35,7 +35,7 @@ def credentials(self) -> Mapping:
def provider(self, bucket_name: str) -> Mapping:
return {"storage": "S3", "bucket": bucket_name}

def _s3_connect(self, credentials: Mapping):
def _s3_connect(self, credentials: Mapping) -> None:
region = "eu-west-3"
self.s3_client = boto3.client(
"s3",
@@ -85,7 +85,7 @@ def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upl
self.s3_client.upload_file(str(filepath), bucket_name, upload_path, ExtraArgs=extra_args)
yield f"{bucket_name}/{upload_path}"

def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping):
def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping) -> None:
self._s3_connect(credentials)
bucket = self.s3_resource.Bucket(cloud_bucket_name)
bucket.objects.all().delete()
@@ -6,7 +6,7 @@
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator, List, Mapping
from typing import Iterator, List, Mapping, Any
from uuid import uuid4

import pytest
@@ -29,7 +29,7 @@ def cloud_bucket_prefix(self) -> str:
return "airbytetest-"

@pytest.fixture(scope="session")
def format(self) -> str:
def format(self) -> Mapping[str, Any]:
return {"filetype": "csv"}

@pytest.fixture(scope="session")
@@ -71,7 +71,7 @@ def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upl
"""

@abstractmethod
def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping):
def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping) -> None:
"""
Provider-specific logic to tidy up any cloud resources.
See S3 for example.
@@ -82,20 +82,20 @@ def teardown_infra(self, cloud_bucket_name: str, credentials: Mapping):

def _stream_records_test_logic(
self,
cloud_bucket_name,
format,
airbyte_system_columns,
sync_mode,
files,
path_pattern,
private,
num_columns,
num_records,
expected_schema,
user_schema,
fails,
state=None,
):
cloud_bucket_name: str,
format: Mapping[str, str],
airbyte_system_columns: Mapping[str, str],
sync_mode: Any,
files: List[str],
path_pattern: str,
private: bool,
num_columns: Any,
num_records: Any,
expected_schema: Mapping[str, Any],
user_schema: Mapping[str, Any],
fails: Any,
state: Any = None,
) -> Any:
uploaded_files = [fpath for fpath in self.cloud_files(cloud_bucket_name, self.credentials, files, private)]
LOGGER.info(f"file(s) uploaded: {uploaded_files}")

@@ -112,7 +112,6 @@ def _stream_records_test_logic(
provider = {**self.provider(cloud_bucket_name), **self.credentials} if private else self.provider(cloud_bucket_name)

if not fails:
print(str_user_schema)
fs = self.stream_class("dataset", provider, format, path_pattern, str_user_schema)
LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}")

@@ -124,10 +123,10 @@ def _stream_records_test_logic(
if stream_slice is not None:
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
for file_dict in stream_slice:
for file_dict in stream_slice["files"]:
# TODO: if we ever test other filetypes in these tests this will need fixing
file_reader = CsvParser(format)
with file_dict["storagefile"].open(file_reader.is_binary) as f:
with file_dict["storage_file"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe

@@ -167,7 +166,8 @@ def _stream_records_test_logic(

LOGGER.info(f"Failed as expected, error: {e_info}")

@pytest.mark.parametrize( # make user_schema None to test auto-inference. Exclude any _airbyte system columns in expected_schema.
@pytest.mark.parametrize(
# make user_schema None to test auto-inference. Exclude any _airbyte system columns in expected_schema.
"files, path_pattern, private, num_columns, num_records, expected_schema, user_schema, incremental, fails",
[
# single file tests
@@ -477,19 +477,19 @@
)
def test_stream_records(
self,
cloud_bucket_prefix,
format,
airbyte_system_columns,
files,
path_pattern,
private,
num_columns,
num_records,
expected_schema,
user_schema,
incremental,
fails,
):
cloud_bucket_prefix: str,
format: Mapping[str, Any],
airbyte_system_columns: Mapping[str, str],
files: List[str],
path_pattern: str,
private: bool,
num_columns: List[int],
num_records: List[int],
expected_schema: Mapping[str, Any],
user_schema: Mapping[str, Any],
incremental: bool,
fails: List[bool],
) -> None:
try:
if not incremental: # we expect matching behaviour here in either sync_mode
for sync_mode in [
@@ -99,7 +99,9 @@
"title": "Advanced Options",
"description": "Optionally add a valid JSON string here to provide additional <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions\" target=\"_blank\">Pyarrow ReadOptions</a>. Specify 'column_names' here if your CSV doesn't have header, or if you want to use custom column names. 'block_size' and 'encoding' are already used above, specify them again here will override the values above.",
"default": "{}",
"examples": ["{\"column_names\": [\"column1\", \"column2\"]}"],
"examples": [
"{\"column_names\": [\"column1\", \"column2\"]}"
],
"type": "string"
},
"infer_datatypes": {
@@ -110,7 +112,6 @@
}
}
},

{
"title": "parquet",
"description": "This connector utilises <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for Parquet parsing.",
@@ -131,11 +132,13 @@
"title": "Columns",
"description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here. Leave it empty to sync all columns.",
"type": "array",
"items": { "type": "string" }
"items": {
"type": "string"
}
},
"batch_size": {
"title": "Batch Size",
"description": "Maximum number of records per batch. Batches may be smaller if there aren\u2019t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
"description": "Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
"default": 65536,
"type": "integer"
}
@@ -187,11 +190,21 @@
"type": "boolean"
}
},
"required": ["bucket"]
"required": [
"bucket"
]
}
},
"required": ["dataset", "path_pattern", "provider"]
"required": [
"dataset",
"path_pattern",
"provider"
]
},
"supportsIncremental": true,
"supported_destination_sync_modes": ["overwrite", "append", "append_dedup"]
}
"supported_destination_sync_modes": [
"overwrite",
"append",
"append_dedup"
]
}
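To make the spec's required fields concrete, here is a minimal, hypothetical config sketched as a Python dict — only dataset, path_pattern, and the provider's bucket are required by the spec above; all values are placeholders:

```python
# Minimal illustrative config for the S3 source spec above; values are placeholders.
minimal_config = {
    "dataset": "my_dataset",        # name used for the output stream
    "path_pattern": "**/*.csv",     # glob pattern selecting files in the bucket
    "format": {"filetype": "csv"},  # optional; csv is one of the supported filetypes
    "provider": {
        "bucket": "my-bucket",      # the only required provider field per the spec
    },
}
```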