🎉 Source Amazon S3: increase unit test coverage to at least 90% #11967

Merged · 7 commits · May 23, 2022
@@ -779,7 +779,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
- dockerImageTag: 0.1.13
+ dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
icon: s3.svg
sourceType: file
@@ -7158,7 +7158,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- - dockerImage: "airbyte/source-s3:0.1.13"
+ - dockerImage: "airbyte/source-s3:0.1.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

- LABEL io.airbyte.version=0.1.13
+ LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/source-s3
@@ -27,7 +27,7 @@ def _setup_boto_session(self) -> None:
Currently grabbing last_modified across multiple files asynchronously and may implement more multi-threading in future.
See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html (anchor link broken, scroll to bottom)
"""
- if self.use_aws_account:
+ if self.use_aws_account(self._provider):
self._boto_session = boto3session.Session(
aws_access_key_id=self._provider.get("aws_access_key_id"),
aws_secret_access_key=self._provider.get("aws_secret_access_key"),
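
The one-line fix above addresses a truthiness bug: without parentheses, self.use_aws_account evaluates to the method object itself, which is always truthy, so the AWS-credentials branch ran even when no account was configured. A minimal sketch of the difference (illustrative only, with an assumed simplification of the credentials check):

class Example:
    @staticmethod
    def use_aws_account(provider: dict) -> bool:
        # assumed simplification: the real check inspects the provider's credentials
        return bool(provider.get("aws_access_key_id"))

example = Example()
assert example.use_aws_account  # the method object is always truthy: the old bug
assert example.use_aws_account({"aws_access_key_id": None}) is False  # calling it evaluates the input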
@@ -109,7 +109,7 @@ def fileformatparser_class(self) -> type:
:return: reference to the relevant fileformatparser class e.g. CsvParser
"""
filetype = self._format.get("filetype")
- file_reader = self.fileformatparser_map.get(self._format.get("filetype"))
+ file_reader = self.fileformatparser_map.get(filetype)
if not file_reader:
raise RuntimeError(
f"Detected mismatched file format '{filetype}'. Available values: '{list(self.fileformatparser_map.keys())}''."
@@ -221,23 +221,24 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
# this compares datatype of every column that the two schemas have in common
for col in column_superset:
if (col in master_schema.keys()) and (col in this_schema.keys()) and (master_schema[col] != this_schema[col]):
- # if this column exists in a provided schema or schema state, we'll WARN here rather than throw an error
- # this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to provided schema state
- # if not, then the read will error anyway
+ # If this column exists in a provided schema or schema state, we'll WARN here rather than throw an error
+ # this is to allow more leniency as we may be able to coerce this datatype mismatch on read according to
+ # provided schema state. If not, then the read will error anyway
if col in self._schema.keys():
LOGGER.warn(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. "
+ f"Airbyte will attempt to coerce this to {master_schema[col]} on read."
)
- # else we're inferring the schema (or at least this column) from scratch and therefore throw an error on mismatching datatypes
+ # else we're inferring the schema (or at least this column) from scratch and therefore
+ # throw an error on mismatching datatypes
else:
raise RuntimeError(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
)

- # missing columns in this_schema doesn't affect our master_schema so we don't check for it here
+ # missing columns in this_schema doesn't affect our master_schema, so we don't check for it here

# add to master_schema any columns from this_schema that aren't already present
for col, datatype in this_schema.items():
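
The reworked comments spell out the merge rule: a datatype mismatch between files is tolerated only when the user provided a schema that the read step can try to coerce toward; during pure inference it is fatal. A condensed sketch of that rule (hypothetical helper, not the PR's code):

from typing import Dict

def merge_master_schema(master: Dict[str, str], this: Dict[str, str], provided: Dict[str, str]) -> Dict[str, str]:
    # columns present in both schemas must agree on datatype
    for col in set(master) & set(this):
        if master[col] != this[col]:
            if col in provided:
                # a user-supplied schema exists: warn and let the read attempt coercion
                print(f"WARN: will coerce '{col}' from {this[col]} to {master[col]} on read")
            else:
                # schema is inferred from scratch, so a mismatch is fatal
                raise RuntimeError(f"mismatched datatype on column '{col}'")
    # columns missing from this file don't shrink the master schema; new columns extend it
    for col, datatype in this.items():
        master.setdefault(col, datatype)
    return master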
55 changes: 54 additions & 1 deletion airbyte-integrations/connectors/source-s3/unit_tests/conftest.py
@@ -2,10 +2,21 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
import os
import shutil
import tempfile
- from typing import Any
+ from pathlib import Path
+ from typing import Any, List, Mapping

import requests # noqa
from airbyte_cdk import AirbyteLogger
from netifaces import AF_INET, ifaddresses, interfaces
from pytest import fixture
from requests.exceptions import ConnectionError # noqa
from source_s3 import SourceS3

logger = AirbyteLogger()

TMP_FOLDER = os.path.join(tempfile.gettempdir(), "test_generated")

@@ -22,3 +33,45 @@ def pytest_generate_tests(metafunc: Any) -> None:
def pytest_sessionfinish(session: Any, exitstatus: Any) -> None:
"""whole test run finishes."""
shutil.rmtree(TMP_FOLDER, ignore_errors=True)


@fixture(name="config")
def config_fixture(tmp_path):
config_file = tmp_path / "config.json"
with open(config_file, "w") as fp:
json.dump(
{
"dataset": "dummy",
"provider": {"bucket": "test-test", "endpoint": "test", "use_ssl": "test", "verify_ssl_cert": "test"},
"path_pattern": "",
"format": {"delimiter": "\\t"},
},
fp,
)
source = SourceS3()
config = source.read_config(config_file)
return config


def get_local_ip() -> str:
all_interface_ips: List[str] = []
for iface_name in interfaces():
all_interface_ips += [i["addr"] for i in ifaddresses(iface_name).setdefault(AF_INET, [{"addr": None}]) if i["addr"]]
logger.info(f"detected interface IPs: {all_interface_ips}")
for ip in sorted(all_interface_ips):
if not ip.startswith("127."):
return ip

assert False, "no non-localhost interface found"


@fixture(scope="session")
def minio_credentials() -> Mapping[str, Any]:
config_template = Path(__file__).parent / "config_minio.template.json"
assert config_template.is_file(), f"not found {config_template}"
config_file = Path(__file__).parent / "config_minio.json"
config_file.write_text(config_template.read_text().replace("<local_ip>", get_local_ip()))

with open(str(config_file)) as f:
credentials = json.load(f)
return credentials
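
A test consuming this session-scoped fixture might look like the sketch below (hypothetical test; the provider/endpoint key layout mirrors the config fixture above and is an assumption about config_minio.template.json):

def test_minio_endpoint_is_host_reachable(minio_credentials):
    endpoint = minio_credentials["provider"]["endpoint"]  # assumed key layout
    assert endpoint and "127." not in endpoint  # get_local_ip() skips loopback addresses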
@@ -13,6 +13,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from source_s3.source_files_abstract.formats.parquet_parser import PARQUET_TYPES, ParquetParser

from .abstract_test_parser import AbstractTestParser
@@ -239,3 +240,11 @@ def cases(cls) -> Mapping[str, Any]:
"fails": [],
}
return cases

def test_parse_field_type(self):
with pytest.raises(TypeError):
assert ParquetParser.parse_field_type(needed_logical_type="", need_physical_type="")

def test_convert_field_data(self):
with pytest.raises(TypeError):
ParquetParser.convert_field_data(logical_type="", field_value="")
@@ -4,8 +4,10 @@


from typing import Mapping
from unittest.mock import MagicMock

import pytest
import smart_open
from airbyte_cdk import AirbyteLogger
from source_s3.s3file import S3File

@@ -30,3 +32,15 @@ class TestS3File:
)
def test_use_aws_account(self, provider: Mapping[str, str], return_true: bool) -> None:
assert S3File.use_aws_account(provider) is return_true

@pytest.mark.parametrize( # passing in full provider to emulate real usage (dummy values are unused by func)
"provider",
[
({"storage": "S3", "bucket": "dummy", "aws_access_key_id": "id", "aws_secret_access_key": "key", "path_prefix": "dummy"}),
({"storage": "S3", "bucket": "dummy", "aws_access_key_id": None, "aws_secret_access_key": None, "path_prefix": "dummy"}),
],
)
def test_s3_file_contextmanager(self, provider):
smart_open.open = MagicMock()
with S3File(file_info=MagicMock(), provider=provider).open("rb") as s3_file:
assert s3_file
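
Note that assigning a MagicMock to smart_open.open patches the module globally and leaks into any test that runs afterwards. A variant of the same test (a sketch, reusing the provider parametrization above and assuming S3File resolves smart_open.open at call time and that unittest.mock.patch is imported) reverts the patch automatically:

def test_s3_file_contextmanager_patched(self, provider):
    # patch() restores smart_open.open when the block exits
    with patch("smart_open.open", MagicMock()) as mocked_open:
        with S3File(file_info=MagicMock(), provider=provider).open("rb") as s3_file:
            assert s3_file
        mocked_open.assert_called_once()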
@@ -3,8 +3,15 @@
#

import json
from unittest.mock import MagicMock, patch

import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConnectorSpecification
from source_s3 import SourceS3
from source_s3.source_files_abstract.spec import SourceFilesAbstractSpec

logger = AirbyteLogger()


def test_transform_backslash_t_to_tab(tmp_path):
@@ -14,3 +21,47 @@ def test_transform_backslash_t_to_tab(tmp_path):
source = SourceS3()
config = source.read_config(config_file)
assert config["format"]["delimiter"] == "\t"


def test_check_connection_empty_config():
config = {}
ok, error_msg = SourceS3().check_connection(logger, config=config)

assert not ok
assert error_msg


def test_check_connection_exception(config):
ok, error_msg = SourceS3().check_connection(logger, config=config)

assert not ok
assert error_msg


def test_check_connection(config):
instance = SourceS3()
with patch.object(instance.stream_class, "filepath_iterator", MagicMock()):
ok, error_msg = instance.check_connection(logger, config=config)

assert ok
assert not error_msg


def test_streams(config):
instance = SourceS3()
assert len(instance.streams(config)) == 1


def test_spec():
spec = SourceS3().spec()

assert isinstance(spec, ConnectorSpecification)


def test_check_provider_added():
with pytest.raises(Exception):
SourceFilesAbstractSpec.check_provider_added({"properties": []})


def test_change_format_to_oneOf():
assert SourceFilesAbstractSpec.change_format_to_oneOf({"properties": {"format": {"oneOf": ""}}})