Skip to content

⚡️ Speed up method ConnectorStateManager._extract_from_state_message by 72% in PR #44444 (artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs) #44943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ class AirbyteGlobalState:
class AirbyteStateMessage:
type: Optional[AirbyteStateType] = None # type: ignore [name-defined]
stream: Optional[AirbyteStreamState] = None
global_: Annotated[
AirbyteGlobalState | None, Alias("global")
] = None # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
global_: Annotated[AirbyteGlobalState | None, Alias("global")] = (
None # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
)
data: Optional[Dict[str, Any]] = None
sourceStats: Optional[AirbyteStateStats] = None # type: ignore [name-defined]
destinationStats: Optional[AirbyteStateStats] = None # type: ignore [name-defined]
Expand Down
34 changes: 17 additions & 17 deletions airbyte-cdk/python/airbyte_cdk/sources/connector_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from airbyte_cdk.models import AirbyteMessage, AirbyteStateBlob, AirbyteStateMessage, AirbyteStateType, AirbyteStreamState, StreamDescriptor
from airbyte_cdk.models import Type as MessageType
from airbyte_protocol_dataclasses.models import *


@dataclass(frozen=True)
Expand Down Expand Up @@ -95,35 +96,34 @@ def _extract_from_state_message(
:param state: The incoming state input
:return: A tuple of shared state and per stream state assembled from the incoming state list
"""
if state is None:
if not state:
return None, {}

is_global = cls._is_global_state(state)

if is_global:
global_state = state[0].global_ # type: ignore # We verified state is a list in _is_global_state
shared_state = copy.deepcopy(global_state.shared_state, {}) # type: ignore[union-attr] # global_state has shared_state
if cls._is_global_state(state):
global_state = state[0].global_
shared_state = global_state.shared_state # type: ignore[union-attr]
streams = {
HashableStreamDescriptor(
name=per_stream_state.stream_descriptor.name, namespace=per_stream_state.stream_descriptor.namespace
): per_stream_state.stream_state
for per_stream_state in global_state.stream_states # type: ignore[union-attr] # global_state has shared_state
for per_stream_state in global_state.stream_states # type: ignore[union-attr]
}
return shared_state, streams
else:
streams = {
HashableStreamDescriptor(
name=per_stream_state.stream.stream_descriptor.name, namespace=per_stream_state.stream.stream_descriptor.namespace # type: ignore[union-attr] # stream has stream_descriptor
): per_stream_state.stream.stream_state # type: ignore[union-attr] # stream has stream_state
for per_stream_state in state
if per_stream_state.type == AirbyteStateType.STREAM and hasattr(per_stream_state, "stream") # type: ignore # state is always a list of AirbyteStateMessage if is_per_stream is True
}
return None, streams

streams = {
HashableStreamDescriptor(
name=per_stream_state.stream.stream_descriptor.name,
namespace=per_stream_state.stream.stream_descriptor.namespace, # type: ignore[union-attr]
): per_stream_state.stream.stream_state # type: ignore[union-attr]
for per_stream_state in state
if per_stream_state.type == AirbyteStateType.STREAM
}
return None, streams

@staticmethod
def _is_global_state(state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]]) -> bool:
return (
isinstance(state, List)
isinstance(state, list)
and len(state) == 1
and isinstance(state[0], AirbyteStateMessage)
and state[0].type == AirbyteStateType.GLOBAL
Expand Down
Loading