Skip to content

SAT: compatibility tests for catalogs #15486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.62
Backward compatibility tests: add syntactic validation of catalogs [#15486](https://github.com/airbytehq/airbyte/pull/15486/)

## 0.1.61
Add unit tests coverage computation [#15443](https://github.com/airbytehq/airbyte/pull/15443/).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.61
LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class Status(Enum):
class DiscoveryTestConfig(BaseConfig):
config_path: str = config_path
timeout_seconds: int = timeout_seconds
backward_compatibility_tests_config: BackwardCompatibilityTestsConfig = Field(
description="Configuration for the backward compatibility tests.", default=BackwardCompatibilityTestsConfig()
)


class ExpectedRecordsConfig(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ def cached_schemas_fixture() -> MutableMapping[str, AirbyteStream]:
return {}


@pytest.fixture(name="previous_cached_schemas", scope="session")
def previous_cached_schemas_fixture() -> MutableMapping[str, AirbyteStream]:
"""Simple cache for discovered catalog of previous connector: stream_name -> json_schema"""
return {}


@pytest.fixture(name="discovered_catalog")
def discovered_catalog_fixture(connector_config, docker_runner: ConnectorRunner, cached_schemas) -> MutableMapping[str, AirbyteStream]:
"""JSON schemas for each stream"""
Expand All @@ -190,6 +196,19 @@ def discovered_catalog_fixture(connector_config, docker_runner: ConnectorRunner,
return cached_schemas


@pytest.fixture(name="previous_discovered_catalog")
def previous_discovered_catalog_fixture(
connector_config, previous_connector_docker_runner: ConnectorRunner, previous_cached_schemas
) -> MutableMapping[str, AirbyteStream]:
"""JSON schemas for each stream"""
if not previous_cached_schemas:
output = previous_connector_docker_runner.call_discover(config=connector_config)
catalogs = [message.catalog for message in output if message.type == Type.CATALOG]
for stream in catalogs[-1].streams:
previous_cached_schemas[stream.name] = stream
return previous_cached_schemas


@pytest.fixture
def detailed_logger() -> Logger:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, SpecTestConfig
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, make_hashable, verify_records_schema
from source_acceptance_test.utils.backward_compatibility import SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.backward_compatibility import CatalogDiffChecker, SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.common import find_all_values_for_key_in_schema, find_keyword_schema
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure

Expand Down Expand Up @@ -192,7 +192,7 @@ def test_backward_compatibility(
assert isinstance(actual_connector_spec, ConnectorSpecification) and isinstance(previous_connector_spec, ConnectorSpecification)
spec_diff = self.compute_spec_diff(actual_connector_spec, previous_connector_spec)
checker = SpecDiffChecker(spec_diff)
checker.assert_spec_is_backward_compatible()
checker.assert_is_backward_compatible()
validate_previous_configs(previous_connector_spec, actual_connector_spec, number_of_configs_to_generate)

def test_additional_properties_is_true(self, actual_connector_spec: ConnectorSpecification):
Expand Down Expand Up @@ -235,6 +235,29 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn

@pytest.mark.default_timeout(30)
class TestDiscovery(BaseTest):
@staticmethod
def compute_discovered_catalog_diff(
discovered_catalog: MutableMapping[str, AirbyteStream], previous_discovered_catalog: MutableMapping[str, AirbyteStream]
):
return DeepDiff(
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in previous_discovered_catalog.items()},
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in discovered_catalog.items()},
view="tree",
ignore_order=True,
)

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(self, inputs: SpecTestConfig, previous_connector_docker_runner: ConnectorRunner) -> bool:
if previous_connector_docker_runner is None:
pytest.skip("The previous connector image could not be retrieved.")

# Get the real connector version in case 'latest' is used in the config:
previous_connector_version = previous_connector_docker_runner._image.labels.get("io.airbyte.version")

if previous_connector_version == inputs.backward_compatibility_tests_config.disable_for_version:
pytest.skip(f"Backward compatibility tests are disabled for version {previous_connector_version}.")
return False

def test_discover(self, connector_config, docker_runner: ConnectorRunner):
"""Verify that discover produce correct schema."""
output = docker_runner.call_discover(config=connector_config)
Expand Down Expand Up @@ -307,6 +330,23 @@ def test_additional_properties_is_true(self, discovered_catalog: Mapping[str, An
[additional_properties_value is True for additional_properties_value in additional_properties_values]
), "When set, additionalProperties field value must be true for backward compatibility."

@pytest.mark.default_timeout(60)
@pytest.mark.backward_compatibility
def test_backward_compatibility(
self,
skip_backward_compatibility_tests: bool,
discovered_catalog: MutableMapping[str, AirbyteStream],
previous_discovered_catalog: MutableMapping[str, AirbyteStream],
):
"""Check if the current spec is backward_compatible:
1. Perform multiple hardcoded syntactic checks with SpecDiffChecker.
2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs.
"""
assert isinstance(discovered_catalog, MutableMapping) and isinstance(previous_discovered_catalog, MutableMapping)
catalog_diff = self.compute_discovered_catalog_diff(discovered_catalog, previous_discovered_catalog)
checker = CatalogDiffChecker(catalog_diff)
checker.assert_is_backward_compatible()


def primary_keys_for_records(streams, records):
streams_with_primary_key = [stream for stream in streams if stream.stream.source_defined_primary_key]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from multiprocessing import context

import jsonschema
from airbyte_cdk.models import ConnectorSpecification
from deepdiff import DeepDiff
Expand All @@ -10,44 +13,25 @@
from source_acceptance_test.utils import SecretDict


class NonBackwardCompatibleSpecError(Exception):
class NonBackwardCompatibleError(Exception):
pass


class SpecDiffChecker:
"""A class to perform multiple backward compatible checks on a spec diff"""

class BaseDiffChecker(ABC):
def __init__(self, diff: DeepDiff) -> None:
self._diff = diff

def assert_spec_is_backward_compatible(self):
self.check_if_declared_new_required_field()
self.check_if_added_a_new_required_property()
self.check_if_value_of_type_field_changed()
# self.check_if_new_type_was_added() We want to allow type expansion atm
self.check_if_type_of_type_field_changed()
self.check_if_field_was_made_not_nullable()
self.check_if_enum_was_narrowed()
self.check_if_declared_new_enum_field()

def _raise_error(self, message: str):
raise NonBackwardCompatibleSpecError(f"{message}: {self._diff.pretty()}")
raise NonBackwardCompatibleError(f"{context} - {message}: {self._diff.pretty()}")

def check_if_declared_new_required_field(self):
"""Check if the new spec declared a 'required' field."""
added_required_fields = [
addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
]
if added_required_fields:
self._raise_error("The current spec declared a new 'required' field")
@property
@abstractmethod
def context(self):
pass

def check_if_added_a_new_required_property(self):
"""Check if the new spec added a property to the 'required' list."""
added_required_properties = [
addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
]
if added_required_properties:
self._raise_error("A new property was added to 'required'")
@abstractmethod
def assert_is_backward_compatible(self):
pass

def check_if_value_of_type_field_changed(self):
"""Check if a type was changed"""
Expand All @@ -59,7 +43,7 @@ def check_if_value_of_type_field_changed(self):
change for change in self._diff.get("values_changed", []) if change.path(output_format="list")[-2] == "type"
]
if type_values_changed or type_values_changed_in_list:
self._raise_error("The current spec changed the value of a 'type' field")
self._raise_error("The value of a 'type' field was changed.")

def check_if_new_type_was_added(self):
"""Detect type value added to type list if new type value is not None (e.g ["str"] -> ["str", "int"])"""
Expand All @@ -70,7 +54,7 @@ def check_if_new_type_was_added(self):
if change.t2 != "null"
]
if new_values_in_type_list:
self._raise_error("The current spec changed the value of a 'type' field")
self._raise_error("A new value was added to a 'type' field.")

def check_if_type_of_type_field_changed(self):
"""
Expand All @@ -90,29 +74,61 @@ def check_if_type_of_type_field_changed(self):
# This might be something already guaranteed by JSON schema validation.
if isinstance(change.t1, str):
if not isinstance(change.t2, list):
self._raise_error("The current spec change a type field from string to an invalid value.")
self._raise_error("The current {context} change a type field from string to an invalid value.")
if not 0 < len(change.t2) <= 2:
self._raise_error(
"The current spec change a type field from string to an invalid value. The type list length should not be empty and have a maximum of two items."
"A type field changed from string to an invalid value. The type list should not be empty and have a maximum of two items."
)
# If the new type field is a list we want to make sure it only has the original type (t1) and null: e.g. "str" -> ["str", "null"]
# We want to raise an error otherwise.
t2_not_null_types = [_type for _type in change.t2 if _type != "null"]
if not (len(t2_not_null_types) == 1 and t2_not_null_types[0] == change.t1):
self._raise_error("The current spec change a type field to a list with multiple invalid values.")
self._raise_error("The type field changed to a list with multiple invalid values.")
if isinstance(change.t1, list):
if not isinstance(change.t2, str):
self._raise_error("The current spec change a type field from list to an invalid value.")
self._raise_error("The type field changed from a list to an invalid value.")
if not (len(change.t1) == 1 and change.t2 == change.t1[0]):
self._raise_error("The current spec narrowed a field type.")
self._raise_error("An element was removed from the list of valid types.")


class SpecDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a connector specification diff"""

context = "Specification"

def assert_is_backward_compatible(self):
self.check_if_declared_new_required_field()
self.check_if_added_a_new_required_property()
self.check_if_value_of_type_field_changed()
# self.check_if_new_type_was_added() We want to allow type expansion atm
self.check_if_type_of_type_field_changed()
self.check_if_field_was_made_not_nullable()
self.check_if_enum_was_narrowed()
self.check_if_declared_new_enum_field()

def check_if_declared_new_required_field(self):
"""Check if the new spec declared a 'required' field."""
added_required_fields = [
addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
]
if added_required_fields:
self._raise_error(f"The current {context} declared a new 'required' field")

def check_if_added_a_new_required_property(self):
"""Check if the new spec added a property to the 'required' list."""
added_required_properties = [
addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
]
if added_required_properties:
self._raise_error("A new property was added to 'required'")

def check_if_field_was_made_not_nullable(self):
"""Detect when field was made not nullable but is still a list: e.g ["string", "null"] -> ["string"]"""
removed_nullable = [
change for change in self._diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"
]
if removed_nullable:
self._raise_error("The current spec narrowed a field type or made a field not nullable.")
self._raise_error("A field type was narrowed or made a field not nullable.")

def check_if_enum_was_narrowed(self):
"""Check if the list of values in a enum was shortened in a spec."""
Expand All @@ -122,7 +138,7 @@ def check_if_enum_was_narrowed(self):
if enum_removal.up.path(output_format="list")[-1] == "enum"
]
if enum_removals:
self._raise_error("The current spec narrowed an enum field.")
self._raise_error("An enum field was narrowed.")

def check_if_declared_new_enum_field(self):
"""Check if an 'enum' field was added to the spec."""
Expand Down Expand Up @@ -150,6 +166,26 @@ def check_fake_previous_config_against_actual_spec(fake_previous_config):
try:
jsonschema.validate(instance=filtered_fake_previous_config, schema=actual_connector_spec.connectionSpecification)
except jsonschema.exceptions.ValidationError as err:
raise NonBackwardCompatibleSpecError(err)
raise NonBackwardCompatibleError(err)

check_fake_previous_config_against_actual_spec()


class CatalogDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a discoverd catalog diff"""

context = "Catalog"

def assert_is_backward_compatible(self):
self.check_if_stream_was_removed()
self.check_if_value_of_type_field_changed()
self.check_if_type_of_type_field_changed()

def check_if_stream_was_removed(self):
"""Check if a stream was removed from the catalog."""
removed_streams = []
for removal in self._diff.get("dictionary_item_removed", []):
if removal.path() != "root" and removal.up.path() == "root":
removed_streams.append(removal.path(output_format="list")[0])
if removed_streams:
self._raise_error(f"The following streams were removed: {','.join(removed_streams)}")
Loading