From 61b6c03b423120085b29a4d64cf527a5efc4a49c Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 9 Nov 2022 18:34:56 -0800 Subject: [PATCH 1/4] don't update cursor for log messages and and default schema path coming from connector builder --- .../retrievers/simple_retriever.py | 7 +- .../schema/json_file_schema_loader.py | 11 ++- .../retrievers/test_simple_retriever.py | 70 ++++++++++++++++++- 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index e35820ed9d32e..62dbaf09adddc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -371,11 +371,14 @@ def read_records( stream_state, ) for record in records_generator: - self.stream_slicer.update_cursor(stream_slice, last_record=record) + # Only record messages should be parsed to update the cursor which is indicated by the Mapping type + if isinstance(record, Mapping): + self.stream_slicer.update_cursor(stream_slice, last_record=record) yield record else: last_record = self._last_records[-1] if self._last_records else None - self.stream_slicer.update_cursor(stream_slice, last_record=last_record) + if last_record and isinstance(last_record, Mapping): + self.stream_slicer.update_cursor(stream_slice, last_record=last_record) yield from [] def stream_slices( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py index de3fb380d1bcf..739ee69e72af6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py @@ -15,11 +15,16 @@ def _default_file_path() -> str: - # schema files are always in "source_/schemas/.json - # the connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_ + # Schema files are always in "source_/schemas/.json + # The connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_ + # One exception to this rule are reads invoked from the connector builder server which is prefixed by connector_builder source_modules = [ - k for k, v in sys.modules.items() if "source_" in k # example: ['source_exchange_rates', 'source_exchange_rates.source'] + k + for k, v in sys.modules.items() + if "source_" in k or "connector_builder" in k # example: ['source_exchange_rates', 'source_exchange_rates.source'] ] + if not source_modules: + raise RuntimeError("Expected at least one module starting with 'source_' or 'connector_builder'") if not source_modules: raise RuntimeError("Expected at least one module starting with 'source_'") module = source_modules[0].split(".")[0] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index c9dba84fb1017..9f8059cb8e4fe 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -7,18 +7,23 @@ import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status import pytest import requests -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode from airbyte_cdk.sources.declarative.exceptions import ReadException from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.declarative.stream_slicers import DatetimeStreamSlicer from airbyte_cdk.sources.streams.http.auth import NoAuth from airbyte_cdk.sources.streams.http.http import HttpStream primary_key = "pk" records = [{"id": 1}, {"id": 2}] +request_response_logs = [ + AirbyteLogMessage(level=Level.INFO, message="request:{}"), + AirbyteLogMessage(level=Level.INFO, message="response{}"), +] config = {} @@ -90,7 +95,7 @@ def test_simple_retriever_full(mock_http_stream): assert retriever._last_response is None assert retriever._last_records is None - assert retriever.parse_response(response, stream_state=None) == records + assert retriever.parse_response(response, stream_state={}) == records assert retriever._last_response == response assert retriever._last_records == records @@ -107,6 +112,67 @@ def test_simple_retriever_full(mock_http_stream): paginator.reset.assert_called() +@patch.object(HttpStream, "_read_pages", return_value=[*request_response_logs, *records]) +def test_simple_retriever_with_request_response_logs(mock_http_stream): + requester = MagicMock() + paginator = MagicMock() + record_selector = MagicMock() + iterator = DatetimeStreamSlicer( + start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={} + ) + + retriever = SimpleRetriever( + name="stream_name", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=record_selector, + stream_slicer=iterator, + options={}, + config={}, + ) + + actual_messages = [r for r in retriever.read_records(SyncMode.full_refresh)] + paginator.reset.assert_called() + + assert isinstance(actual_messages[0], AirbyteLogMessage) + assert isinstance(actual_messages[1], AirbyteLogMessage) + assert actual_messages[2] == records[0] + assert actual_messages[3] == records[1] + + +@patch.object(HttpStream, "_read_pages", return_value=[]) +def test_simple_retriever_with_request_response_log_last_records(mock_http_stream): + requester = MagicMock() + paginator = MagicMock() + record_selector = MagicMock() + record_selector.select_records.return_value = request_response_logs + response = requests.Response() + iterator = DatetimeStreamSlicer( + start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={} + ) + + retriever = SimpleRetriever( + name="stream_name", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=record_selector, + stream_slicer=iterator, + options={}, + config={}, + ) + + assert retriever._last_response is None + assert retriever._last_records is None + assert retriever.parse_response(response, stream_state={}) == request_response_logs + assert retriever._last_response == response + assert retriever._last_records == request_response_logs + + [r for r in retriever.read_records(SyncMode.full_refresh)] + paginator.reset.assert_called() + + @pytest.mark.parametrize( "test_name, requester_response, expected_should_retry, expected_backoff_time", [ From 87cabf6fc0e722e1a0763a8e971ff3681cd86ec8 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 9 Nov 2022 21:09:15 -0800 Subject: [PATCH 2/4] replace check for connector_builder module with a basic default file path --- .../schema/json_file_schema_loader.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py index 739ee69e72af6..fa4e5a99a0054 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py @@ -17,18 +17,16 @@ def _default_file_path() -> str: # Schema files are always in "source_/schemas/.json # The connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_ - # One exception to this rule are reads invoked from the connector builder server which is prefixed by connector_builder source_modules = [ - k - for k, v in sys.modules.items() - if "source_" in k or "connector_builder" in k # example: ['source_exchange_rates', 'source_exchange_rates.source'] - ] - if not source_modules: - raise RuntimeError("Expected at least one module starting with 'source_' or 'connector_builder'") - if not source_modules: - raise RuntimeError("Expected at least one module starting with 'source_'") - module = source_modules[0].split(".")[0] - return f"./{module}/schemas/{{{{options['name']}}}}.json" + k for k, v in sys.modules.items() if "source_" in k + ] # example: ['source_exchange_rates', 'source_exchange_rates.source'] + if source_modules: + module = source_modules[0].split(".")[0] + return f"./{module}/schemas/{{{{options['name']}}}}.json" + + # If we are not in a source_ module, the most likely scenario is we're processing a manifest from the connector builder + # server which does not require a json schema to be defined. + return "./{{options['name']}}.json" @dataclass From 1596ae87a24111aa49165d041011e46f4b119b20 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 9 Nov 2022 21:11:48 -0800 Subject: [PATCH 3/4] update changelog and patch version --- airbyte-cdk/python/CHANGELOG.md | 3 +++ airbyte-cdk/python/setup.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index df550bb0d53af..1c5a7ed750de4 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.8.1 +Low-code: Don't update cursor for non-record messages and fix default loader for connector builder manifests + ## 0.8.0 Low-code: Allow for request and response to be emitted as log messages diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 09570bdbe5823..773743d05205c 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.8.0", + version="0.8.1", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", From 3e99d2b8504718e6f230ff2158bd7caecc1d19eb Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 9 Nov 2022 22:52:43 -0800 Subject: [PATCH 4/4] catch the correct exception when pkgutil can't load json file --- .../sources/declarative/schema/default_schema_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py index 8e5129ebeab94..9344ffeed684b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py @@ -38,7 +38,7 @@ def get_json_schema(self) -> Mapping[str, Any]: try: return self.default_loader.get_json_schema() - except FileNotFoundError: + except OSError: # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the # runtime options stores stream name 'name' so we'll do the same here stream_name = self._options.get("name", "")