
Commit cf63ee5

[ISSUE #20771] adding slices to connector builder read request (#21605)
* [ISSUE #20771] adding slices to connector builder read request
* [ISSUE #20771] formatting
* [ISSUE #20771] set flag when limit requests reached (#21619)
* [ISSUE #20771] set flag when limit requests reached
* [ISSUE #20771] assert proper value on test read objects __init__
* [ISSUE #20771] code review and fix edge case
1 parent 6631698 commit cf63ee5

File tree

16 files changed: +417 −85 lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 7 additions & 2 deletions
```diff
@@ -2,17 +2,20 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import json
 import logging
 from abc import ABC, abstractmethod
 from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
 
 from airbyte_cdk.models import (
     AirbyteCatalog,
     AirbyteConnectionStatus,
+    AirbyteLogMessage,
     AirbyteMessage,
     AirbyteStateMessage,
     ConfiguredAirbyteCatalog,
     ConfiguredAirbyteStream,
+    Level,
     Status,
     SyncMode,
 )
@@ -232,7 +235,8 @@ def _read_incremental(
         has_slices = False
         for _slice in slices:
             has_slices = True
-            logger.debug("Processing stream slice", extra={"slice": _slice})
+            if logger.isEnabledFor(logging.DEBUG):
+                yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"slice:{json.dumps(_slice)}"))
             records = stream_instance.read_records(
                 sync_mode=SyncMode.incremental,
                 stream_slice=_slice,
@@ -281,7 +285,8 @@ def _read_full_refresh(
         )
         total_records_counter = 0
         for _slice in slices:
-            logger.debug("Processing stream slice", extra={"slice": _slice})
+            if logger.isEnabledFor(logging.DEBUG):
+                yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"slice:{json.dumps(_slice)}"))
             record_data_or_messages = stream_instance.read_records(
                 stream_slice=_slice,
                 sync_mode=SyncMode.full_refresh,
```
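Note: the new slice messages are gated on the logger's effective level, so ordinary syncs are unaffected. A minimal sketch of the pattern, where `emit_slice_message` is a hypothetical stand-in for yielding an AirbyteMessage from inside the read generators:

```python
# Minimal sketch of the debug-gated slice logging above; emit_slice_message is
# a hypothetical helper, not part of the CDK.
import json
import logging

logger = logging.getLogger("airbyte")

def emit_slice_message(_slice: dict) -> None:
    # Skip both the json.dumps call and the message unless DEBUG is enabled.
    if logger.isEnabledFor(logging.DEBUG):
        print(f"slice:{json.dumps(_slice)}")

emit_slice_message({"date": "2022-01-01"})  # prints nothing unless DEBUG is on
```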

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 5 deletions
```diff
@@ -116,9 +116,10 @@
 
 
 class ModelToComponentFactory:
-    def __init__(self, is_test_read=False):
+    def __init__(self, limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None):
         self._init_mappings()
-        self._is_test_read = is_test_read
+        self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
+        self._limit_slices_fetched = limit_slices_fetched
 
     def _init_mappings(self):
         self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
@@ -482,8 +483,8 @@ def create_default_paginator(self, model: DefaultPaginatorModel, config: Config,
             config=config,
             options=model.options,
         )
-        if self._is_test_read:
-            return PaginatorTestReadDecorator(paginator)
+        if self._limit_pages_fetched_per_slice:
+            return PaginatorTestReadDecorator(paginator, self._limit_pages_fetched_per_slice)
         return paginator
 
     def create_dpath_extractor(self, model: DpathExtractorModel, config: Config, **kwargs) -> DpathExtractor:
@@ -681,7 +682,7 @@ def create_simple_retriever(self, model: SimpleRetrieverModel, config: Config, *
             self._create_component_from_model(model=model.stream_slicer, config=config) if model.stream_slicer else SingleSlice(options={})
         )
 
-        if self._is_test_read:
+        if self._limit_slices_fetched:
             return SimpleRetrieverTestReadDecorator(
                 name=model.name,
                 paginator=paginator,
@@ -690,6 +691,7 @@ def create_simple_retriever(self, model: SimpleRetrieverModel, config: Config, *
                 record_selector=record_selector,
                 stream_slicer=stream_slicer,
                 config=config,
+                maximum_number_of_slices=self._limit_slices_fetched,
                 options=model.options,
             )
         return SimpleRetriever(
```
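The factory thus replaces the blanket `is_test_read` flag with two independent limits; a falsy value (the `None` default) leaves the production component undecorated. A hypothetical construction call, assuming the rest of the factory API is unchanged:

```python
# Hypothetical usage: either limit can be set on its own; passing None (the
# default) skips the corresponding test-read decorator entirely.
factory = ModelToComponentFactory(
    limit_pages_fetched_per_slice=5,  # wraps paginators in PaginatorTestReadDecorator
    limit_slices_fetched=5,           # builds SimpleRetrieverTestReadDecorator instead of SimpleRetriever
)
```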

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -172,9 +172,11 @@ class PaginatorTestReadDecorator(Paginator):
     _DEFAULT_PAGINATION_LIMIT = 5
 
     def __init__(self, decorated, maximum_number_of_pages: int = None):
+        if maximum_number_of_pages and maximum_number_of_pages < 1:
+            raise ValueError(f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}")
+        self._maximum_number_of_pages = maximum_number_of_pages if maximum_number_of_pages else self._DEFAULT_PAGINATION_LIMIT
         self._decorated = decorated
         self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
-        self._maximum_number_of_pages = maximum_number_of_pages if maximum_number_of_pages else self._DEFAULT_PAGINATION_LIMIT
 
     def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
         if self._page_count >= self._maximum_number_of_pages:
```
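Moving the validation and the limit assignment to the top of `__init__` means a bad limit fails fast, before any state is set. A self-contained sketch of the same capping idea, with a simplified `next_page_token` signature (the real class mirrors the full Paginator interface and delegates its other methods too):

```python
# Sketch of the page-capping decorator pattern; CappedPaginator is a
# hypothetical name, not the CDK class.
from typing import Any, Mapping, Optional

class CappedPaginator:
    _DEFAULT_PAGINATION_LIMIT = 5

    def __init__(self, decorated: Any, maximum_number_of_pages: Optional[int] = None):
        if maximum_number_of_pages and maximum_number_of_pages < 1:
            raise ValueError(f"Expected a strictly positive page limit. Got {maximum_number_of_pages}")
        self._maximum_number_of_pages = maximum_number_of_pages or self._DEFAULT_PAGINATION_LIMIT
        self._decorated = decorated
        self._page_count = 0

    def next_page_token(self, response: Any) -> Optional[Mapping[str, Any]]:
        if self._page_count >= self._maximum_number_of_pages:
            return None  # report "no more pages" once the cap is reached
        self._page_count += 1
        return self._decorated.next_page_token(response)
```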

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 10 additions & 2 deletions
```diff
@@ -417,18 +417,26 @@ def _parse_records_and_emit_request_and_responses(self, request, response, strea
         yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)
 
 
+@dataclass
 class SimpleRetrieverTestReadDecorator(SimpleRetriever):
     """
     In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
     slices that are queried throughout a read command.
     """
 
-    _MAXIMUM_NUMBER_OF_SLICES = 5
+    maximum_number_of_slices: int = 5
+
+    def __post_init__(self, options: Mapping[str, Any]):
+        super().__post_init__(options)
+        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
+            raise ValueError(
+                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
+            )
 
     def stream_slices(
         self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
-        return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self._MAXIMUM_NUMBER_OF_SLICES)
+        return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self.maximum_number_of_slices)
 
 
 def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
```
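Because the truncation goes through `itertools.islice`, the wrapped `stream_slices` generator is only consumed up to the cap; later slices are never computed. A small standalone demonstration of that laziness:

```python
# islice stops pulling from the source generator once the cap is hit, so
# slices past maximum_number_of_slices are never even generated.
from itertools import islice

def all_slices():
    for day in range(1, 10):
        print(f"computing slice {day}")  # side effect to make the laziness visible
        yield {"date": f"2022-01-0{day}"}

maximum_number_of_slices = 5
truncated = list(islice(all_slices(), maximum_number_of_slices))
assert len(truncated) == 5  # only five "computing slice" lines were printed
```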

airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 9 additions & 3 deletions
```diff
@@ -633,19 +633,25 @@ def test_response_to_airbyte_message(test_name, response_body, response_headers,
 
 
 def test_limit_stream_slices():
+    maximum_number_of_slices = 4
     stream_slicer = MagicMock()
-    stream_slicer.stream_slices.return_value = [{"date": f"2022-01-0{day}"} for day in range(1, 10)]
+    stream_slicer.stream_slices.return_value = _generate_slices(maximum_number_of_slices * 2)
     retriever = SimpleRetrieverTestReadDecorator(
         name="stream_name",
         primary_key=primary_key,
         requester=MagicMock(),
         paginator=MagicMock(),
         record_selector=MagicMock(),
         stream_slicer=stream_slicer,
+        maximum_number_of_slices=maximum_number_of_slices,
         options={},
         config={},
     )
 
-    truncated_slices = retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None)
+    truncated_slices = list(retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None))
 
-    assert truncated_slices == [{"date": f"2022-01-0{day}"} for day in range(1, 6)]
+    assert truncated_slices == _generate_slices(maximum_number_of_slices)
+
+
+def _generate_slices(number_of_slices):
+    return [{"date": f"2022-01-0{day + 1}"} for day in range(number_of_slices)]
```

airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py

Lines changed: 90 additions & 0 deletions
```diff
@@ -12,6 +12,7 @@
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 from jsonschema.exceptions import ValidationError
+from unittest.mock import patch
 
 logger = logging.getLogger("airbyte")
 
@@ -542,6 +543,95 @@ def test_manifest_without_at_least_one_stream(self, construct_using_pydantic_mod
             ManifestDeclarativeSource(source_config=manifest, construct_using_pydantic_models=construct_using_pydantic_models)
 
 
+    @patch("airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource.read")
+    def test_given_debug_when_read_then_set_log_level(self, declarative_source_read):
+        any_valid_manifest = {
+            "version": "version",
+            "definitions": {
+                "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
+                "retriever": {
+                    "paginator": {
+                        "type": "DefaultPaginator",
+                        "page_size": 10,
+                        "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
+                        "page_token_option": {"inject_into": "path"},
+                        "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"},
+                    },
+                    "requester": {
+                        "path": "/v3/marketing/lists",
+                        "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
+                        "request_parameters": {"page_size": 10},
+                    },
+                    "record_selector": {"extractor": {"field_pointer": ["result"]}},
+                },
+            },
+            "streams": [
+                {
+                    "type": "DeclarativeStream",
+                    "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
+                    "schema_loader": {
+                        "name": "{{ options.stream_name }}",
+                        "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
+                    },
+                    "retriever": {
+                        "paginator": {
+                            "type": "DefaultPaginator",
+                            "page_size": 10,
+                            "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
+                            "page_token_option": {"inject_into": "path"},
+                            "pagination_strategy": {
+                                "type": "CursorPagination",
+                                "cursor_value": "{{ response._metadata.next }}",
+                                "page_size": 10,
+                            },
+                        },
+                        "requester": {
+                            "path": "/v3/marketing/lists",
+                            "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
+                            "request_parameters": {"page_size": 10},
+                        },
+                        "record_selector": {"extractor": {"field_pointer": ["result"]}},
+                    },
+                },
+                {
+                    "type": "DeclarativeStream",
+                    "$options": {"name": "stream_with_custom_requester", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
+                    "schema_loader": {
+                        "name": "{{ options.stream_name }}",
+                        "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
+                    },
+                    "retriever": {
+                        "paginator": {
+                            "type": "DefaultPaginator",
+                            "page_size": 10,
+                            "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
+                            "page_token_option": {"inject_into": "path"},
+                            "pagination_strategy": {
+                                "type": "CursorPagination",
+                                "cursor_value": "{{ response._metadata.next }}",
+                                "page_size": 10,
+                            },
+                        },
+                        "requester": {
+                            "type": "CustomRequester",
+                            "class_name": "unit_tests.sources.declarative.external_component.SampleCustomComponent",
+                            "path": "/v3/marketing/lists",
+                            "custom_request_parameters": {"page_size": 10},
+                        },
+                        "record_selector": {"extractor": {"field_pointer": ["result"]}},
+                    },
+                },
+            ],
+            "check": {"type": "CheckStream", "stream_names": ["lists"]},
+        }
+        source = ManifestDeclarativeSource(source_config=any_valid_manifest, debug=True, construct_using_pydantic_models=True)
+
+        debug_logger = logging.getLogger("logger.debug")
+        list(source.read(debug_logger, {}, {}, {}))
+
+        assert debug_logger.isEnabledFor(logging.DEBUG)
+
+
 def test_generate_schema():
     schema_str = ManifestDeclarativeSource.generate_schema()
     schema = json.loads(schema_str)
```
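The test only asserts the observable effect: after reading with `debug=True`, the logger handed to `read` is enabled for DEBUG. The source-side mechanism is not part of this diff; a hedged sketch of what the test presumably exercises, using a hypothetical stand-in class:

```python
# Assumed behavior under test (ManifestDeclarativeSource internals are not
# shown in this commit): a debug flag promotes the logger to DEBUG on read,
# which in turn enables the slice messages added in abstract_source.py.
import logging

class DebuggableSource:  # hypothetical stand-in, not the CDK class
    def __init__(self, debug: bool = False):
        self._debug = debug

    def read(self, logger: logging.Logger):
        if self._debug:
            logger.setLevel(logging.DEBUG)
        yield from ()  # records would be yielded here

debug_logger = logging.getLogger("logger.debug")
list(DebuggableSource(debug=True).read(debug_logger))
assert debug_logger.isEnabledFor(logging.DEBUG)
```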

airbyte-cdk/python/unit_tests/sources/test_abstract_source.py

Lines changed: 51 additions & 0 deletions
```diff
@@ -331,6 +331,57 @@ def test_valid_full_refresh_read_with_slices(mocker):
     assert expected == messages
 
 
+def test_read_full_refresh_with_slices_sends_slice_messages(mocker):
+    """Given the logger is debug and a full refresh sync, AirbyteMessages are sent for slices"""
+    debug_logger = logging.getLogger("airbyte.debug")
+    debug_logger.setLevel(logging.DEBUG)
+    slices = [{"1": "1"}, {"2": "2"}]
+    stream = MockStream(
+        [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
+        name="s1",
+    )
+
+    mocker.patch.object(MockStream, "get_json_schema", return_value={})
+    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
+
+    src = MockSource(streams=[stream])
+    catalog = ConfiguredAirbyteCatalog(
+        streams=[
+            _configured_stream(stream, SyncMode.full_refresh),
+        ]
+    )
+
+    messages = src.read(debug_logger, {}, catalog)
+
+    assert 2 == len(list(filter(lambda message: message.log and message.log.message.startswith("slice:"), messages)))
+
+
+def test_read_incremental_with_slices_sends_slice_messages(mocker):
+    """Given the logger is debug and an incremental sync, AirbyteMessages are sent for slices"""
+    debug_logger = logging.getLogger("airbyte.debug")
+    debug_logger.setLevel(logging.DEBUG)
+    slices = [{"1": "1"}, {"2": "2"}]
+    stream = MockStream(
+        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": {}}, [s]) for s in slices],
+        name="s1",
+    )
+
+    MockStream.supports_incremental = mocker.PropertyMock(return_value=True)
+    mocker.patch.object(MockStream, "get_json_schema", return_value={})
+    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
+
+    src = MockSource(streams=[stream])
+    catalog = ConfiguredAirbyteCatalog(
+        streams=[
+            _configured_stream(stream, SyncMode.incremental),
+        ]
+    )
+
+    messages = src.read(debug_logger, {}, catalog)
+
+    assert 2 == len(list(filter(lambda message: message.log and message.log.message.startswith("slice:"), messages)))
+
+
 class TestIncrementalRead:
     @pytest.mark.parametrize(
         "use_legacy",
```

airbyte-cdk/python/unit_tests/sources/test_source.py

Lines changed: 13 additions & 9 deletions
```diff
@@ -390,6 +390,7 @@ def test_internal_config_limit(abstract_source, catalog):
     logger_mock.level = logging.DEBUG
     del catalog.streams[1]
     STREAM_LIMIT = 2
+    SLICE_DEBUG_LOG_COUNT = 1
     FULL_RECORDS_NUMBER = 3
     streams = abstract_source.streams(None)
     http_stream = streams[0]
@@ -398,7 +399,7 @@ def test_internal_config_limit(abstract_source, catalog):
 
     catalog.streams[0].sync_mode = SyncMode.full_refresh
     records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
-    assert len(records) == STREAM_LIMIT
+    assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT
     logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list]
     # Check if log line matches number of limit
     read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")]
@@ -407,13 +408,13 @@ def test_internal_config_limit(abstract_source, catalog):
     # No limit, check if state record produced for incremental stream
     catalog.streams[0].sync_mode = SyncMode.incremental
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == FULL_RECORDS_NUMBER + 1
+    assert len(records) == FULL_RECORDS_NUMBER + SLICE_DEBUG_LOG_COUNT + 1
     assert records[-1].type == Type.STATE
 
     # Set limit and check if state is produced when limit is set for incremental stream
     logger_mock.reset_mock()
     records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
-    assert len(records) == STREAM_LIMIT + 1
+    assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + 1
     assert records[-1].type == Type.STATE
     logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list]
     read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")]
@@ -425,40 +426,43 @@ def test_internal_config_limit(abstract_source, catalog):
 
 def test_source_config_no_transform(abstract_source, catalog):
     logger_mock = MagicMock()
+    SLICE_DEBUG_LOG_COUNT = 1
     logger_mock.level = logging.DEBUG
     streams = abstract_source.streams(None)
     http_stream, non_http_stream = streams
     http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
     http_stream.read_records.return_value, non_http_stream.read_records.return_value = [[{"value": 23}] * 5] * 2
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == 2 * 5
-    assert [r.record.data for r in records] == [{"value": 23}] * 2 * 5
+    assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT)
+    assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": 23}] * 2 * 5
     assert http_stream.get_json_schema.call_count == 5
     assert non_http_stream.get_json_schema.call_count == 5
 
 
 def test_source_config_transform(abstract_source, catalog):
     logger_mock = MagicMock()
     logger_mock.level = logging.DEBUG
+    SLICE_DEBUG_LOG_COUNT = 2
     streams = abstract_source.streams(None)
     http_stream, non_http_stream = streams
     http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
     non_http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
     http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
     http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}]
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == 2
-    assert [r.record.data for r in records] == [{"value": "23"}] * 2
+    assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT
+    assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}] * 2
 
 
 def test_source_config_transform_and_no_transform(abstract_source, catalog):
     logger_mock = MagicMock()
     logger_mock.level = logging.DEBUG
+    SLICE_DEBUG_LOG_COUNT = 2
    streams = abstract_source.streams(None)
     http_stream, non_http_stream = streams
     http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
     http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
     http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}]
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == 2
-    assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}]
+    assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT
+    assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}, {"value": 23}]
```
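The adjusted expectations all follow the same arithmetic: with a DEBUG-level logger, each slice read now contributes one extra log message per stream, so record-only assertions filter on `Type.RECORD`. A worked check of the first updated count, under the reconstruction above:

```python
# Worked message count for test_source_config_no_transform: two streams, each
# yielding one slice debug log plus five records.
SLICE_DEBUG_LOG_COUNT = 1  # one slice per stream in these fixtures
RECORDS_PER_STREAM = 5
STREAMS = 2
assert STREAMS * (RECORDS_PER_STREAM + SLICE_DEBUG_LOG_COUNT) == 12
```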
