Skip to content

fix(connector-builder): fix property chunking in connector builder #567

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def __init__(
component_factory
if component_factory
else ModelToComponentFactory(
emit_connector_builder_messages,
emit_connector_builder_messages=emit_connector_builder_messages,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@
AsyncRetriever,
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
ConnectorBuilderFileUploader,
Expand All @@ -530,7 +529,10 @@
)
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers import (
StreamSlicer,
StreamSlicerTestReadDecorator,
)
from airbyte_cdk.sources.declarative.transformations import (
AddFields,
RecordTransformation,
Expand Down Expand Up @@ -3241,6 +3243,14 @@ def _get_url() -> str:
request_options_provider = DefaultRequestOptionsProvider(parameters={})

stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
if self._should_limit_slices_fetched():
stream_slicer = cast(
StreamSlicer,
StreamSlicerTestReadDecorator(
wrapped_slicer=stream_slicer,
maximum_number_of_slices=self._limit_slices_fetched or 5,
),
)

cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
paginator = (
Expand Down Expand Up @@ -3299,22 +3309,6 @@ def _get_url() -> str:
parameters=model.parameters or {},
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
maximum_number_of_slices=self._limit_slices_fetched or 5,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
log_formatter=log_formatter,
parameters=model.parameters or {},
)
return SimpleRetriever(
name=name,
paginator=paginator,
Expand All @@ -3327,9 +3321,35 @@ def _get_url() -> str:
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
additional_query_properties=query_properties,
log_formatter=self._get_log_formatter(log_formatter, name),
parameters=model.parameters or {},
)

def _get_log_formatter(
    self, log_formatter: Callable[[Response], Any] | None, name: str
) -> Callable[[Response], Any] | None:
    """
    Resolve the HTTP log formatter for a retriever.

    When slice limiting is active (test reads / connector builder), return the
    caller-supplied formatter if one was given, otherwise a default formatter
    built on format_http_message. Return None for normal (unlimited) reads.
    """
    if not self._should_limit_slices_fetched():
        return None
    if log_formatter:
        return log_formatter

    def _default_log_formatter(response: Response) -> Any:
        return format_http_message(
            response,
            f"Stream '{name}' request",
            f"Request performed in order to extract records for stream '{name}'",
            name,
        )

    return _default_log_formatter

def _should_limit_slices_fetched(self) -> bool:
    """
    Return True when the number of fetched slices must be capped.

    Slice limiting applies during test reads: either an explicit slice limit
    was configured, or the source is emitting connector-builder messages.
    """
    if self._limit_slices_fetched:
        return True
    return bool(self._emit_connector_builder_messages)

@staticmethod
def _query_properties_in_request_parameters(
requester: Union[HttpRequesterModel, CustomRequesterModel],
Expand Down Expand Up @@ -3420,7 +3440,7 @@ def create_async_retriever(
transformations: List[RecordTransformation],
**kwargs: Any,
) -> AsyncRetriever:
def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetriever:
def _get_download_retriever() -> SimpleRetriever:
record_selector = RecordSelector(
extractor=download_extractor,
name=name,
Expand All @@ -3440,19 +3460,6 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
if model.download_paginator
else NoPagination(parameters={})
)
maximum_number_of_slices = self._limit_slices_fetched or 5

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
requester=download_requester,
record_selector=record_selector,
primary_key=None,
name=job_download_components_name,
paginator=paginator,
config=config,
parameters={},
maximum_number_of_slices=maximum_number_of_slices,
)

return SimpleRetriever(
requester=download_requester,
Expand Down Expand Up @@ -3498,7 +3505,17 @@ def _get_job_timeout() -> datetime.timedelta:
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
)

stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
if self._should_limit_slices_fetched():
stream_slicer = cast(
StreamSlicer,
StreamSlicerTestReadDecorator(
wrapped_slicer=stream_slicer,
maximum_number_of_slices=self._limit_slices_fetched or 5,
),
)

creation_requester = self._create_component_from_model(
model=model.creation_requester,
decoder=decoder,
Expand Down
2 changes: 0 additions & 2 deletions airbyte_cdk/sources/declarative/retrievers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
LazySimpleRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)

__all__ = [
"Retriever",
"SimpleRetriever",
"SimpleRetrieverTestReadDecorator",
"AsyncRetriever",
"LazySimpleRetriever",
]
34 changes: 0 additions & 34 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,40 +645,6 @@ def _deep_merge(
target[key] = value


@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    SimpleRetriever variant used for test reads: caps how many stream slices
    are queried so a read command issues a bounded number of requests to the
    backend source, and installs a default HTTP log formatter when none is set.
    """

    # Upper bound on the number of slices consumed by stream_slices().
    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)

        if not self.log_formatter:
            # Default formatter: surface the raw HTTP exchange for this stream.
            def _default_log_formatter(response: Any) -> Any:
                return format_http_message(
                    response,
                    f"Stream '{self.name}' request",
                    f"Request performed in order to extract records for stream '{self.name}'",
                    self.name,
                )

            self.log_formatter = _default_log_formatter

        # NOTE(review): a value of 0 is falsy and bypasses this check even though
        # the message says "strictly positive" — confirm whether 0 should raise.
        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)


@deprecated(
"This class is experimental. Use at your own risk.",
category=ExperimentalClassWarning,
Expand Down
5 changes: 4 additions & 1 deletion airbyte_cdk/sources/declarative/stream_slicers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
#

from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer_test_read_decorator import (
StreamSlicerTestReadDecorator,
)

__all__ = ["StreamSlicer"]
__all__ = ["StreamSlicer", "StreamSlicerTestReadDecorator"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from itertools import islice
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.types import StreamSlice, StreamState


@dataclass
class StreamSlicerTestReadDecorator(StreamSlicer):
    """
    Caps the number of slices produced by the wrapped slicer so a test read
    only triggers a bounded number of requests against the backend source.
    """

    # The real slicer whose output is truncated.
    wrapped_slicer: StreamSlicer
    # Maximum number of slices to yield before stopping.
    maximum_number_of_slices: int = 5

    def stream_slices(self) -> Iterable[StreamSlice]:
        # Consume the wrapped slicer lazily and stop at the configured cap.
        yield from islice(self.wrapped_slicer.stream_slices(), self.maximum_number_of_slices)

    def __getattr__(self, name: str) -> Any:
        # Any attribute not defined on the decorator is served by the wrapped slicer.
        return getattr(self.wrapped_slicer, name)
24 changes: 21 additions & 3 deletions airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,30 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from abc import ABC, abstractmethod
from typing import Iterable
from abc import ABC, ABCMeta, abstractmethod
from typing import Any, Iterable

from airbyte_cdk.sources.types import StreamSlice


class StreamSlicer(ABC):
class StreamSlicerMeta(ABCMeta):
"""
Metaclass for wrapper scenario that allows it to be used as a type check for StreamSlicer.
This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be able to check
if an instance is a StreamSlicer, even if it is wrapped in a StreamSlicerTestReadDecorator.

For example in ConcurrentDeclarativeSource, we do things like:
isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
"""

def __instancecheck__(cls, instance: Any) -> bool:
# Check if it's our wrapper with matching wrapped class
if hasattr(instance, "wrapped_slicer"):
return isinstance(instance.wrapped_slicer, cls)

return super().__instancecheck__(instance)


class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
"""
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets
from unit_tests.connector_builder.utils import create_configured_catalog
Expand Down Expand Up @@ -1112,7 +1112,8 @@ def test_read_source(mock_http_stream):

streams = source.streams(config)
for s in streams:
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
assert isinstance(s.retriever, SimpleRetriever)
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)


@patch.object(
Expand Down Expand Up @@ -1158,7 +1159,8 @@ def test_read_source_single_page_single_slice(mock_http_stream):

streams = source.streams(config)
for s in streams:
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
assert isinstance(s.retriever, SimpleRetriever)
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)


@pytest.mark.parametrize(
Expand Down
Loading
Loading