Skip to content

Commit 15d0648

Browse files
committed
pr feedback to get rid of emit strategy, add caching
1 parent 93f83ca commit 15d0648

File tree

6 files changed

+21
-69
lines changed

6 files changed

+21
-69
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ definitions:
341341
properties:
342342
type:
343343
type: string
344-
enum: [ DynamicStreamCheckConfig ]
344+
enum: [DynamicStreamCheckConfig]
345345
dynamic_stream_name:
346346
title: Dynamic Stream Name
347347
description: The dynamic stream name.
@@ -1044,18 +1044,6 @@ definitions:
10441044
$parameters:
10451045
type: object
10461046
additionalProperties: true
1047-
EmitPartialRecordMergeStrategy:
1048-
title: Emit Partial Record
1049-
description: Record merge strategy where in the case where multiple requests are needed to retrieve all properties, properties are not consolidated back into a single record and are instead emitted as separate groups of properties. This strategy should only be used when records do not have a unique identifier like a primary key.
1050-
required:
1051-
- type
1052-
properties:
1053-
type:
1054-
type: string
1055-
enum: [EmitPartialRecordMergeStrategy]
1056-
$parameters:
1057-
type: object
1058-
additionalProperties: true
10591047
JwtAuthenticator:
10601048
title: JWT Authenticator
10611049
description: Authenticator for requests using JWT authentication flow.
@@ -3081,9 +3069,7 @@ definitions:
30813069
record_merge_strategy:
30823070
title: Record Merge Strategy
30833071
description: Dictates how to records that require multiple requests to get all properties should be emitted to the destination
3084-
anyOf:
3085-
- "$ref": "#/definitions/EmitPartialRecordMergeStrategy"
3086-
- "$ref": "#/definitions/GroupByKeyMergeStrategy"
3072+
"$ref": "#/definitions/GroupByKeyMergeStrategy"
30873073
$parameters:
30883074
type: object
30893075
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class DynamicStreamCheckConfig(BaseModel):
5151
)
5252
stream_count: Optional[int] = Field(
5353
0,
54-
description="Numbers of the streams to try reading from when running a check operation.",
54+
description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
5555
title="Stream Count",
5656
)
5757

@@ -347,11 +347,6 @@ class Clamping(BaseModel):
347347
target_details: Optional[Dict[str, Any]] = None
348348

349349

350-
class EmitPartialRecordMergeStrategy(BaseModel):
351-
type: Literal["EmitPartialRecordMergeStrategy"]
352-
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
353-
354-
355350
class Algorithm(Enum):
356351
HS256 = "HS256"
357352
HS384 = "HS384"
@@ -1224,9 +1219,7 @@ class PropertyChunking(BaseModel):
12241219
description="The maximum amount of properties that can be retrieved per request according to the limit type.",
12251220
title="Property Limit",
12261221
)
1227-
record_merge_strategy: Optional[
1228-
Union[EmitPartialRecordMergeStrategy, GroupByKeyMergeStrategy]
1229-
] = Field(
1222+
record_merge_strategy: Optional[GroupByKeyMergeStrategy] = Field(
12301223
None,
12311224
description="Dictates how to records that require multiple requests to get all properties should be emitted to the destination",
12321225
title="Record Merge Strategy",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,6 @@
225225
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
226226
DynamicStreamCheckConfig as DynamicStreamCheckConfigModel,
227227
)
228-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
229-
EmitPartialRecordMergeStrategy as EmitPartialRecordMergeStrategyModel,
230-
)
231228
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
232229
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
233230
)
@@ -459,7 +456,6 @@
459456
PropertyLimitType,
460457
)
461458
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
462-
EmitPartialRecord,
463459
GroupByKey,
464460
)
465461
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
@@ -623,7 +619,6 @@ def _init_mappings(self) -> None:
623619
DefaultErrorHandlerModel: self.create_default_error_handler,
624620
DefaultPaginatorModel: self.create_default_paginator,
625621
DpathExtractorModel: self.create_dpath_extractor,
626-
EmitPartialRecordMergeStrategyModel: self.create_emit_partial_record,
627622
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
628623
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
629624
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
@@ -805,12 +800,6 @@ def create_dpath_flatten_fields(
805800
parameters=model.parameters or {},
806801
)
807802

808-
@staticmethod
809-
def create_emit_partial_record(
810-
model: EmitPartialRecordMergeStrategyModel, config: Config, **kwargs: Any
811-
) -> EmitPartialRecord:
812-
return EmitPartialRecord(config=config, parameters=model.parameters or {})
813-
814803
@staticmethod
815804
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
816805
if not value_type:
@@ -2149,6 +2138,7 @@ def create_http_requester(
21492138
config: Config,
21502139
decoder: Decoder = JsonDecoder(parameters={}),
21512140
query_properties_key: Optional[str] = None,
2141+
use_cache: Optional[bool] = None,
21522142
*,
21532143
name: str,
21542144
) -> HttpRequester:
@@ -2189,7 +2179,7 @@ def create_http_requester(
21892179
assert model.use_cache is not None # for mypy
21902180
assert model.http_method is not None # for mypy
21912181

2192-
use_cache = model.use_cache and not self._disable_cache
2182+
should_use_cache = (model.use_cache or bool(use_cache)) and not self._disable_cache
21932183

21942184
return HttpRequester(
21952185
name=name,
@@ -2204,7 +2194,7 @@ def create_http_requester(
22042194
disable_retries=self._disable_retries,
22052195
parameters=model.parameters or {},
22062196
message_repository=self._message_repository,
2207-
use_cache=use_cache,
2197+
use_cache=should_use_cache,
22082198
decoder=decoder,
22092199
stream_response=decoder.is_stream_response() if decoder else False,
22102200
)
@@ -2308,10 +2298,11 @@ def create_dynamic_schema_loader(
23082298
retriever = self._create_component_from_model(
23092299
model=model.retriever,
23102300
config=config,
2311-
name="",
2301+
name="dynamic_properties",
23122302
primary_key=None,
23132303
stream_slicer=combined_slicers,
23142304
transformations=[],
2305+
use_cache=True,
23152306
)
23162307
schema_type_identifier = self._create_component_from_model(
23172308
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -2652,14 +2643,14 @@ def create_parent_stream_config(
26522643
def create_properties_from_endpoint(
26532644
self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any
26542645
) -> PropertiesFromEndpoint:
2655-
name = "property_retriever"
26562646
retriever = self._create_component_from_model(
26572647
model=model.retriever,
26582648
config=config,
2659-
name=name,
2649+
name="dynamic_properties",
26602650
primary_key=None,
26612651
stream_slicer=None,
26622652
transformations=[],
2653+
use_cache=True, # Enable caching on the HttpRequester/HttpClient because the properties endpoint will be called for every slice being processed, and it is highly unlikely for the response to different
26632654
)
26642655
return PropertiesFromEndpoint(
26652656
property_field_path=model.property_field_path,
@@ -2867,6 +2858,7 @@ def create_simple_retriever(
28672858
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
28682859
]
28692860
] = None,
2861+
use_cache: Optional[bool] = None,
28702862
**kwargs: Any,
28712863
) -> SimpleRetriever:
28722864
decoder = (
@@ -2928,9 +2920,10 @@ def create_simple_retriever(
29282920
requester = self._create_component_from_model(
29292921
model=model.requester,
29302922
decoder=decoder,
2923+
name=name,
29312924
query_properties_key=query_properties_key,
2925+
use_cache=use_cache,
29322926
config=config,
2933-
name=name,
29342927
)
29352928
url_base = (
29362929
model.requester.url_base

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ class QueryProperties:
2828
def get_request_property_chunks(
2929
self, stream_slice: Optional[StreamSlice] = None
3030
) -> Iterable[List[str]]:
31+
"""
32+
Uses the defined property_list to fetch the total set of properties dynamically or from a static list
33+
and based on the resulting properties, performs property chunking if applicable.
34+
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
35+
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
36+
"""
3137
fields: Union[Iterable[str], List[str]]
3238
if isinstance(self.property_list, PropertiesFromEndpoint):
3339
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

3-
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.emit_partial_record import (
4-
EmitPartialRecord,
5-
)
63
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import (
74
GroupByKey,
85
)
96
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
107
RecordMergeStrategy,
118
)
129

13-
__all__ = ["EmitPartialRecord", "GroupByKey", "RecordMergeStrategy"]
10+
__all__ = ["GroupByKey", "RecordMergeStrategy"]

airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)