Skip to content

Connector builder: read input state if it exists #37495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import dataclasses
from datetime import datetime
from typing import Any, Mapping
from typing import Any, List, Mapping

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand Down Expand Up @@ -54,12 +54,12 @@ def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> Manifest


def read_stream(
source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, limits: TestReadLimits
source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], limits: TestReadLimits
) -> AirbyteMessage:
try:
handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
stream_name = configured_catalog.streams[0].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(source, config, configured_catalog, limits.max_records)
stream_read = handler.get_message_groups(source, config, configured_catalog, state, limits.max_records)
return AirbyteMessage(
type=MessageType.RECORD,
record=AirbyteRecordMessage(data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()),
Expand Down
18 changes: 11 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import TestReadLimits, create_source, get_limits, read_stream, resolve_manifest
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog]]:
def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
config_path, catalog_path = parsed_args.config, parsed_args.catalog
config_path, catalog_path, state_path = parsed_args.config, parsed_args.catalog, parsed_args.state
if parsed_args.command != "read":
raise ValueError("Only read commands are allowed for Connector Builder requests.")

Expand All @@ -32,38 +33,41 @@ def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str,
command = config["__command"]
if command == "test_read":
catalog = ConfiguredAirbyteCatalog.parse_obj(BaseConnector.read_config(catalog_path))
state = Source.read_state(state_path)
else:
catalog = None
state = []

if "__injected_declarative_manifest" not in config:
raise ValueError(
f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
)

return command, config, catalog
return command, config, catalog, state


def handle_connector_builder_request(
source: ManifestDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
Copy link
Contributor

@lmossman lmossman Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked the same question over slack, but asking here too for posterity: it seems like the connection state UI shows a list of AirbyteStreamState objects, whereas this method takes in a list of AirbyteStateMessages.

Screenshot 2024-04-23 at 2 23 21 PM

Which one is right? Ideally these would be consistent so anyone could copy the state from the connection UI and paste it into the builder's state input

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed on slack:

  • we'll use AirbyteStateMessages in the builder because they contain more information
  • we won't couple this with the platform view. We'll brainstorm novel solutions if the need to paste configs from existing connections really is a need

limits: TestReadLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
assert catalog is not None, "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, limits)
return read_stream(source, config, catalog, state, limits)
else:
raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> AirbyteMessage:
command, config, catalog = get_config_and_catalog_from_args(args)
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
return handle_connector_builder_request(source, command, config, catalog, limits).json(exclude_unset=True)
return handle_connector_builder_request(source, command, config, catalog, state, limits).json(exclude_unset=True)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AirbyteControlMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
OrchestratorType,
Expand Down Expand Up @@ -75,6 +76,7 @@ def get_message_groups(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
record_limit: Optional[int] = None,
) -> StreamRead:
if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
Expand All @@ -96,7 +98,7 @@ def get_message_groups(
latest_config_update: AirbyteControlMessage = None
auxiliary_requests = []
for message_group in self._get_message_groups(
self._read_stream(source, config, configured_catalog),
self._read_stream(source, config, configured_catalog, state),
schema_inferrer,
datetime_format_inferrer,
record_limit,
Expand Down Expand Up @@ -279,12 +281,13 @@ def _close_page(
current_page_records.clear()

def _read_stream(
self, source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog
self, source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage]
) -> Iterator[AirbyteMessage]:
# the generator can raise an exception
# iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage
try:
yield from AirbyteEntrypoint(source).read(source.spec(self.logger), config, configured_catalog, {})
yield from AirbyteEntrypoint(source).read(source.spec(self.logger), config, configured_catalog, state)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the input state used to be hardcoded to an empty legacy state object

except Exception as e:
error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
yield AirbyteTracedException.from_exception(e, message=error_message).as_airbyte_message()
Expand Down
Loading