-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bulk-cdk-core-base: optimized record serialization #44880
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
37da628
to
4ecdff3
Compare
4ecdff3
to
d9e89d4
Compare
d9e89d4
to
2f0c2df
Compare
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any | ||
* case, TCP packet sized (capped by the MTU) are also in the low kilobytes. | ||
*/ | ||
val bufferByteSizeThresholdForFlush: Int = 4 * 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the default used to be 1MB instead of 4kB. I used to think this value didn't matter much, but I've changed my mind after looking at profiles from connectors in Airbyte Cloud:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious: what is the average/50th percentile size of a record message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question! It depends on the stream. I figure there's enough "thin" tables out there to make this optimization worth it. By "thin" I'm thinking of less than 5 columns, which are numbers/short strings/uuids/timestamps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some usability nits from me! I assume since the tests all still pass, that the JSON we generate remains legit
airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt
Show resolved
Hide resolved
@@ -48,15 +47,10 @@ sealed class JdbcPartitionReader<P : JdbcPartition<*>>( | |||
private val outData: ObjectNode = Jsons.objectNode() | |||
|
|||
private val msg = | |||
AirbyteMessage() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Any way to more strongly discourage the use of this object in the framework?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate, please? I'm not sure what you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nit, and it might not matter if this is an internal concept, but if the partitionReaders we make in the toolkit should use AirbyteRecordMessage
rather than AirbyteMessage
, is there a way to mark it deprecated or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. No, it's not quite like that. You can use either-or and the outcome is the same! Therefore, since it's the same, let's use the more specific AirbyteRecordMessage
.
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any | ||
* case, TCP packet sized (capped by the MTU) are also in the low kilobytes. | ||
*/ | ||
val bufferByteSizeThresholdForFlush: Int = 4 * 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious: what is the average/50th percentile size of a record message?
sequenceWriter.write(airbyteMessage) | ||
sequenceWriter.flush() | ||
if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) { | ||
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why record only? Everything seems to be preserved in the original order in the buffer so I feel like all airbyteMessages can be applied here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're correct in that we could apply this to all message types. The reasons I'm doing it just for records are:
- If you ignore RECORDs and LOGs there's really not much left over! Most of the remainder are STATE which are much less frequent than RECORDs and other than that it's just a few TRACE and CONTROL. In other words they don't matter
- On the other hand suppose there are very few or no RECORDs in a stream; we don't want the TRACE and STATE messages to sit in the buffer indefinitely, instead, we want the platform to know about them immediately. It's going to inform the UI, it's going to impact the other connector, etc.
Yes, but come to think of it some more, it's only the CAT tests that check for this... I should make the integration tests use this. |
b3f6ef8
to
4866d78
Compare
private class StdoutOutputConsumer : OutputConsumer { | ||
override val emittedAt: Instant = Instant.now() | ||
private class StdoutOutputConsumer( | ||
val stdout: PrintStream, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not clear how this is being wired in. Does micronaut default PrintStream to ConsolePrintStream? Couldn't find any doc around that behavior. Not sure how this is related to record serialization opt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Micronaut doesn't make those kinds of assumptions. To inject a non-bean instance it will look for a @Factory
, in this case PrintStreamFactory
defined further below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah my bad. Not sure how I missed it...
airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt
Outdated
Show resolved
Hide resolved
airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt
Outdated
Show resolved
Hide resolved
Thanks for taking a look. None of these address the source-oracle perf issue, which is centered around writes somehow. |
What
Informs https://github.com/airbytehq/airbyte-internal-issues/issues/9461
Follow-up to #44865
This PR optimizes the performance of the serialization of
AirbyteMessage
instances of typeRECORD
in the Bulk CDK by leveraging the fact that these instances don't actually need much of jackson's object mapping logic. The most significant datum in those messages is thedata
field value which is already anObjectNode
.How
Here's a CPU profile for the source-oracle perf benchmark sync that I ran on my machine, zoomed in on
StdoutOutputConsumer::accept
:Notice how there's a whole bunch of
BeanSerializer
nonsense which shows up? That's because we're using the jackson object mapper to serialize a java class instance to JSON. It turns out that we can ditch that for RECORD messages, and then we get something much nicer looking:Review guide
The first commit has the significant change.
The second commit is a straightforward simplification made possible by the previous change.
User Impact
This change improves Bulk CDK source connector performance.
Can this PR be safely reverted and rolled back?