Skip to content

Commit 4343391

Browse files
fix(connector-builder): fix property chunking in connector builder (#567)
1 parent 9a73079 commit 4343391

File tree

14 files changed

+669
-141
lines changed

14 files changed

+669
-141
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def __init__(
128128
component_factory
129129
if component_factory
130130
else ModelToComponentFactory(
131-
emit_connector_builder_messages,
131+
emit_connector_builder_messages=emit_connector_builder_messages,
132132
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
133133
)
134134
)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,6 @@
510510
AsyncRetriever,
511511
LazySimpleRetriever,
512512
SimpleRetriever,
513-
SimpleRetrieverTestReadDecorator,
514513
)
515514
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
516515
ConnectorBuilderFileUploader,
@@ -530,7 +529,10 @@
530529
)
531530
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
532531
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
533-
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
532+
from airbyte_cdk.sources.declarative.stream_slicers import (
533+
StreamSlicer,
534+
StreamSlicerTestReadDecorator,
535+
)
534536
from airbyte_cdk.sources.declarative.transformations import (
535537
AddFields,
536538
RecordTransformation,
@@ -3241,6 +3243,14 @@ def _get_url() -> str:
32413243
request_options_provider = DefaultRequestOptionsProvider(parameters={})
32423244

32433245
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
3246+
if self._should_limit_slices_fetched():
3247+
stream_slicer = cast(
3248+
StreamSlicer,
3249+
StreamSlicerTestReadDecorator(
3250+
wrapped_slicer=stream_slicer,
3251+
maximum_number_of_slices=self._limit_slices_fetched or 5,
3252+
),
3253+
)
32443254

32453255
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
32463256
paginator = (
@@ -3299,22 +3309,6 @@ def _get_url() -> str:
32993309
parameters=model.parameters or {},
33003310
)
33013311

3302-
if self._limit_slices_fetched or self._emit_connector_builder_messages:
3303-
return SimpleRetrieverTestReadDecorator(
3304-
name=name,
3305-
paginator=paginator,
3306-
primary_key=primary_key,
3307-
requester=requester,
3308-
record_selector=record_selector,
3309-
stream_slicer=stream_slicer,
3310-
request_option_provider=request_options_provider,
3311-
cursor=cursor,
3312-
config=config,
3313-
maximum_number_of_slices=self._limit_slices_fetched or 5,
3314-
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3315-
log_formatter=log_formatter,
3316-
parameters=model.parameters or {},
3317-
)
33183312
return SimpleRetriever(
33193313
name=name,
33203314
paginator=paginator,
@@ -3327,9 +3321,35 @@ def _get_url() -> str:
33273321
config=config,
33283322
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33293323
additional_query_properties=query_properties,
3324+
log_formatter=self._get_log_formatter(log_formatter, name),
33303325
parameters=model.parameters or {},
33313326
)
33323327

3328+
def _get_log_formatter(
3329+
self, log_formatter: Callable[[Response], Any] | None, name: str
3330+
) -> Callable[[Response], Any] | None:
3331+
if self._should_limit_slices_fetched():
3332+
return (
3333+
(
3334+
lambda response: format_http_message(
3335+
response,
3336+
f"Stream '{name}' request",
3337+
f"Request performed in order to extract records for stream '{name}'",
3338+
name,
3339+
)
3340+
)
3341+
if not log_formatter
3342+
else log_formatter
3343+
)
3344+
return None
3345+
3346+
def _should_limit_slices_fetched(self) -> bool:
3347+
"""
3348+
Returns True if the number of slices fetched should be limited, False otherwise.
3349+
This is used to limit the number of slices fetched during tests.
3350+
"""
3351+
return bool(self._limit_slices_fetched or self._emit_connector_builder_messages)
3352+
33333353
@staticmethod
33343354
def _query_properties_in_request_parameters(
33353355
requester: Union[HttpRequesterModel, CustomRequesterModel],
@@ -3420,7 +3440,7 @@ def create_async_retriever(
34203440
transformations: List[RecordTransformation],
34213441
**kwargs: Any,
34223442
) -> AsyncRetriever:
3423-
def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetriever:
3443+
def _get_download_retriever() -> SimpleRetriever:
34243444
record_selector = RecordSelector(
34253445
extractor=download_extractor,
34263446
name=name,
@@ -3440,19 +3460,6 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
34403460
if model.download_paginator
34413461
else NoPagination(parameters={})
34423462
)
3443-
maximum_number_of_slices = self._limit_slices_fetched or 5
3444-
3445-
if self._limit_slices_fetched or self._emit_connector_builder_messages:
3446-
return SimpleRetrieverTestReadDecorator(
3447-
requester=download_requester,
3448-
record_selector=record_selector,
3449-
primary_key=None,
3450-
name=job_download_components_name,
3451-
paginator=paginator,
3452-
config=config,
3453-
parameters={},
3454-
maximum_number_of_slices=maximum_number_of_slices,
3455-
)
34563463

34573464
return SimpleRetriever(
34583465
requester=download_requester,
@@ -3498,7 +3505,17 @@ def _get_job_timeout() -> datetime.timedelta:
34983505
transformations=transformations,
34993506
client_side_incremental_sync=client_side_incremental_sync,
35003507
)
3508+
35013509
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
3510+
if self._should_limit_slices_fetched():
3511+
stream_slicer = cast(
3512+
StreamSlicer,
3513+
StreamSlicerTestReadDecorator(
3514+
wrapped_slicer=stream_slicer,
3515+
maximum_number_of_slices=self._limit_slices_fetched or 5,
3516+
),
3517+
)
3518+
35023519
creation_requester = self._create_component_from_model(
35033520
model=model.creation_requester,
35043521
decoder=decoder,

airbyte_cdk/sources/declarative/retrievers/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@
77
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
88
LazySimpleRetriever,
99
SimpleRetriever,
10-
SimpleRetrieverTestReadDecorator,
1110
)
1211

1312
__all__ = [
1413
"Retriever",
1514
"SimpleRetriever",
16-
"SimpleRetrieverTestReadDecorator",
1715
"AsyncRetriever",
1816
"LazySimpleRetriever",
1917
]

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -645,40 +645,6 @@ def _deep_merge(
645645
target[key] = value
646646

647647

648-
@dataclass
649-
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
650-
"""
651-
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
652-
slices that are queried throughout a read command.
653-
"""
654-
655-
maximum_number_of_slices: int = 5
656-
657-
def __post_init__(self, options: Mapping[str, Any]) -> None:
658-
super().__post_init__(options)
659-
self.log_formatter = (
660-
(
661-
lambda response: format_http_message(
662-
response,
663-
f"Stream '{self.name}' request",
664-
f"Request performed in order to extract records for stream '{self.name}'",
665-
self.name,
666-
)
667-
)
668-
if not self.log_formatter
669-
else self.log_formatter
670-
)
671-
672-
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
673-
raise ValueError(
674-
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
675-
)
676-
677-
# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
678-
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
679-
return islice(super().stream_slices(), self.maximum_number_of_slices)
680-
681-
682648
@deprecated(
683649
"This class is experimental. Use at your own risk.",
684650
category=ExperimentalClassWarning,

airbyte_cdk/sources/declarative/stream_slicers/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@
33
#
44

55
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
6+
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer_test_read_decorator import (
7+
StreamSlicerTestReadDecorator,
8+
)
69

7-
__all__ = ["StreamSlicer"]
10+
__all__ = ["StreamSlicer", "StreamSlicerTestReadDecorator"]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from itertools import islice
7+
from typing import Any, Iterable, Mapping, Optional, Union
8+
9+
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
10+
from airbyte_cdk.sources.types import StreamSlice, StreamState
11+
12+
13+
@dataclass
14+
class StreamSlicerTestReadDecorator(StreamSlicer):
15+
"""
16+
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
17+
slices that are queried throughout a read command.
18+
"""
19+
20+
wrapped_slicer: StreamSlicer
21+
maximum_number_of_slices: int = 5
22+
23+
def stream_slices(self) -> Iterable[StreamSlice]:
24+
return islice(self.wrapped_slicer.stream_slices(), self.maximum_number_of_slices)
25+
26+
def __getattr__(self, name: str) -> Any:
27+
# Delegate everything else to the wrapped object
28+
return getattr(self.wrapped_slicer, name)

airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,30 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

3-
from abc import ABC, abstractmethod
4-
from typing import Iterable
3+
from abc import ABC, ABCMeta, abstractmethod
4+
from typing import Any, Iterable
55

66
from airbyte_cdk.sources.types import StreamSlice
77

88

9-
class StreamSlicer(ABC):
9+
class StreamSlicerMeta(ABCMeta):
10+
"""
11+
Metaclass for wrapper scenario that allows it to be used as a type check for StreamSlicer.
12+
This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check
13+
if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator.
14+
15+
For example in ConcurrentDeclarativeSource, we do things like:
16+
isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
17+
"""
18+
19+
def __instancecheck__(cls, instance: Any) -> bool:
20+
# Check if it's our wrapper with matching wrapped class
21+
if hasattr(instance, "wrapped_slicer"):
22+
return isinstance(instance.wrapped_slicer, cls)
23+
24+
return super().__instancecheck__(instance)
25+
26+
27+
class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
1028
"""
1129
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
1230
"""

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@
5858
from airbyte_cdk.models import Type as MessageType
5959
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
6060
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
61-
from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator
6261
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
62+
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
6363
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
6464
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets
6565
from unit_tests.connector_builder.utils import create_configured_catalog
@@ -1112,7 +1112,8 @@ def test_read_source(mock_http_stream):
11121112

11131113
streams = source.streams(config)
11141114
for s in streams:
1115-
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
1115+
assert isinstance(s.retriever, SimpleRetriever)
1116+
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)
11161117

11171118

11181119
@patch.object(
@@ -1158,7 +1159,8 @@ def test_read_source_single_page_single_slice(mock_http_stream):
11581159

11591160
streams = source.streams(config)
11601161
for s in streams:
1161-
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
1162+
assert isinstance(s.retriever, SimpleRetriever)
1163+
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)
11621164

11631165

11641166
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)