
Commit 385a70d

Support user-specified test read limits in connector_builder code (#35312)
1 parent 6046581 commit 385a70d

File tree (4 files changed: +25, -17 lines changed)

- airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
- airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py
- airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py
- airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py

airbyte-cdk/python/airbyte_cdk/connector_builder/README.md

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ Note:
 *See [ConnectionSpecification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#actor-specification) for details on the `"config"` key if needed.

 - When the `__command` is `resolve_manifest`, the argument to `catalog` should be an empty string.
+- The config can optionally contain an object under the `__test_read_config` key which can define custom test read limits with `max_records`, `max_slices`, and `max_pages_per_slice` properties. All custom limits are optional; a default will be used for any limit that is not provided.

 ### Locally running the docker image
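
For illustration, a test read config exercising the new key might look like the following (a minimal sketch as a Python mapping; the `__test_read_config` key and its three properties come from the README text above, while the numeric values and the placeholder for the remaining config keys are assumptions):

# Hypothetical example of the __test_read_config object described above.
# All three limits are optional; the CDK falls back to a default for any
# limit that is not provided.
config = {
    # ... the usual connector config and __command keys would go here ...
    "__test_read_config": {
        "max_records": 50,         # cap on total records read during the test
        "max_slices": 3,           # cap on stream slices read
        "max_pages_per_slice": 2,  # cap on pages requested within each slice
    },
}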

airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ def read_stream(
     source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, limits: TestReadLimits
 ) -> AirbyteMessage:
     try:
-        handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices)
+        handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
         stream_name = configured_catalog.streams[0].stream.name  # The connector builder only supports a single stream
         stream_read = handler.get_message_groups(source, config, configured_catalog, limits.max_records)
         return AirbyteMessage(
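
For context, the `limits` object consumed by `read_stream` could be populated from the config roughly as follows (a minimal sketch, assuming a dataclass and default constants; the CDK's actual `TestReadLimits` type, helper names, and defaults may differ):

from dataclasses import dataclass
from typing import Any, Mapping

# Illustrative defaults only; the real CDK defines its own constants.
DEFAULT_MAX_RECORDS = 100
DEFAULT_MAX_SLICES = 5
DEFAULT_MAX_PAGES_PER_SLICE = 5


@dataclass
class TestReadLimits:
    max_records: int = DEFAULT_MAX_RECORDS
    max_slices: int = DEFAULT_MAX_SLICES
    max_pages_per_slice: int = DEFAULT_MAX_PAGES_PER_SLICE


def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
    # Pull user overrides from __test_read_config, keeping the default for
    # any limit the user did not specify (hypothetical helper).
    overrides = config.get("__test_read_config", {})
    return TestReadLimits(
        max_records=overrides.get("max_records", DEFAULT_MAX_RECORDS),
        max_slices=overrides.get("max_slices", DEFAULT_MAX_SLICES),
        max_pages_per_slice=overrides.get("max_pages_per_slice", DEFAULT_MAX_PAGES_PER_SLICE),
    )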

airbyte-cdk/python/airbyte_cdk/connector_builder/message_grouper.py

Lines changed: 2 additions & 2 deletions
@@ -52,8 +52,8 @@ def get_message_groups(
         configured_catalog: ConfiguredAirbyteCatalog,
         record_limit: Optional[int] = None,
     ) -> StreamRead:
-        if record_limit is not None and not (1 <= record_limit <= 1000):
-            raise ValueError(f"Record limit must be between 1 and 1000. Got {record_limit}")
+        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
+            raise ValueError(f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}")
         schema_inferrer = SchemaInferrer()
         datetime_format_inferrer = DatetimeFormatInferrer()
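
With this change the ceiling is no longer hard-coded to 1000 but comes from the `MessageGrouper` instance. A usage sketch (the `max_record_limit` keyword matches the test below; the first two arguments are passed positionally, as in `connector_builder_handler.py`, and the values are illustrative):

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper

# Grouper whose per-test ceiling is 100 records instead of the old
# hard-coded 1000.
grouper = MessageGrouper(5, 5, max_record_limit=100)

# Requesting more than the ceiling now fails fast before any read happens:
#   grouper.get_message_groups(source, config, catalog, record_limit=250)
#   -> ValueError: Record limit must be between 1 and 100. Got 250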

airbyte-cdk/python/unit_tests/connector_builder/test_message_grouper.py

Lines changed: 21 additions & 14 deletions
@@ -218,14 +218,14 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None:


 @pytest.mark.parametrize(
-    "request_record_limit, max_record_limit",
+    "request_record_limit, max_record_limit, should_fail",
     [
-        pytest.param(1, 3, id="test_create_request_with_record_limit"),
-        pytest.param(3, 1, id="test_create_request_record_limit_exceeds_max"),
+        pytest.param(1, 3, False, id="test_create_request_with_record_limit"),
+        pytest.param(3, 1, True, id="test_create_request_record_limit_exceeds_max"),
     ],
 )
 @patch("airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read")
-def test_get_grouped_messages_record_limit(mock_entrypoint_read: Mock, request_record_limit: int, max_record_limit: int) -> None:
+def test_get_grouped_messages_record_limit(mock_entrypoint_read: Mock, request_record_limit: int, max_record_limit: int, should_fail: bool) -> None:
     url = "https://demonslayers.com/api/v1/hashiras?era=taisho"
     request = {
         "headers": {"Content-Type": "application/json"},

@@ -249,16 +249,23 @@ def test_get_grouped_messages_record_limit(mock_entrypoint_read: Mock, request_r
     record_limit = min(request_record_limit, max_record_limit)

     api = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES, max_record_limit=max_record_limit)
-    actual_response: StreamRead = api.get_message_groups(
-        mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"), record_limit=request_record_limit
-    )
-    single_slice = actual_response.slices[0]
-    total_records = 0
-    for i, actual_page in enumerate(single_slice.pages):
-        total_records += len(actual_page.records)
-    assert total_records == min([record_limit, n_records])
-
-    assert (total_records >= max_record_limit) == actual_response.test_read_limit_reached
+    # this is the call we expect to raise an exception
+    if should_fail:
+        with pytest.raises(ValueError):
+            api.get_message_groups(
+                mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"), record_limit=request_record_limit
+            )
+    else:
+        actual_response: StreamRead = api.get_message_groups(
+            mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"), record_limit=request_record_limit
+        )
+        single_slice = actual_response.slices[0]
+        total_records = 0
+        for i, actual_page in enumerate(single_slice.pages):
+            total_records += len(actual_page.records)
+        assert total_records == min([record_limit, n_records])
+
+        assert (total_records >= max_record_limit) == actual_response.test_read_limit_reached


 @pytest.mark.parametrize(
