|
2 | 2 |
|
3 | 3 | from airbyte_cdk.models import AirbyteMessage, Type
|
4 | 4 | from airbyte_cdk.sources.connector_state_manager import HashableStreamDescriptor
|
| 5 | +from airbyte_protocol_dataclasses.models import * |
5 | 6 |
|
6 | 7 |
|
7 | 8 | def get_stream_descriptor(message: AirbyteMessage) -> HashableStreamDescriptor:
|
8 |
| - match message.type: |
9 |
| - case Type.RECORD: |
10 |
| - return HashableStreamDescriptor(name=message.record.stream, namespace=message.record.namespace) # type: ignore[union-attr] # record has `stream` and `namespace` |
11 |
| - case Type.STATE: |
12 |
| - if not message.state.stream or not message.state.stream.stream_descriptor: # type: ignore[union-attr] # state has `stream` |
13 |
| - raise ValueError("State message was not in per-stream state format, which is required for record counts.") |
14 |
| - return HashableStreamDescriptor( |
15 |
| - name=message.state.stream.stream_descriptor.name, namespace=message.state.stream.stream_descriptor.namespace # type: ignore[union-attr] # state has `stream` |
16 |
| - ) |
17 |
| - case _: |
18 |
| - raise NotImplementedError(f"get_stream_descriptor is not implemented for message type '{message.type}'.") |
| 9 | + mtype = message.type # Cache the access to message.type |
| 10 | + if mtype == Type.RECORD: |
| 11 | + record = message.record # Cache the access to message.record |
| 12 | + return HashableStreamDescriptor(name=record.stream, namespace=record.namespace) |
| 13 | + elif mtype == Type.STATE: |
| 14 | + state = message.state # Cache the access to message.state |
| 15 | + stream = state.stream # Cache the access to state.stream |
| 16 | + descriptor = stream.stream_descriptor # Cache the access to stream.stream_descriptor |
| 17 | + if not stream or not descriptor: |
| 18 | + raise ValueError("State message was not in per-stream state format, which is required for record counts.") |
| 19 | + return HashableStreamDescriptor(name=descriptor.name, namespace=descriptor.namespace) |
| 20 | + else: |
| 21 | + raise NotImplementedError(f"get_stream_descriptor is not implemented for message type '{mtype}'.") |
0 commit comments