Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk-cdk-core-base: optimized record serialization #44880

Merged
merged 5 commits into from
Aug 29, 2024

Conversation

postamar
Copy link
Contributor

@postamar postamar commented Aug 29, 2024

What

Informs https://github.com/airbytehq/airbyte-internal-issues/issues/9461

Follow-up to #44865

This PR optimizes the performance of the serialization of AirbyteMessage instances of type RECORD in the Bulk CDK by leveraging the fact that these instances don't actually need much of jackson's object mapping logic. The most significant datum in those messages is the data field value which is already an ObjectNode.

How

Here's a CPU profile for the source-oracle perf benchmark sync that I ran on my machine, zoomed in on StdoutOutputConsumer::accept:

Screenshot 2024-08-28 at 22 29 40

Notice how there's a whole bunch of BeanSerializer nonsense which shows up? That's because we're using the jackson object mapper to serialize a java class instance to JSON. It turns out that we can ditch that for RECORD messages, and then we get something much nicer looking:

Screenshot 2024-08-28 at 22 29 54

Review guide

The first commit has the significant change.
The second commit is a straightforward simplification made possible by the previous change.

User Impact

This change improves Bulk CDK source connector performance.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Copy link

vercel bot commented Aug 29, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Aug 29, 2024 8:07pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Aug 29, 2024
@postamar postamar force-pushed the postamar/whacky-perf-hacks branch from 37da628 to 4ecdff3 Compare August 29, 2024 02:18
@postamar postamar changed the title [WIP] bulk-cdk-core-base: savagely optimized record serialization [WIP] bulk-cdk-core-base: optimized record serialization Aug 29, 2024
@postamar postamar changed the title [WIP] bulk-cdk-core-base: optimized record serialization bulk-cdk-core-base: optimized record serialization Aug 29, 2024
@postamar postamar force-pushed the postamar/whacky-perf-hacks branch from 4ecdff3 to d9e89d4 Compare August 29, 2024 02:26
@postamar postamar marked this pull request as ready for review August 29, 2024 02:34
@postamar postamar requested review from a team as code owners August 29, 2024 02:34
@postamar postamar force-pushed the postamar/whacky-perf-hacks branch from d9e89d4 to 2f0c2df Compare August 29, 2024 15:21
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
* case, TCP packet sized (capped by the MTU) are also in the low kilobytes.
*/
val bufferByteSizeThresholdForFlush: Int = 4 * 1024
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the default used to be 1MB instead of 4kB. I used to think this value didn't matter much, but I've changed my mind after looking at profiles from connectors in Airbyte Cloud:

Copy link
Contributor

@xiaohansong xiaohansong Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: what is the average/50th percentile size of a record message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! It depends on the stream. I figure there's enough "thin" tables out there to make this optimization worth it. By "thin" I'm thinking of less than 5 columns, which are numbers/short strings/uuids/timestamps.

Copy link
Contributor

@evantahler evantahler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some usability nits from me! I assume since the tests all still pass, that the JSON we generate remains legit

@@ -48,15 +47,10 @@ sealed class JdbcPartitionReader<P : JdbcPartition<*>>(
private val outData: ObjectNode = Jsons.objectNode()

private val msg =
AirbyteMessage()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Any way to more strongly discourage the use of this object in the framework?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate, please? I'm not sure what you mean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nit, and it might not matter if this is an internal concept, but if the partitionReaders we make in the toolkit should use AirbyteRecordMessage rather than AirbyteMessage, is there a way to mark it deprecated or something like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. No, it's not quite like that. You can use either-or and the outcome is the same! Therefore, since it's the same, let's use the more specific AirbyteRecordMessage.

* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
* case, TCP packet sized (capped by the MTU) are also in the low kilobytes.
*/
val bufferByteSizeThresholdForFlush: Int = 4 * 1024
Copy link
Contributor

@xiaohansong xiaohansong Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: what is the average/50th percentile size of a record message?

sequenceWriter.write(airbyteMessage)
sequenceWriter.flush()
if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) {
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why record only? Everything seems to be preserved in the original order in the buffer so I feel like all airbyteMessages can be applied here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct in that we could apply this to all message types. The reasons I'm doing it just for records are:

  1. If you ignore RECORDs and LOGs there's really not much left over! Most of the remainder are STATE which are much less frequent than RECORDs and other than that it's just a few TRACE and CONTROL. In other words they don't matter
  2. On the other hand suppose there are very few or no RECORDs in a stream; we don't want the TRACE and STATE messages to sit in the buffer indefinitely, instead, we want the platform to know about them immediately. It's going to inform the UI, it's going to impact the other connector, etc.

@postamar
Copy link
Contributor Author

Just some usability nits from me! I assume since the tests all still pass, that the JSON we generate remains legit

Yes, but come to think of it some more, it's only the CAT tests that check for this... I should make the integration tests use this.

@postamar postamar force-pushed the postamar/whacky-perf-hacks branch from b3f6ef8 to 4866d78 Compare August 29, 2024 17:18
private class StdoutOutputConsumer : OutputConsumer {
override val emittedAt: Instant = Instant.now()
private class StdoutOutputConsumer(
val stdout: PrintStream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear how this is being wired in. Does micronaut default PrintStream to ConsolePrintStream? Couldn't find any doc around that behavior. Not sure how this is related to record serialization opt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Micronaut doesn't make those kinds of assumptions. To inject a non-bean instance it will look for a @Factory, in this case PrintStreamFactory defined further below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah my bad. Not sure how I missed it...

@postamar
Copy link
Contributor Author

Thanks for taking a look. None of these address the source-oracle perf issue, which is centered around writes somehow.

@postamar postamar enabled auto-merge (squash) August 29, 2024 20:08
@postamar postamar merged commit 130bff8 into master Aug 29, 2024
29 checks passed
@postamar postamar deleted the postamar/whacky-perf-hacks branch August 29, 2024 20:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants