Commit bf3aecc

Merge branch 'master' into akash/wass-algo
2 parents 37a9c41 + 68903b4 commit bf3aecc

62 files changed: +1868 additions, -643 deletions


.github/workflows/connectors_up_to_date.yml

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ on:
     inputs:
       connectors-options:
         description: "Options to pass to the 'airbyte-ci connectors' command group."
-        default: "--concurrency=10 --language=python --language=low-code"
+        default: "--concurrency=10 --language=python --language=low-code --language=manifest-only"
       auto-merge:
         description: "Whether to auto-merge the PRs created by the action."
         default: "false"

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,8 @@
 # Changelog
 
+## 3.4.0
+file-based cdk: add config option to limit number of files for schema discover
+
 ## 3.3.0
 CDK: add incomplete status to availability check during read
 

airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 68 additions & 32 deletions
@@ -1,16 +1,18 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
-
+import logging
 from dataclasses import InitVar, dataclass
 from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Union
 
 import dpath
-from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
+from airbyte_cdk.models import AirbyteMessage
+from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
 from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
+from airbyte_cdk.utils import AirbyteTracedException
 
 if TYPE_CHECKING:
     from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream

@@ -131,40 +133,70 @@ def stream_slices(self) -> Iterable[StreamSlice]:
             parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
             partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
             incremental_dependency = parent_stream_config.incremental_dependency
-            for parent_stream_slice in parent_stream.stream_slices(
-                sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
-            ):
-                parent_partition = parent_stream_slice.partition if parent_stream_slice else {}
-
-                # we need to read all records for slice to update the parent stream cursor
-                stream_slices_for_parent = []
-
-                # only stream_slice param is used in the declarative stream, stream state is set in PerPartitionCursor set_initial_state
-                for parent_record in parent_stream.read_records(
-                    sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
-                ):
-                    # Skip non-records (eg AirbyteLogMessage)
-                    if isinstance(parent_record, AirbyteMessage):
-                        if parent_record.type == Type.RECORD:
-                            parent_record = parent_record.record.data
-                        else:
-                            continue
-                    elif isinstance(parent_record, Record):
-                        parent_record = parent_record.data
-                    try:
-                        partition_value = dpath.get(parent_record, parent_field)
-                    except KeyError:
-                        pass
-                    else:
-                        stream_slices_for_parent.append(
-                            StreamSlice(partition={partition_field: partition_value, "parent_slice": parent_partition}, cursor_slice={})
-                        )
-
-                # update the parent state, as parent stream read all record for current slice and state is already updated
-                if incremental_dependency:
-                    self._parent_state[parent_stream.name] = parent_stream.state
-
-                yield from stream_slices_for_parent
+
+            stream_slices_for_parent = []
+            previous_associated_slice = None
+
+            # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
+            # not support either substreams or RFR, but something that needs to be considered once we do
+            for parent_record in parent_stream.read_only_records():
+                parent_partition = None
+                parent_associated_slice = None
+                # Skip non-records (eg AirbyteLogMessage)
+                if isinstance(parent_record, AirbyteMessage):
+                    self.logger.warning(
+                        f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
+                    )
+                    if parent_record.type == MessageType.RECORD:
+                        parent_record = parent_record.record.data
+                    else:
+                        continue
+                elif isinstance(parent_record, Record):
+                    parent_partition = parent_record.associated_slice.partition if parent_record.associated_slice else {}
+                    parent_associated_slice = parent_record.associated_slice
+                    parent_record = parent_record.data
+                elif not isinstance(parent_record, Mapping):
+                    # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
+                    raise AirbyteTracedException(message=f"Parent stream returned records as invalid type {type(parent_record)}")
+                try:
+                    partition_value = dpath.get(parent_record, parent_field)
+                except KeyError:
+                    pass
+                else:
+                    if incremental_dependency:
+                        if previous_associated_slice is None:
+                            previous_associated_slice = parent_associated_slice
+                        elif previous_associated_slice != parent_associated_slice:
+                            # Update the parent state, as parent stream read all record for current slice and state
+                            # is already updated.
+                            #
+                            # When the associated slice of the current record of the parent stream changes, this
+                            # indicates the parent stream has finished processing the current slice and has moved onto
+                            # the next. When this happens, we should update the partition router's current state and
+                            # flush the previous set of collected records and start a new set
+                            #
+                            # Note: One tricky aspect to take note of here is that parent_stream.state will actually
+                            # fetch state of the stream of the previous record's slice NOT the current record's slice.
+                            # This is because in the retriever, we only update stream state after yielding all the
+                            # records. And since we are in the middle of the current slice, parent_stream.state is
+                            # still set to the previous state.
+                            self._parent_state[parent_stream.name] = parent_stream.state
+                            yield from stream_slices_for_parent
+
+                            # Reset stream_slices_for_parent after we've flushed parent records for the previous parent slice
+                            stream_slices_for_parent = []
+                            previous_associated_slice = parent_associated_slice
+                    stream_slices_for_parent.append(
+                        StreamSlice(
+                            partition={partition_field: partition_value, "parent_slice": parent_partition or {}}, cursor_slice={}
+                        )
+                    )
+
+            # A final parent state update and yield of records is needed, so we don't skip records for the final parent slice
+            if incremental_dependency:
+                self._parent_state[parent_stream.name] = parent_stream.state
+
+            yield from stream_slices_for_parent
 
     def set_initial_state(self, stream_state: StreamState) -> None:
         """

@@ -215,3 +247,7 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
         }
         """
         return self._parent_state
+
+    @property
+    def logger(self) -> logging.Logger:
+        return logging.getLogger("airbyte.SubstreamPartitionRouter")
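
To make the new control flow above easier to follow, here is a minimal, self-contained sketch of the flush-on-slice-change pattern the rewritten stream_slices() relies on: partition values are buffered per parent slice, and the buffer is flushed (together with a parent-state update in the real router) whenever the parent's associated slice changes, plus once at the end. The ParentRecord type and the iterate_partitions function below are illustrative stand-ins, not CDK classes.

from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional


@dataclass(frozen=True)
class ParentRecord:
    data: Dict[str, Any]
    associated_slice: Optional[str]  # identifier of the parent slice this record belongs to


def iterate_partitions(records: Iterable[ParentRecord], parent_key: str) -> Iterable[List[Dict[str, Any]]]:
    """Buffer partition values per parent slice and flush the buffer whenever the slice changes."""
    buffered: List[Dict[str, Any]] = []
    previous_slice: Optional[str] = None
    for record in records:
        if previous_slice is None:
            previous_slice = record.associated_slice
        elif record.associated_slice != previous_slice:
            # The parent has moved on to a new slice, so its state now covers the previous
            # slice; in the router this is where self._parent_state would be updated.
            yield buffered
            buffered = []
            previous_slice = record.associated_slice
        buffered.append({"partition": record.data[parent_key], "parent_slice": record.associated_slice})
    # Final flush so partitions from the last parent slice are not dropped.
    yield buffered


records = [
    ParentRecord({"id": 1}, "2024-01"),
    ParentRecord({"id": 2}, "2024-01"),
    ParentRecord({"id": 3}, "2024-02"),
]
for batch in iterate_partitions(records, "id"):
    print(batch)  # first the two "2024-01" partitions, then the "2024-02" partition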

airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py

Lines changed: 6 additions & 0 deletions
@@ -64,6 +64,12 @@ class FileBasedStreamConfig(BaseModel):
         description="When enabled, syncs will not validate or structure records against the stream's schema.",
         default=False,
     )
+    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
+        title="Files To Read For Schema Discover",
+        description="The number of resent files which will be used to discover the schema for this stream.",
+        default=None,
+        gt=0,
+    )
 
     @validator("input_schema", pre=True)
     def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
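
For context, this is roughly how the new limit might be set in a file-based stream's configuration. The stream entry below is hypothetical (field names other than recent_n_files_to_read_for_schema_discovery follow the model above, but the exact required fields depend on the connector); because the field is declared with gt=0, a zero or negative value is rejected at validation time.

# Hypothetical stream entry for a file-based source config.
stream_config = {
    "name": "invoices",
    "globs": ["invoices/*.csv"],
    "format": {"filetype": "csv"},
    # Only the 10 most recent files will be read when discovering this stream's schema.
    "recent_n_files_to_read_for_schema_discovery": 10,
}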

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ class AbstractFileBasedStream(Stream):
       files in the stream.
     - A DiscoveryPolicy that controls the number of concurrent requests sent to the source
       during discover, and the number of files used for schema discovery.
-    - A dictionary of FileType:Parser that holds all of the file types that can be handled
+    - A dictionary of FileType:Parser that holds all the file types that can be handled
       by the stream.
     """
 

@@ -70,7 +70,7 @@ def list_files(self) -> List[RemoteFile]:
         List all files that belong to the stream.
 
         The output of this method is cached so we don't need to list the files more than once.
-        This means we won't pick up changes to the files during a sync. This meethod uses the
+        This means we won't pick up changes to the files during a sync. This method uses the
         get_files method which is implemented by the concrete stream class.
         """
         return list(self.get_files())

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 30 additions & 20 deletions
@@ -191,30 +191,40 @@ def _get_raw_json_schema(self) -> JsonSchema:
             return schemaless_schema
         else:
             files = self.list_files()
-            total_n_files = len(files)
-
-            if total_n_files == 0:
-                self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
-                return schemaless_schema
-
-            max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
-            if total_n_files > max_n_files_for_schema_inference:
-                # Use the most recent files for schema inference, so we pick up schema changes during discovery.
-                files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:max_n_files_for_schema_inference]
-                self.logger.warn(
-                    msg=f"Refusing to infer schema for all {total_n_files} files; using {max_n_files_for_schema_inference} files."
-                )
-
-            inferred_schema = self.infer_schema(files)
-
-            if not inferred_schema:
-                raise InvalidSchemaError(
-                    FileBasedSourceError.INVALID_SCHEMA_ERROR,
-                    details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
-                    stream=self.name,
-                )
-
-            schema = {"type": "object", "properties": inferred_schema}
+            first_n_files = len(files)
+
+            if self.config.recent_n_files_to_read_for_schema_discovery:
+                self.logger.info(
+                    msg=(
+                        f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
+                        f"for stream {self.name} due to limitation in config."
+                    )
+                )
+                first_n_files = self.config.recent_n_files_to_read_for_schema_discovery
+
+            if first_n_files == 0:
+                self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
+                return schemaless_schema
+
+            max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
+
+            if first_n_files > max_n_files_for_schema_inference:
+                # Use the most recent files for schema inference, so we pick up schema changes during discovery.
+                self.logger.warning(msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files.")
+                first_n_files = max_n_files_for_schema_inference
+
+            files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]
+
+            inferred_schema = self.infer_schema(files)
+
+            if not inferred_schema:
+                raise InvalidSchemaError(
+                    FileBasedSourceError.INVALID_SCHEMA_ERROR,
+                    details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
+                    stream=self.name,
+                )
+
+            schema = {"type": "object", "properties": inferred_schema}
 
         return schema
 
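
The selection logic above reduces to: cap the file count (via the new config option and/or the discovery policy), then keep only the most recently modified files. A standalone sketch follows, with RemoteFile as an illustrative stand-in for the CDK's remote file type.

from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class RemoteFile:
    uri: str
    last_modified: datetime


def select_files_for_schema_inference(files: List[RemoteFile], first_n_files: int) -> List[RemoteFile]:
    # Newest files first, so schema changes in recently added files are picked up during discovery.
    return sorted(files, key=lambda f: f.last_modified, reverse=True)[:first_n_files]


files = [
    RemoteFile("a.csv", datetime(2024, 1, 1)),
    RemoteFile("b.csv", datetime(2024, 3, 1)),
    RemoteFile("c.csv", datetime(2024, 2, 1)),
]
print([f.uri for f in select_files_for_schema_inference(files, 2)])  # ['b.csv', 'c.csv']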

airbyte-cdk/python/airbyte_cdk/sources/streams/core.py

Lines changed: 32 additions & 5 deletions
@@ -10,7 +10,7 @@
 from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
 
 import airbyte_cdk.sources.utils.casing as casing
-from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, SyncMode
+from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
 from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.streams.checkpoint import (
     CheckpointMode,

@@ -24,7 +24,7 @@
 
 # list of all possible HTTP methods which can be used for sending of request bodies
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, ResourceSchemaLoader
-from airbyte_cdk.sources.utils.slice_logger import SliceLogger
+from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from deprecated import deprecated
 

@@ -156,6 +156,7 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o
         except AttributeError:
             pass
 
+        should_checkpoint = bool(state_manager)
         checkpoint_reader = self._get_checkpoint_reader(
             logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state
         )

@@ -193,25 +194,51 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o
 
                     checkpoint_interval = self.state_checkpoint_interval
                     checkpoint = checkpoint_reader.get_checkpoint()
-                    if checkpoint_interval and record_counter % checkpoint_interval == 0 and checkpoint is not None:
+                    if should_checkpoint and checkpoint_interval and record_counter % checkpoint_interval == 0 and checkpoint is not None:
                         airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
                         yield airbyte_state_message
 
                     if internal_config.is_limit_reached(record_counter):
                         break
             self._observe_state(checkpoint_reader)
             checkpoint_state = checkpoint_reader.get_checkpoint()
-            if checkpoint_state is not None:
+            if should_checkpoint and checkpoint_state is not None:
                 airbyte_state_message = self._checkpoint_state(checkpoint_state, state_manager=state_manager)
                 yield airbyte_state_message
 
             next_slice = checkpoint_reader.next()
 
         checkpoint = checkpoint_reader.get_checkpoint()
-        if checkpoint is not None:
+        if should_checkpoint and checkpoint is not None:
             airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
             yield airbyte_state_message
 
+    def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
+        """
+        Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports
+        incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
+        or emit state messages.
+        """
+
+        configured_stream = ConfiguredAirbyteStream(
+            stream=AirbyteStream(
+                name=self.name,
+                json_schema={},
+                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
+            ),
+            sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
+            destination_sync_mode=DestinationSyncMode.append,
+        )
+
+        yield from self.read(
+            configured_stream=configured_stream,
+            logger=self.logger,
+            slice_logger=DebugSliceLogger(),
+            stream_state=dict(state) if state else {},  # read() expects MutableMapping instead of Mapping which is used more often
+            state_manager=None,
+            internal_config=InternalConfig(),
+        )
+
     @abstractmethod
     def read_records(
         self,
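
A hedged usage sketch of the new helper: any concrete Stream can be read without checkpointing via read_only_records(), which is what SubstreamPartitionRouter relies on above. The Colors stream below is a made-up example assuming a CDK version that includes this commit; because read_only_records() passes state_manager=None, the new should_checkpoint guard keeps read() from attempting to emit state messages.

from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


class Colors(Stream):
    primary_key = None

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[Any] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # A trivial full-refresh stream with three in-memory records.
        yield from ({"color": c} for c in ("red", "green", "blue"))


for record in Colors().read_only_records():
    print(record)  # {'color': 'red'}, {'color': 'green'}, {'color': 'blue'}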
