Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): replace pydantic BaseModel with dataclasses + serpyco-rs in protocol #44444

Merged

Conversation

artem1205
Copy link
Contributor

@artem1205 artem1205 commented Aug 20, 2024

What

Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/8945 by use of dataclasses and serpyco-rs for serialization

Following #44026

How

  1. Dataclass models of protocol is used from, PR: feat: add version with dataclasses airbyte-protocol#91
  2. Refactor to use dataclasses and corresponding Serializers from serpyco-rs

Tip

Example of serialization dataclass -> str:

orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()

Review guide

  1. airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
  2. airbyte-cdk/python/airbyte_cdk/entrypoint.py

Note

All Serializers are in airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py

Warning

Excel Parser parse_ method changed to to_json, because to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson. See airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/excel_parser.py
To align with previous pydantic serialization logic, date_format="iso", date_unit="us" (microseconds) will be used, since pydantic cannot handle nanoseconds and strips seconds to 6 digits, i.e. microseconds)
Polars dataframe.to_dicts() has the same behaviour

Code Example
from pydantic import BaseModel
import pandas as pd
from typing import Dict, Any
class MyModel(BaseModel):
    timestamp: Dict[str, Any]
# Create an instance of the model
model_instance = MyModel(timestamp={'ss': pd.Timestamp('2023-08-27 10:00:00')})
# Serialize to JSON
print(model_instance.model_dump_json())
{"timestamp":{"ss":"2023-08-27T10:00:00"}}
model_instance = MyModel(timestamp={'ss': pd.Timestamp('2023-08-27 10:00:00.12')})
print(model_instance.model_dump_json())
{"timestamp":{"ss":"2023-08-27T10:00:00.120000"}}
model_instance = MyModel(timestamp={'ss': pd.Timestamp('2023-08-27 10:00:00.123456')})
print(model_instance.model_dump_json())
{"timestamp":{"ss":"2023-08-27T10:00:00.123456"}}
model_instance = MyModel(timestamp={'ss': pd.Timestamp('2023-08-27 10:00:00.123456789')}) # nanoseconds are missing
print(model_instance.model_dump_json())
{"timestamp":{"ss":"2023-08-27T10:00:00.123456"}}

Warning

Avro Parser _to_output_value will decode bytes to string, cause bytes are not serializable by orjson. See airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py, Slack thread

User Impact

-- changes in protocol models only

Tip

It is recommended to import protocol classes not directly by import airbyte_protocol statement, but from aribyte_cdk.models package

Note

All Serializers have omit_none=True parameter that is applied recursively. Thus, all None values are excluded from output.
This is expected behaviour and does not break anything in protocol.

Tests

Test with docker run; 10_000_000 records; stream: dummy_records

/usr/bin/time -h docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-hardcoded-records:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json > /tmp/test.txt

10,000,000 records are about 1,770 MB.

TIME real user sys Mb/s
Before (0.0.6) 2m37.51s 3.83s 14.85s 11.27
After (dev) 1m33.95s (1.67x) 2.71s 7.53s 18.8 (1.67x)

Test with local platform (deployed with abctl)

24,000,000 records are about 4,270 MB.

Version Platform sync time E2E Mb/s
Before (0.0.6) 10m 43s 6.64
After (dev) 5m 42s 12.48 (1.88x)

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
…dk-protocol-dataclasses

# Conflicts:
#	airbyte-cdk/python/poetry.lock
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
Copy link

vercel bot commented Aug 20, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Aug 30, 2024 9:29pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Aug 20, 2024
@artem1205 artem1205 changed the title Artem1205/airbyte cdk protocol dataclasses serpyco rs feat(airbyte-cdk): replace pydantic BaseModel with dataclasses + serpyco-rs in protocol Aug 20, 2024
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
…205/airbyte-cdk-protocol-dataclasses-serpyco-rs

# Conflicts:
#	airbyte-cdk/python/airbyte_cdk/sources/connector_state_manager.py
#	airbyte-cdk/python/unit_tests/sources/file_based/stream/concurrent/test_file_based_concurrent_cursor.py
#	airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
#	airbyte-cdk/python/unit_tests/sources/test_connector_state_manager.py
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205 artem1205 self-assigned this Aug 22, 2024
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci]

Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Aug 28, 2024
codeflash-ai bot added a commit that referenced this pull request Aug 28, 2024
…8% in PR #44444 (`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)

Certainly! Here is an optimized version of the provided Python program. I've streamlined the import statements, removed redundant lines, and optimized the `airbyte_message_to_string` method to eliminate repetitive operations and boost performance.



### Key Optimizations.
1. **Streamlined Imports**: Removed unnecessary imports and redundancies for clarity and faster loading.
2. **Inheritance**: Used inheritance to avoid redefining `AirbyteMessage`, since it largely overlaps with `OriginalAirbyteMessage`.
3. **Static method optimization**: The `airbyte_message_to_string` method was already efficient with `orjson.dumps()`, and I've ensured clean data serialization by direct encoding and decoding via methods that use low-level efficiencies.

### Notes.
- `is_cloud_environment()` and `_init_internal_request_filter()` are assumed to be already defined somewhere in the imported modules.
- Logging and exception handling initialization only needed to be called once, simplified setup within the initializer.
  
This version of the code should be more efficient while maintaining the same logic and output as the original.
codeflash-ai bot added a commit that referenced this pull request Aug 28, 2024
…` by 49% in PR #44444 (`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)

Certainly! Let's focus on optimizing the program for better performance. Here's the rewritten version.


### Changes made.
1. **Removed unnecessary list comprehension during initialization**.
    - Instead of initializing `self._messages` with a list comprehension and then appending to it, initialized it to an empty list and used a separate method `_parse_messages` to append messages directly.
    - This reduces the call overhead associated with list comprehensions and error handling inside them.

2. **Optimized `_get_trace_message_by_trace_type`**.
    - Directly filtered `self._messages` in a single pass reducing unnecessary function calls and memory allocations.

These changes aim to streamline the code execution and reduce unnecessary overhead, particularly in the initialization phase.
codeflash-ai bot added a commit that referenced this pull request Aug 28, 2024
…(`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)

To optimize the given Python program for faster execution, we'll focus on improving the list comprehension within the `__init__` method and optimizing the `is_in_logs` method. We will use more efficient coding practices and eliminate unnecessary operations where possible.

Here's the optimized version of the program.



### Changes and Improvements.
1. **List Comprehension to For Loop**: Converted the list comprehension in `__init__` method to a `for` loop for clarity and debugging ease.
2. **Cache the Compiled Regex**: In `is_in_logs`, precompiled the regex pattern outside the loop. This avoids recompiling the pattern on every iteration which is more efficient.
3. **Direct Filtering in Logs**: Simplified `_get_message_by_types` logic by using direct filtering inline in the `logs` property.
codeflash-ai bot added a commit that referenced this pull request Aug 28, 2024
…em1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)

To optimize the given Python program for better performance, you can minimize redundant accesses and checks, and utilize pattern matching efficiently. Here’s an optimized version.



### Explanation.
- The `message.type` and other repeated attribute accesses are cached to local variables to avoid redundant attribute lookups.
- The logic and functionality remain the same to ensure the return value is the same as before.
- Slight restructuring results in cleaner and more efficient code.

This method aims to minimize the overhead of attribute access and logical checks, which can contribute to a faster runtime, especially when this function is called frequently.
@airbytehq airbytehq deleted a comment from codeflash-ai bot Aug 29, 2024
@airbytehq airbytehq deleted a comment from codeflash-ai bot Aug 29, 2024
@airbytehq airbytehq deleted a comment from codeflash-ai bot Aug 29, 2024
@airbytehq airbytehq deleted a comment from codeflash-ai bot Aug 29, 2024
Copy link
Contributor

@girarda girarda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks great just a few questions! :shipit:

@@ -62,7 +61,7 @@ def __init__(
self._streams_done: Set[str] = set()
self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

def on_partition_generation_completed(self, sentinel: PartitionGenerationCompletedSentinel) -> Iterable[AirbyteMessage]:
def on_partition_generation_completed(self, sentinel: PartitionGenerationCompletedSentinel) -> Iterable[AirbyteMessage | None]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the iterable really include Nones? This sounds difficult to work with

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mypy raises an error due to start_next_partition_generator, that can return None
I did not really dig into this class, only fixed mypy. I suppose we'll refactor this much during the next project: concurrency for Low-Code

def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
"""
Start the next partition generator.
1. Pop the next stream to read from
2. Submit the partition generator to the thread pool manager
3. Add the stream to the list of streams currently generating partitions
4. Return a stream status message
"""
if self._stream_instances_to_start_partition_generation:
stream = self._stream_instances_to_start_partition_generation.pop(0)
self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
self._streams_currently_generating_partitions.append(stream.name)
self._logger.info(f"Marking stream {stream.name} as STARTED")
self._logger.info(f"Syncing stream: {stream.name} ")
return stream_status_as_airbyte_message(
stream.as_airbyte_stream(),
AirbyteStreamStatus.STARTED,
)
else:
return None

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's just for mypy, let's add a type ignore. Otherwise, the type will indicate that it is valid to yield None, which might not be the case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignored

updates the `airbyte-protocol-models` dependency to a version that uses dataclasses models.

The changes to Airbyte CDK itself are backwards-compatible, but some changes are required if the connector:
- uses the `airbyte_protocol` models directly, or `airbyte_cdk.models`, which points to `airbyte_protocol` models
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artem1205 do you know if this will break any connectors?

If yes, I'm fine with upgrading to 5.0.0, but if not, I think it's preferable to stay on 4.X because of how difficult it still is to keep connectors on the latest major version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAT tests were ✅ for source-:

  • hardcoded-records
  • zendesk-support
  • shopify

These changes will not break anything, unless airbyte_protocol is imported directly (not as a part of airbyte_cdk.models)

I'll change the versioning to 4.x then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you check if any connector directly import airbyte_protocol? It might be worth fixing them first to avoid the need to flag it as a breaking change

Copy link
Contributor Author

@artem1205 artem1205 Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With grep and excluding test folders got 18 connectors:

grep -r --include="*.py" --exclude-dir="unit_tests" --exclude-dir="integration_tests" "from airbyte_protocol" ./airbyte-integrations/connectors/
  1. /source-linkedin-ads/
  2. /source-shopify/
  3. /source-amazon-ads/
  4. /source-mixpanel/
  5. /source-amazon-seller-partner/
  6. /source-bing-ads/
  7. /source-google-ads/
  8. /source-salesforce/
  9. /source-stripe/
  10. /source-mailchimp/
  11. /destination-snowflake-cortex/
  12. /source-airtable/
  13. /source-zendesk-support/
  14. /source-marketo/
  15. /destination-pinecone/
  16. /source-pinterest/
  17. /source-github/

Do you want me to fix all of them before merging this PR?

…dk-protocol-dataclasses-serpyco-rs

# Conflicts:
#	airbyte-cdk/python/poetry.lock
#	airbyte-cdk/python/unit_tests/sources/streams/http/error_handlers/test_json_error_message_parser.py
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
codeflash-ai bot added a commit that referenced this pull request Aug 30, 2024
…`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)

To optimize the given Python program, we'll focus on reducing redundancy, unnecessary object creations, and repetitive computations.

1. **Avoid Redundant Imports:** Ensure to import only necessary modules and classes.
2. **Optimize `filter_secrets`:** Use `str.replace` in a more efficient loop.
3. **Optimize `AirbyteLogFormatter.format`:** Avoid redundant computations and direct return strings where possible.

Below is the optimized code.



### Changes and Optimizations.

1. **Consolidated Imports:** Removed redundant and unused imports, keeping the necessary ones.
2. **Enhanced `extract_extra_args_from_record`:** Initialized `default_attrs` once in the constructor to avoid recalculating it for each log record.
3. **Direct Use `base_filter_secrets`:** Refactored to directly use the `base_filter_secrets` function.
4. **Use Efficient JSON Libraries:** Continued the use of `orjson` for JSON serialization for performance efficiency.
codeflash-ai bot added a commit that referenced this pull request Aug 30, 2024
…` by 72% in PR #44444 (`artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs`)

To optimize this code, several improvements can be made. We can avoid unnecessary deep copies, reduce redundant checks, and simplify certain parts of the code for better readability and performance. Here's the optimized version.



### Key Improvements.

1. **Removal of `copy.deepcopy`**: The use of `copy.deepcopy` was unnecessary since the original code did not mutate the state.
2. **Single Dictionary Update**: Combined two updates into one to reduce the number of dictionary operations in `AirbyteStateBlob.__init__`.
3. **Simplified Boolean Checks**: Simplified boolean checks and avoided redundant type checks for performance.
4. **Removed Redundant Comments**: Retained only essential comments to keep the codebase clean and easy to read.

These changes should make the code more efficient and optimize its runtime and memory usage.
@airbytehq airbytehq deleted a comment from codeflash-ai bot Aug 31, 2024
@airbytehq airbytehq deleted a comment from codeflash-ai bot Sep 2, 2024
@artem1205
Copy link
Contributor Author

artem1205 commented Sep 2, 2024

/approve-regression-tests

Check job output.

✅ Approving regression tests

@artem1205 artem1205 merged commit df34893 into master Sep 2, 2024
32 of 35 checks passed
@artem1205 artem1205 deleted the artem1205/airbyte-cdk-protocol-dataclasses-serpyco-rs branch September 2, 2024 15:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Don't merge me unless you are ready. CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants