-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Connector builder: read input state if it exists #37495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
b90b34a
4e0268e
9cfc434
7a2ee53
543e233
c57b31b
4c17b72
c5ad904
bf825d2
188c998
cd4d129
0e44d25
c8efa13
117ea42
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
AirbyteControlMessage, | ||
AirbyteLogMessage, | ||
AirbyteMessage, | ||
AirbyteStateMessage, | ||
AirbyteTraceMessage, | ||
ConfiguredAirbyteCatalog, | ||
OrchestratorType, | ||
|
@@ -75,6 +76,7 @@ def get_message_groups( | |
source: DeclarativeSource, | ||
config: Mapping[str, Any], | ||
configured_catalog: ConfiguredAirbyteCatalog, | ||
state: List[AirbyteStateMessage], | ||
record_limit: Optional[int] = None, | ||
) -> StreamRead: | ||
if record_limit is not None and not (1 <= record_limit <= self._max_record_limit): | ||
|
@@ -96,7 +98,7 @@ def get_message_groups( | |
latest_config_update: AirbyteControlMessage = None | ||
auxiliary_requests = [] | ||
for message_group in self._get_message_groups( | ||
self._read_stream(source, config, configured_catalog), | ||
self._read_stream(source, config, configured_catalog, state), | ||
schema_inferrer, | ||
datetime_format_inferrer, | ||
record_limit, | ||
|
@@ -279,12 +281,13 @@ def _close_page( | |
current_page_records.clear() | ||
|
||
def _read_stream( | ||
self, source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog | ||
self, source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, | ||
state: List[AirbyteStateMessage] | ||
) -> Iterator[AirbyteMessage]: | ||
# the generator can raise an exception | ||
# iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage | ||
try: | ||
yield from AirbyteEntrypoint(source).read(source.spec(self.logger), config, configured_catalog, {}) | ||
yield from AirbyteEntrypoint(source).read(source.spec(self.logger), config, configured_catalog, state) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The input state used to be hardcoded to an empty legacy state object. |
||
except Exception as e: | ||
error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}" | ||
yield AirbyteTracedException.from_exception(e, message=error_message).as_airbyte_message() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,8 @@ | |
_stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base} | ||
_page_size = 2 | ||
|
||
_NO_STATE = [] | ||
|
||
MANIFEST = { | ||
"version": "0.30.3", | ||
"definitions": { | ||
|
@@ -266,7 +268,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): | |
config["__command"] = command | ||
source = ManifestDeclarativeSource(MANIFEST) | ||
limits = TestReadLimits() | ||
resolved_manifest = handle_connector_builder_request(source, command, config, create_configured_catalog("dummy_stream"), limits) | ||
resolved_manifest = handle_connector_builder_request(source, command, config, create_configured_catalog("dummy_stream"), _NO_STATE, limits) | ||
|
||
expected_resolved_manifest = { | ||
"type": "DeclarativeSource", | ||
|
@@ -457,7 +459,7 @@ def test_read(): | |
limits = TestReadLimits() | ||
with patch("airbyte_cdk.connector_builder.message_grouper.MessageGrouper.get_message_groups", return_value=stream_read): | ||
output_record = handle_connector_builder_request( | ||
source, "test_read", config, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), limits | ||
source, "test_read", config, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), _NO_STATE, limits | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we add a test here that actually sets some state, to make sure that is handled properly? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good idea. Updated the input state and added an assertion: https://github.com/airbytehq/airbyte/pull/37495/files#diff-f655e63774ac8a9d9b3b6f37f553a5e726643dc5da46d39ab11e879775b8a3edR477 |
||
) | ||
output_record.record.emitted_at = 1 | ||
assert output_record == expected_airbyte_message | ||
|
@@ -492,7 +494,7 @@ def test_config_update(): | |
return_value=refresh_request_response, | ||
): | ||
output = handle_connector_builder_request( | ||
source, "test_read", config, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), TestReadLimits() | ||
source, "test_read", config, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), _NO_STATE, TestReadLimits() | ||
) | ||
assert output.record.data["latest_config_update"] | ||
|
||
|
@@ -529,7 +531,7 @@ def check_config_against_spec(self): | |
|
||
source = MockManifestDeclarativeSource() | ||
limits = TestReadLimits() | ||
response = read_stream(source, TEST_READ_CONFIG, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), limits) | ||
response = read_stream(source, TEST_READ_CONFIG, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), _NO_STATE, limits) | ||
|
||
expected_stream_read = StreamRead( | ||
logs=[LogMessage("error_message - a stack trace", "ERROR")], | ||
|
@@ -716,7 +718,7 @@ def test_read_source(mock_http_stream): | |
|
||
source = create_source(config, limits) | ||
|
||
output_data = read_stream(source, config, catalog, limits).record.data | ||
output_data = read_stream(source, config, catalog, _NO_STATE, limits).record.data | ||
slices = output_data["slices"] | ||
|
||
assert len(slices) == max_slices | ||
|
@@ -761,7 +763,7 @@ def test_read_source_single_page_single_slice(mock_http_stream): | |
|
||
source = create_source(config, limits) | ||
|
||
output_data = read_stream(source, config, catalog, limits).record.data | ||
output_data = read_stream(source, config, catalog, _NO_STATE, limits).record.data | ||
slices = output_data["slices"] | ||
|
||
assert len(slices) == max_slices | ||
|
@@ -817,7 +819,7 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error | |
source = create_source(config, limits) | ||
|
||
with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): | ||
output_data = read_stream(source, config, catalog, limits).record.data | ||
output_data = read_stream(source, config, catalog, _NO_STATE, limits).record.data | ||
if expected_error: | ||
assert len(output_data["logs"]) > 0, "Expected at least one log message with the expected error" | ||
error_message = output_data["logs"][0] | ||
|
@@ -875,7 +877,7 @@ def test_handle_read_external_oauth_request(deployment_mode, token_url, expected | |
source = create_source(config, limits) | ||
|
||
with mock.patch.dict(os.environ, {"DEPLOYMENT_MODE": deployment_mode}, clear=False): | ||
output_data = read_stream(source, config, catalog, limits).record.data | ||
output_data = read_stream(source, config, catalog, _NO_STATE, limits).record.data | ||
if expected_error: | ||
assert len(output_data["logs"]) > 0, "Expected at least one log message with the expected error" | ||
error_message = output_data["logs"][0] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I asked the same question over Slack, but I'm asking here too for posterity: it seems like the connection state UI shows a list of AirbyteStreamState objects, whereas this method takes in a list of AirbyteStateMessages.
Which one is right? Ideally these would be consistent, so anyone could copy the state from the connection UI and paste it into the builder's state input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed on Slack: `AirbyteStateMessage`s in the builder, because they contain more information.