feat(airbyte-cdk): replace pydantic BaseModel with dataclasses + serpyco-rs in protocol #44444
Signed-off-by: Artem Inzhyyants <[email protected]>
…dk-protocol-dataclasses # Conflicts: # airbyte-cdk/python/poetry.lock
Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci] Signed-off-by: Artem Inzhyyants <[email protected]>
…205/airbyte-cdk-protocol-dataclasses-serpyco-rs # Conflicts: # airbyte-cdk/python/airbyte_cdk/sources/connector_state_manager.py # airbyte-cdk/python/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py # airbyte-cdk/python/unit_tests/sources/test_abstract_source.py # airbyte-cdk/python/unit_tests/sources/test_connector_state_manager.py
[skip ci] Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
…8% in PR #44444 (`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)
Certainly! Here is an optimized version of the provided Python program. I've streamlined the import statements, removed redundant lines, and optimized the `airbyte_message_to_string` method to eliminate repetitive operations and boost performance.
### Key Optimizations.
1. **Streamlined Imports**: Removed unnecessary imports and redundancies for clarity and faster loading.
2. **Inheritance**: Used inheritance to avoid redefining `AirbyteMessage`, since it largely overlaps with `OriginalAirbyteMessage`.
3. **Static method optimization**: The `airbyte_message_to_string` method was already efficient with `orjson.dumps()`, and I've ensured clean data serialization by direct encoding and decoding via methods that use low-level efficiencies.
### Notes.
- `is_cloud_environment()` and `_init_internal_request_filter()` are assumed to be already defined somewhere in the imported modules.
- Logging and exception handling initialization only needed to be called once, simplified setup within the initializer.
This version of the code should be more efficient while maintaining the same logic and output as the original.
…` by 49% in PR #44444 (`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)
Certainly! Let's focus on optimizing the program for better performance. Here's the rewritten version.
### Changes made.
1. **Removed unnecessary list comprehension during initialization**.
   - Instead of initializing `self._messages` with a list comprehension and then appending to it, initialized it to an empty list and used a separate method `_parse_messages` to append messages directly.
   - This reduces the call overhead associated with list comprehensions and error handling inside them.
2. **Optimized `_get_trace_message_by_trace_type`**.
   - Directly filtered `self._messages` in a single pass, reducing unnecessary function calls and memory allocations.
These changes aim to streamline the code execution and reduce unnecessary overhead, particularly in the initialization phase.
…(`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)
To optimize the given Python program for faster execution, we'll focus on improving the list comprehension within the `__init__` method and optimizing the `is_in_logs` method. We will use more efficient coding practices and eliminate unnecessary operations where possible. Here's the optimized version of the program.
### Changes and Improvements.
1. **List Comprehension to For Loop**: Converted the list comprehension in the `__init__` method to a `for` loop for clarity and debugging ease.
2. **Cache the Compiled Regex**: In `is_in_logs`, precompiled the regex pattern outside the loop. This avoids recompiling the pattern on every iteration, which is more efficient.
3. **Direct Filtering in Logs**: Simplified `_get_message_by_types` logic by using direct filtering inline in the `logs` property.
…em1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)
To optimize the given Python program for better performance, you can minimize redundant accesses and checks, and utilize pattern matching efficiently. Here's an optimized version.
### Explanation.
- The `message.type` and other repeated attribute accesses are cached to local variables to avoid redundant attribute lookups.
- The logic and functionality remain the same to ensure the return value is the same as before.
- Slight restructuring results in cleaner and more efficient code.
This method aims to minimize the overhead of attribute access and logical checks, which can contribute to a faster runtime, especially when this function is called frequently.
looks great just a few questions!
@@ -62,7 +61,7 @@ def __init__(
         self._streams_done: Set[str] = set()
         self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

-    def on_partition_generation_completed(self, sentinel: PartitionGenerationCompletedSentinel) -> Iterable[AirbyteMessage]:
+    def on_partition_generation_completed(self, sentinel: PartitionGenerationCompletedSentinel) -> Iterable[AirbyteMessage | None]:
does the iterable really include `None`s? This sounds difficult to work with
`mypy` raises an error due to `start_next_partition_generator`, which can return `None`.
I did not really dig into this class, only fixed `mypy`. I suppose we'll refactor this heavily during the next project: concurrency for Low-Code.
airbyte/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
Lines 160 to 179 in feded1f
    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a stream status message
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None
if it's just for mypy, let's add a type ignore. Otherwise, the type will indicate that it is valid to yield None, which might not be the case?
ignored
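For reference, one alternative to the type ignore discussed in this thread would be to guard against `None` before yielding, so the declared return type can stay free of `None`. The sketch below is only an illustration of that idea: `Message`, `start_next` and `on_generation_completed` are hypothetical stand-ins, not the CDK's actual classes or methods.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class Message:
    """Hypothetical stand-in for AirbyteMessage."""

    stream: str
    status: str


def start_next(pending: List[str]) -> Optional[Message]:
    """Mimics start_next_partition_generator: returns None when no stream is left."""
    if pending:
        return Message(stream=pending.pop(0), status="STARTED")
    return None


def on_generation_completed(pending: List[str]) -> Iterator[Message]:
    """Yield a status message only if one exists, so callers never see None."""
    message = start_next(pending)
    if message is not None:
        yield message
```

With this shape the annotation stays `Iterator[Message]` and `mypy` has nothing to complain about; whether that fits the real `ConcurrentReadProcessor` is not verified here.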
updates the `airbyte-protocol-models` dependency to a version that uses dataclasses models.

The changes to Airbyte CDK itself are backwards-compatible, but some changes are required if the connector:
- uses the `airbyte_protocol` models directly, or `airbyte_cdk.models`, which points to `airbyte_protocol` models
@artem1205 do you know if this will break any connectors?
If yes, I'm fine with upgrading to 5.0.0, but if not, I think it's preferable to stay on 4.X because of how difficult it still is to keep connectors on the latest major version.
CAT tests were ✅ for source-:
- hardcoded-records
- zendesk-support
- shopify
These changes will not break anything, unless `airbyte_protocol` is imported directly (not as a part of `airbyte_cdk.models`).
I'll change the versioning to 4.x then.
can you check if any connector directly imports `airbyte_protocol`? It might be worth fixing them first to avoid the need to flag it as a breaking change
With grep, excluding test folders, I got 18 connectors:
grep -r --include="*.py" --exclude-dir="unit_tests" --exclude-dir="integration_tests" "from airbyte_protocol" ./airbyte-integrations/connectors/
- /source-linkedin-ads/
- /source-shopify/
- /source-amazon-ads/
- /source-mixpanel/
- /source-amazon-seller-partner/
- /source-bing-ads/
- /source-google-ads/
- /source-salesforce/
- /source-stripe/
- /source-mailchimp/
- /destination-snowflake-cortex/
- /source-airtable/
- /source-zendesk-support/
- /source-marketo/
- /destination-pinecone/
- /source-pinterest/
- /source-github/
Do you want me to fix all of them before merging this PR?
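A rough Python equivalent of the grep above may be handy for re-running the check later. This is a sketch under assumptions: the function name `connectors_importing_protocol` is made up for illustration, and it matches `import airbyte_protocol` as well as `from airbyte_protocol` at the start of a statement, so results can differ slightly from the grep.

```python
import re
from pathlib import Path
from typing import List

# Matches "from airbyte_protocol..." or "import airbyte_protocol..." at line start.
DIRECT_IMPORT = re.compile(r"^\s*(from|import)\s+airbyte_protocol\b", re.MULTILINE)
# Same test folders the grep command excludes.
EXCLUDED_DIRS = {"unit_tests", "integration_tests"}


def connectors_importing_protocol(connectors_root: Path) -> List[str]:
    """Return sorted connector directory names that import airbyte_protocol directly."""
    found = set()
    for path in connectors_root.rglob("*.py"):
        relative = path.relative_to(connectors_root)
        # Skip anything located under an excluded test folder.
        if EXCLUDED_DIRS.intersection(relative.parts):
            continue
        text = path.read_text(encoding="utf-8", errors="ignore")
        if DIRECT_IMPORT.search(text):
            # The first path component is taken to be the connector directory.
            found.add(relative.parts[0])
    return sorted(found)
```

Usage would be something like `connectors_importing_protocol(Path("./airbyte-integrations/connectors"))`, run from the repository root.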
…dk-protocol-dataclasses-serpyco-rs # Conflicts: # airbyte-cdk/python/poetry.lock # airbyte-cdk/python/unit_tests/sources/streams/http/error_handlers/test_json_error_message_parser.py
Signed-off-by: Artem Inzhyyants <[email protected]>
…`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)
To optimize the given Python program, we'll focus on reducing redundancy, unnecessary object creations, and repetitive computations.
1. **Avoid Redundant Imports:** Ensure to import only necessary modules and classes.
2. **Optimize `filter_secrets`:** Use `str.replace` in a more efficient loop.
3. **Optimize `AirbyteLogFormatter.format`:** Avoid redundant computations and direct return strings where possible.
Below is the optimized code.
### Changes and Optimizations.
1. **Consolidated Imports:** Removed redundant and unused imports, keeping the necessary ones.
2. **Enhanced `extract_extra_args_from_record`:** Initialized `default_attrs` once in the constructor to avoid recalculating it for each log record.
3. **Direct Use `base_filter_secrets`:** Refactored to directly use the `base_filter_secrets` function.
4. **Use Efficient JSON Libraries:** Continued the use of `orjson` for JSON serialization for performance efficiency.
…` by 72% in PR #44444 (`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)
To optimize this code, several improvements can be made. We can avoid unnecessary deep copies, reduce redundant checks, and simplify certain parts of the code for better readability and performance. Here's the optimized version.
### Key Improvements.
1. **Removal of `copy.deepcopy`**: The use of `copy.deepcopy` was unnecessary since the original code did not mutate the state.
2. **Single Dictionary Update**: Combined two updates into one to reduce the number of dictionary operations in `AirbyteStateBlob.__init__`.
3. **Simplified Boolean Checks**: Simplified boolean checks and avoided redundant type checks for performance.
4. **Removed Redundant Comments**: Retained only essential comments to keep the codebase clean and easy to read.
These changes should make the code more efficient and optimize its runtime and memory usage.
/approve-regression-tests
What
Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/8945 by use of dataclasses and serpyco-rs for serialization
Following #44026
How
`dataclasses` and corresponding `Serializer`s from serpyco-rs
Tip
Example of serialization `dataclass` -> `str`:
Review guide
airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
airbyte-cdk/python/airbyte_cdk/entrypoint.py
Note
All Serializers are in `airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py`
Warning
Excel Parser: the `parse_` method changed to `to_json`, because the `to_dict()` method returns datetime values as `pandas.Timestamp` values, which are not serializable by `orjson`. See `airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/excel_parser.py`
To align with the previous pydantic serialization logic, `date_format="iso", date_unit="us"` (microseconds) will be used, since pydantic cannot handle `nanoseconds` and `strips` seconds to 6 digits, i.e. `microseconds`.
Polars dataframe.to_dicts() has the same behaviour
Code Example
Warning
Avro Parser: `_to_output_value` will `decode` `bytes` to `string`, because bytes are not serializable by `orjson`. See `airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py`, Slack thread
User Impact
-- changes in protocol models only
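To illustrate the Avro parser warning above, here is a minimal sketch of decoding `bytes` before JSON serialization. It is not the parser's actual code: the standard-library `json` module stands in for `orjson`, the helper name `to_output_value` only echoes `_to_output_value`, and UTF-8 is an assumed encoding.

```python
import json
from typing import Any


def to_output_value(value: Any) -> Any:
    """Recursively decode bytes to str so the result is JSON-serializable."""
    if isinstance(value, bytes):
        # Assumption: the payload is UTF-8 text; the real parser may differ.
        return value.decode("utf-8")
    if isinstance(value, dict):
        return {key: to_output_value(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_output_value(item) for item in value]
    return value


record = {"id": 1, "payload": b"hello"}
# json.dumps(record) would raise TypeError because of the bytes value.
serialized = json.dumps(to_output_value(record))
```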
Tip
It is recommended to import `protocol` classes from the `airbyte_cdk.models` package rather than directly with an `import airbyte_protocol` statement.
Note
All Serializers have the `omit_none=True` parameter, which is applied recursively. Thus, all None values are excluded from the output. This is expected behaviour and does not break anything in the protocol.
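The effect of recursive `None` omission on a `dataclass` -> `str` round can be sketched with the standard library alone. This is an approximation for illustration, not the serpyco-rs implementation: `Record`, `Message`, `drop_none` and `message_to_string` are hypothetical names, `json` stands in for `orjson`, and this helper also strips `None` values inside plain dicts, which has not been checked against the library's behaviour.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class Record:
    """Hypothetical, simplified stand-in for a protocol record."""

    stream: str
    data: Dict[str, Any]
    namespace: Optional[str] = None


@dataclass
class Message:
    """Hypothetical, simplified stand-in for a protocol message."""

    type: str
    record: Optional[Record] = None
    log: Optional[str] = None


def drop_none(value: Any) -> Any:
    """Recursively remove dict entries whose value is None."""
    if isinstance(value, dict):
        return {key: drop_none(item) for key, item in value.items() if item is not None}
    if isinstance(value, list):
        return [drop_none(item) for item in value]
    return value


def message_to_string(message: Message) -> str:
    """Serialize a dataclass to a JSON string without None-valued fields."""
    return json.dumps(drop_none(asdict(message)))


message = Message(type="RECORD", record=Record(stream="users", data={"id": 1}))
output = message_to_string(message)
```

Here `output` contains neither `namespace` nor `log`, which mirrors what the note describes for the real Serializers.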
Tests
Test with docker run; 10_000_000 records; stream: dummy_records
/usr/bin/time -h docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-hardcoded-records:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json > /tmp/test.txt
10,000,000 records are about 1,770 MB.
Test with local platform (deployed with `abctl`)
24,000,000 records are about 4,270 MB.
Can this PR be safely reverted and rolled back?