bulk-cdk-core-base: optimized record serialization #44880
The first file in the diff reworks record serialization in StdoutOutputConsumer:
@@ -1,6 +1,7 @@
 /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
 package io.airbyte.cdk.output
 
+import com.fasterxml.jackson.core.JsonGenerator
 import com.fasterxml.jackson.databind.SequenceWriter
 import io.airbyte.cdk.util.Jsons
 import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
@@ -11,15 +12,18 @@ import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage
 import io.airbyte.protocol.models.v0.AirbyteLogMessage
 import io.airbyte.protocol.models.v0.AirbyteMessage
 import io.airbyte.protocol.models.v0.AirbyteRecordMessage
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
 import io.airbyte.protocol.models.v0.AirbyteStateMessage
 import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
 import io.airbyte.protocol.models.v0.AirbyteTraceMessage
 import io.airbyte.protocol.models.v0.ConnectorSpecification
+import io.micronaut.context.annotation.ConfigurationProperties
 import io.micronaut.context.annotation.DefaultImplementation
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
 import java.io.ByteArrayOutputStream
 import java.time.Instant
+import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Consumer
 
 /** Emits the [AirbyteMessage] instances produced by the connector. */
@@ -93,35 +97,73 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
 // Used for integration tests.
 const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file"
 
+/** Configuration properties prefix for [StdoutOutputConsumer]. */
+const val CONNECTOR_OUTPUT_STDOUT_PREFIX = "airbyte.connector.output.stdout"
+
 /** Default implementation of [OutputConsumer]. */
 @Singleton
 @Secondary
-private class StdoutOutputConsumer : OutputConsumer {
+@ConfigurationProperties(CONNECTOR_OUTPUT_STDOUT_PREFIX)
+private class StdoutOutputConsumer(
+    /**
+     * [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
+     * buffer's size (in bytes) grows past this value.
+     *
+     * Flushing the record buffer to stdout is done by calling [println], which is synchronized.
+     * The choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender,
+     * which concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard
+     * output.
+     *
+     * Because calling [println] incurs both a synchronization overhead and a syscall overhead,
+     * the connector's performance will noticeably degrade if it's called too often. This happens
+     * primarily when emitting lots of tiny RECORD messages, which is typical of source
+     * connectors.
+     *
+     * For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
+     * default value of 4kB is good in this respect. For example, if the average serialized
+     * record size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
+     *
+     * Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
+     * Otherwise, the output becomes bursty and this also degrades performance. As of today (and
+     * hopefully not for long) the platform still pipes the connector's stdout into socat to emit
+     * the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
+     * case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
+     */
+    val bufferByteSizeThresholdForFlush: Int = 4 * 1024
+) : OutputConsumer {
     override val emittedAt: Instant = Instant.now()
 
     private val buffer = ByteArrayOutputStream()
-    private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer)
+    private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
+    private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
 
     override fun accept(airbyteMessage: AirbyteMessage) {
         // This method effectively println's its JSON-serialized argument.
         // Using println is not particularly efficient, however.
         // To improve performance, this method accumulates RECORD messages into a buffer
         // before writing them to standard output in a batch.
         // Other Airbyte message types are not buffered, instead they trigger an immediate flush.
         // Such messages should not linger indefinitely in a buffer.
-        val isRecord: Boolean = airbyteMessage.type == AirbyteMessage.Type.RECORD
-        synchronized(this) {
-            if (buffer.size() > 0) {
-                buffer.write('\n'.code)
-            }
-            sequenceWriter.write(airbyteMessage)
-            sequenceWriter.flush()
-            if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) {
-                withLockFlush()
-            }
-        }
-    }
+        if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
    --- review thread ---
    Reviewer: Why record only? Everything seems to be preserved in the original order in the buffer, so I feel like all airbyteMessages could be handled here.
    Author: You're correct in that we could apply this to all message types. The reasons I'm doing it just for records are: ...
    ---------------------
+            // RECORD messages undergo a different serialization scheme.
+            accept(airbyteMessage.record)
+        } else {
+            synchronized(this) {
+                // Write a newline character to the buffer if it's not empty.
+                withLockMaybeWriteNewline()
+                // Non-RECORD AirbyteMessage instances are serialized and written to the buffer
+                // using standard jackson object mapping facilities.
+                sequenceWriter.write(airbyteMessage)
+                sequenceWriter.flush()
+                // Such messages don't linger in the buffer, they are flushed to stdout
+                // immediately, along with whatever might have already been lingering inside.
+                // This prints a newline after the message.
+                withLockFlush()
+            }
+        }
+    }
+
+    private fun withLockMaybeWriteNewline() {
+        if (buffer.size() > 0) {
+            buffer.write('\n'.code)
+        }
+    }
 
     override fun close() {
         synchronized(this) {
             // Flush any remaining buffer contents to stdout before closing.
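To see the batching scheme in isolation, here is a minimal self-contained sketch. It is not the CDK's class: BatchingStdoutWriter, writeBuffered, and writeImmediate are hypothetical names invented for illustration, and real message serialization is replaced by ready-made byte arrays. Buffered payloads accumulate newline-separated and are printed only once the byte threshold is crossed; immediate payloads flush right away, along with anything already buffered.

import java.io.ByteArrayOutputStream

// Hypothetical illustration of the batching idea behind StdoutOutputConsumer:
// frequent payloads are buffered, infrequent ones force an immediate flush.
class BatchingStdoutWriter(private val flushThresholdBytes: Int = 4 * 1024) {
    private val buffer = ByteArrayOutputStream()

    fun writeBuffered(serialized: ByteArray) {
        synchronized(this) {
            if (buffer.size() > 0) buffer.write('\n'.code)
            buffer.write(serialized)
            // Only pay the println (mutex + syscall) cost once per ~4kB of output.
            if (buffer.size() >= flushThresholdBytes) flush()
        }
    }

    fun writeImmediate(serialized: ByteArray) {
        synchronized(this) {
            if (buffer.size() > 0) buffer.write('\n'.code)
            buffer.write(serialized)
            flush() // such payloads must not linger in the buffer
        }
    }

    private fun flush() {
        if (buffer.size() == 0) return
        // println is synchronized, which is what makes it safe to share stdout
        // with a concurrently printing log appender.
        println(buffer.toString(Charsets.UTF_8))
        buffer.reset()
    }
}

fun main() {
    val writer = BatchingStdoutWriter(flushThresholdBytes = 64)
    repeat(10) { i -> writer.writeBuffered("""{"type":"RECORD","i":$i}""".toByteArray()) }
    writer.writeImmediate("""{"type":"STATE"}""".toByteArray())
}

Since StdoutOutputConsumer is now annotated with @ConfigurationProperties under the airbyte.connector.output.stdout prefix, the 4kB default for bufferByteSizeThresholdForFlush can presumably be overridden through Micronaut configuration, for example in integration tests.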
@@ -136,7 +178,102 @@ private class StdoutOutputConsumer : OutputConsumer {
         }
     }
 
+    override fun accept(record: AirbyteRecordMessage) {
+        // The serialization of RECORD messages can become a performance bottleneck for source
+        // connectors because they can come in much higher volumes than other message types.
+        // Specifically, with jackson, the bottleneck is in the object mapping logic.
+        // As it turns out, this object mapping logic is not particularly useful for RECORD
+        // messages because within a given stream the only variations occur in the "data" and
+        // the "meta" fields:
+        // - the "data" field is already an ObjectNode and is cheap to serialize,
+        // - the "meta" field is often unset.
+        // For this reason, this method builds and reuses a JSON template for each stream.
+        // Then, for each record, it serializes just "data" and "meta" to populate the template.
+        val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
+        synchronized(this) {
+            // Write a newline character to the buffer if it's not empty.
+            withLockMaybeWriteNewline()
+            // Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
+            buffer.write(template.prefix)
+            // Serialize the record data ObjectNode to JSON, writing it to the buffer.
+            Jsons.writeTree(jsonGenerator, record.data)
+            jsonGenerator.flush()
+            // If the record has an AirbyteRecordMessageMeta instance set,
+            // write ',"meta":' followed by the serialized meta.
+            val meta: AirbyteRecordMessageMeta? = record.meta
+            if (meta != null) {
+                buffer.write(metaPrefixBytes)
+                sequenceWriter.write(meta)
+                sequenceWriter.flush()
+            }
+            // Write ',"emitted_at":...}}'.
+            buffer.write(template.suffix)
+            // Flush the buffer to stdout only once it has reached a certain size.
+            // Flushing to stdout incurs some overhead (mutex, syscall, etc.)
+            // which otherwise becomes very apparent when lots of tiny records are involved.
+            if (buffer.size() >= bufferByteSizeThresholdForFlush) {
+                withLockFlush()
+            }
+        }
+    }
+
+    private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()
+
+    private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
+        val streamToTemplateMap: StreamToTemplateMap =
+            if (namespace == null) {
+                unNamespacedTemplates
+            } else {
+                namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
+            }
+        return streamToTemplateMap.getOrPut(stream) {
+            RecordTemplate.create(stream, namespace, emittedAt)
+        }
+    }
+
+    private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
+    private val unNamespacedTemplates = StreamToTemplateMap()
 
     companion object {
-        const val BUFFER_MAX_SIZE = 1024 * 1024
+        const val META_PREFIX = ""","meta":"""
     }
 }
+
+private typealias StreamToTemplateMap = ConcurrentHashMap<String, RecordTemplate>
+
+private class RecordTemplate(
+    /** [prefix] is '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":' */
+    val prefix: ByteArray,
+    /** [suffix] is ',"emitted_at":...}}' */
+    val suffix: ByteArray,
+) {
+    companion object {
+        fun create(stream: String, namespace: String?, emittedAt: Instant): RecordTemplate {
+            // Generate a dummy AirbyteRecordMessage instance for the given args
+            // using an empty object (i.e. '{}') for the "data" field value.
+            val recordMessage =
+                AirbyteRecordMessage()
+                    .withStream(stream)
+                    .withNamespace(namespace)
+                    .withEmittedAt(emittedAt.toEpochMilli())
+                    .withData(Jsons.objectNode())
+            // Generate the corresponding dummy AirbyteMessage instance.
+            val airbyteMessage =
+                AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage)
+            // Serialize to JSON.
+            val json: String = Jsons.writeValueAsString(airbyteMessage)
+            // Split the string in 2 around the '"data":{}' substring.
+            val parts: List<String> = json.split(splitRegex, limit = 2)
+            require(parts.size == 2) { "expected to find $splitRegex in $json" }
+            // Re-attach the '"data":' substring to the first part
+            // and return both parts in a RecordTemplate instance for this stream.
+            return RecordTemplate(
+                prefix = (parts.first() + DATA_PREFIX).toByteArray(),
+                suffix = parts.last().toByteArray()
+            )
+        }
+
+        private const val DATA_PREFIX = """"data":"""
+        private val splitRegex: Regex = Regex.fromLiteral("$DATA_PREFIX{}")
+    }
+}
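The prefix/suffix trick is easy to reproduce in isolation. The sketch below uses plain jackson rather than the CDK's Jsons helper, and the stream name, namespace, and field values are made up for the demo: it serializes an envelope whose "data" field is an empty object, splits the string around '"data":{}', and then stitches arbitrary record data into the resulting template.

import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    val mapper = ObjectMapper()

    // Build an envelope whose "data" field is an empty object; this stands in
    // for the dummy AirbyteMessage that RecordTemplate.create() serializes.
    val envelope = mapper.createObjectNode()
    envelope.put("type", "RECORD")
    val record = envelope.putObject("record")
    record.put("namespace", "public")
    record.put("stream", "users")
    record.putObject("data") // serializes as "data":{}
    record.put("emitted_at", 1724000000000L)

    val json = mapper.writeValueAsString(envelope)

    // Split once around the '"data":{}' marker, like RecordTemplate.create() does.
    val marker = """"data":{}"""
    val parts = json.split(Regex.fromLiteral(marker), limit = 2)
    require(parts.size == 2) { "expected to find $marker in $json" }
    val prefix = parts[0] + """"data":"""
    val suffix = parts[1]

    // Per-record serialization is now just: prefix + data + suffix.
    val data = mapper.createObjectNode().put("id", 42).put("name", "Ada")
    println(prefix + mapper.writeValueAsString(data) + suffix)
    // {"type":"RECORD","record":{"namespace":"public","stream":"users",
    //  "data":{"id":42,"name":"Ada"},"emitted_at":1724000000000}}
}

The design choice this illustrates: the expensive jackson object-mapping pass happens once per stream (when the template is built), and the per-record cost is reduced to writing three byte sequences.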
The second file in the diff updates JdbcPartitionReader to drop the AirbyteMessage wrapper and emit AirbyteRecordMessage directly:
@@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode
 import io.airbyte.cdk.command.OpaqueStateValue
 import io.airbyte.cdk.output.OutputConsumer
 import io.airbyte.cdk.util.Jsons
-import io.airbyte.protocol.models.v0.AirbyteMessage
 import io.airbyte.protocol.models.v0.AirbyteRecordMessage
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicLong
@@ -48,15 +47,10 @@ sealed class JdbcPartitionReader<P : JdbcPartition<*>>(
     private val outData: ObjectNode = Jsons.objectNode()
 
     private val msg =
-        AirbyteMessage()
    --- review thread ---
    Reviewer: Nit: any way to more strongly discourage the use of this object in the framework?
    Author: Can you elaborate, please? I'm not sure what you mean.
    Reviewer: A nit, and it might not matter if this is an internal concept, but if the partitionReaders we make in the toolkit should use ...
    Author: Ah, I see. No, it's not quite like that. You can use either-or and the outcome is the same! Therefore, since it's the same, let's use the more specific ...
    ---------------------
-            .withType(AirbyteMessage.Type.RECORD)
-            .withRecord(
-                AirbyteRecordMessage()
-                    .withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
-                    .withStream(stream.name)
-                    .withNamespace(stream.namespace)
-                    .withData(outData)
-            )
+        AirbyteRecordMessage()
+            .withStream(stream.name)
+            .withNamespace(stream.namespace)
+            .withData(outData)
 
     val streamFieldNames: List<String> = stream.fields.map { it.id }
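For illustration, here is a hypothetical row-emitting loop in the style of this reader, assuming the accept(AirbyteRecordMessage) overload that this PR adds to OutputConsumer. The emitRows function, its rows parameter, and the column handling are all invented for the sketch; only the reuse of a single mutable ObjectNode and record message mirrors the real code.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteRecordMessage

// Hypothetical sketch of a read loop reusing one AirbyteRecordMessage per stream.
fun emitRows(
    outputConsumer: OutputConsumer,
    streamName: String,
    streamNamespace: String?,
    rows: Sequence<Map<String, Any?>>,
) {
    val mapper = ObjectMapper()
    val outData: ObjectNode = mapper.createObjectNode()
    // One reusable record message; note that emitted_at is no longer set here,
    // since the consumer's per-stream template stamps it during serialization.
    val msg = AirbyteRecordMessage()
        .withStream(streamName)
        .withNamespace(streamNamespace)
        .withData(outData)
    for (row in rows) {
        outData.removeAll()
        for ((column, value) in row) {
            outData.putPOJO(column, value)
        }
        // accept(AirbyteRecordMessage) serializes the current contents right away,
        // so mutating outData on the next iteration is safe.
        outputConsumer.accept(msg)
    }
}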
--- review thread on bufferByteSizeThresholdForFlush ---
Author: Note that the default used to be 1MB instead of 4kB. I used to think this value didn't matter much, but I've changed my mind after looking at profiles from connectors in Airbyte Cloud: ...
Reviewer: Curious: what is the average/50th percentile size of a record message?
Author: Good question! It depends on the stream. I figure there's enough "thin" tables out there to make this optimization worth it. By "thin" I mean fewer than 5 columns holding numbers, short strings, UUIDs, or timestamps.
-------------------------------------------------------
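To ground this sizing discussion, a quick back-of-the-envelope calculation; the record sizes are illustrative guesses, with the 100-byte case matching the example in the doc comment above:

// Rough flush-frequency arithmetic for the buffer threshold trade-off.
// All average record sizes below are assumptions, not measurements.
fun main() {
    val thresholds = listOf(4 * 1024, 8 * 1024, 1024 * 1024) // 4kB default, socat's 8kB, old 1MB
    val avgRecordBytes = listOf(100, 1_000, 10_000)
    for (t in thresholds) {
        for (r in avgRecordBytes) {
            val recordsPerFlush = maxOf(1, t / r)
            println("threshold=${t}B, avg record=${r}B -> ~$recordsPerFlush records per println call")
        }
    }
}

With 100-byte records, the 4kB threshold batches about 40 records per println call (the "factor of 40" in the doc comment), while the old 1MB threshold would batch roughly 10,000, producing the bursty output the author describes.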