diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/README.md b/airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
index 3f3402fab5360..6788a6f226b16 100644
--- a/airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
+++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
@@ -22,6 +22,7 @@ Note:
 *See [ConnectionSpecification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#actor-specification) for details on the `"config"` key if needed.
 - When the `__command` is `resolve_manifest`, the argument to `catalog` should be an empty string.
+- The config can optionally contain an object under the `__test_read_config` key which can define custom test read limits with `max_records`, `max_slices`, and `max_pages_per_slice` properties. All custom limits are optional; a default will be used for any limit that is not provided.
 
 ### Locally running the docker image
 
diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py
index c53bda0dfc623..4dfe4a3dd05d6 100644
--- a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py
+++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py
@@ -57,7 +57,7 @@ def read_stream(
     source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, limits: TestReadLimits
 ) -> AirbyteMessage:
     try:
-        handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices)
+        handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
         stream_name = configured_catalog.streams[0].stream.name  # The connector builder only supports a single stream
         stream_read = handler.get_message_groups(source, config, configured_catalog, limits.max_records)
         return AirbyteMessage(
diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py b/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
index 42a0e1051b526..b30a3a3744f48 100644
--- a/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
+++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
@@ -52,8 +52,8 @@ def get_message_groups(
         configured_catalog: ConfiguredAirbyteCatalog,
         record_limit: Optional[int] = None,
     ) -> StreamRead:
-        if record_limit is not None and not (1 <= record_limit <= 1000):
-            raise ValueError(f"Record limit must be between 1 and 1000. Got {record_limit}")
+        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
+            raise ValueError(f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}")
         schema_inferrer = SchemaInferrer()
         datetime_format_inferrer = DatetimeFormatInferrer()
 
diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py b/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py
index ae98f6ad70ab5..437a775dd8dee 100644
--- a/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py
+++ b/airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py
@@ -218,14 +218,14 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None:
 
 
 @pytest.mark.parametrize(
-    "request_record_limit, max_record_limit",
+    "request_record_limit, max_record_limit, should_fail",
     [
-        pytest.param(1, 3, id="test_create_request_with_record_limit"),
-        pytest.param(3, 1, id="test_create_request_record_limit_exceeds_max"),
+        pytest.param(1, 3, False, id="test_create_request_with_record_limit"),
+        pytest.param(3, 1, True, id="test_create_request_record_limit_exceeds_max"),
     ],
 )
 @patch("airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read")
-def test_get_grouped_messages_record_limit(mock_entrypoint_read: Mock, request_record_limit: int, max_record_limit: int) -> None:
+def test_get_grouped_messages_record_limit(mock_entrypoint_read: Mock, request_record_limit: int, max_record_limit: int, should_fail: bool) -> None:
     url = "https://demonslayers.com/api/v1/hashiras?era=taisho"
     request = {
         "headers": {"Content-Type": "application/json"},
@@ -249,16 +249,23 @@ def test_get_grouped_messages_record_limit(mock_entrypoint_read: Mock, request_r
     record_limit = min(request_record_limit, max_record_limit)
 
     api = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES, max_record_limit=max_record_limit)
-    actual_response: StreamRead = api.get_message_groups(
-        mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"), record_limit=request_record_limit
-    )
-    single_slice = actual_response.slices[0]
-    total_records = 0
-    for i, actual_page in enumerate(single_slice.pages):
-        total_records += len(actual_page.records)
-    assert total_records == min([record_limit, n_records])
-
-    assert (total_records >= max_record_limit) == actual_response.test_read_limit_reached
+    # this is the call we expect to raise an exception
+    if should_fail:
+        with pytest.raises(ValueError):
+            api.get_message_groups(
+                mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"), record_limit=request_record_limit
+            )
+    else:
+        actual_response: StreamRead = api.get_message_groups(
+            mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"), record_limit=request_record_limit
+        )
+        single_slice = actual_response.slices[0]
+        total_records = 0
+        for i, actual_page in enumerate(single_slice.pages):
+            total_records += len(actual_page.records)
+        assert total_records == min([record_limit, n_records])
+
+        assert (total_records >= max_record_limit) == actual_response.test_read_limit_reached
 
 
 @pytest.mark.parametrize(