-
Notifications
You must be signed in to change notification settings - Fork 4.5k
bulk-cdk-core-base: optimized record serialization #44880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
b3103b3
2f0c2df
3b0b7da
4866d78
e306dfa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ | ||
package io.airbyte.cdk.output | ||
|
||
import com.fasterxml.jackson.core.JsonGenerator | ||
import com.fasterxml.jackson.databind.SequenceWriter | ||
import io.airbyte.cdk.util.Jsons | ||
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage | ||
|
@@ -11,15 +12,25 @@ import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage | |
import io.airbyte.protocol.models.v0.AirbyteLogMessage | ||
import io.airbyte.protocol.models.v0.AirbyteMessage | ||
import io.airbyte.protocol.models.v0.AirbyteRecordMessage | ||
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta | ||
import io.airbyte.protocol.models.v0.AirbyteStateMessage | ||
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage | ||
import io.airbyte.protocol.models.v0.AirbyteTraceMessage | ||
import io.airbyte.protocol.models.v0.ConnectorSpecification | ||
import io.micronaut.context.annotation.DefaultImplementation | ||
import io.micronaut.context.annotation.Factory | ||
import io.micronaut.context.annotation.Requires | ||
import io.micronaut.context.annotation.Secondary | ||
import io.micronaut.context.annotation.Value | ||
import io.micronaut.context.env.Environment | ||
import jakarta.inject.Singleton | ||
import java.io.ByteArrayOutputStream | ||
import java.io.FileOutputStream | ||
import java.io.PrintStream | ||
import java.nio.file.Path | ||
import java.time.Clock | ||
import java.time.Instant | ||
import java.util.concurrent.ConcurrentHashMap | ||
import java.util.function.Consumer | ||
|
||
/** Emits the [AirbyteMessage] instances produced by the connector. */ | ||
|
@@ -90,38 +101,79 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable { | |
} | ||
} | ||
|
||
/** Configuration properties prefix for [StdoutOutputConsumer]. */ | ||
const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output" | ||
|
||
// Used for integration tests. | ||
const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file" | ||
const val CONNECTOR_OUTPUT_FILE = "$CONNECTOR_OUTPUT_PREFIX.file" | ||
|
||
/** Default implementation of [OutputConsumer]. */ | ||
@Singleton | ||
@Secondary | ||
private class StdoutOutputConsumer : OutputConsumer { | ||
override val emittedAt: Instant = Instant.now() | ||
private class StdoutOutputConsumer( | ||
val stdout: PrintStream, | ||
clock: Clock, | ||
/** | ||
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the | ||
* buffer's size (in bytes) grows past this value. | ||
* | ||
* Flushing the record buffer to stdout is done by calling [println] which is synchronized. The | ||
* choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender which | ||
* concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output. | ||
* | ||
* Because calling [println] incurs both a synchronization overhead and a syscall overhead, the | ||
* connector's performance will noticeably degrade if it's called too often. This happens | ||
* primarily when emitting lots of tiny RECORD messages, which is typical of source connectors. | ||
* | ||
* For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The | ||
* default value of 4kB is good in this respect. For example, if the average serialized record | ||
* size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40. | ||
* | ||
* Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large. | ||
* Otherwise, the output becomes bursty and this also degrades performance. As of today (and | ||
* hopefully not for long) the platform still pipes the connector's stdout into socat to emit | ||
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any | ||
* case, TCP packet sizes (capped by the MTU) are also in the low kilobytes. | ||
*/ | ||
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}") | ||
val bufferByteSizeThresholdForFlush: Int, | ||
) : OutputConsumer { | ||
override val emittedAt: Instant = Instant.now(clock) | ||
|
||
private val buffer = ByteArrayOutputStream() | ||
postamar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer) | ||
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer) | ||
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator) | ||
|
||
override fun accept(airbyteMessage: AirbyteMessage) { | ||
// This method effectively println's its JSON-serialized argument. | ||
// Using println is not particularly efficient, however. | ||
// To improve performance, this method accumulates RECORD messages into a buffer | ||
// before writing them to standard output in a batch. | ||
// Other Airbyte message types are not buffered, instead they trigger an immediate flush. | ||
// Such messages should not linger indefinitely in a buffer. | ||
val isRecord: Boolean = airbyteMessage.type == AirbyteMessage.Type.RECORD | ||
synchronized(this) { | ||
if (buffer.size() > 0) { | ||
buffer.write('\n'.code) | ||
} | ||
sequenceWriter.write(airbyteMessage) | ||
sequenceWriter.flush() | ||
if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) { | ||
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why record only? Everything seems to be preserved in the original order in the buffer so I feel like all airbyteMessages can be applied here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're correct in that we could apply this to all message types. The reasons I'm doing it just for records are: | ||
|
||
// RECORD messages undergo a different serialization scheme. | ||
accept(airbyteMessage.record) | ||
} else { | ||
synchronized(this) { | ||
// Write a newline character to the buffer if it's not empty. | ||
withLockMaybeWriteNewline() | ||
// Non-RECORD AirbyteMessage instances are serialized and written to the buffer | ||
// using standard jackson object mapping facilities. | ||
sequenceWriter.write(airbyteMessage) | ||
sequenceWriter.flush() | ||
// Such messages don't linger in the buffer, they are flushed to stdout immediately, | ||
// along with whatever might have already been lingering inside. | ||
// This prints a newline after the message. | ||
withLockFlush() | ||
} | ||
} | ||
} | ||
|
||
private fun withLockMaybeWriteNewline() { | ||
if (buffer.size() > 0) { | ||
buffer.write('\n'.code) | ||
} | ||
} | ||
|
||
override fun close() { | ||
synchronized(this) { | ||
// Flush any remaining buffer contents to stdout before closing. | ||
|
@@ -131,12 +183,120 @@ private class StdoutOutputConsumer : OutputConsumer { | |
|
||
private fun withLockFlush() { | ||
if (buffer.size() > 0) { | ||
println(buffer.toString(Charsets.UTF_8)) | ||
stdout.println(buffer.toString(Charsets.UTF_8)) | ||
stdout.flush() | ||
buffer.reset() | ||
} | ||
} | ||
|
||
override fun accept(record: AirbyteRecordMessage) { | ||
evantahler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// The serialization of RECORD messages can become a performance bottleneck for source | ||
// connectors because they can come in much higher volumes than other message types. | ||
// Specifically, with jackson, the bottleneck is in the object mapping logic. | ||
// As it turns out, this object mapping logic is not particularly useful for RECORD messages | ||
// because within a given stream the only variations occur in the "data" and the "meta" | ||
// fields: | ||
// - the "data" field is already an ObjectNode and is cheap to serialize, | ||
// - the "meta" field is often unset. | ||
// For this reason, this method builds and reuses a JSON template for each stream. | ||
// Then, for each record, it serializes just "data" and "meta" to populate the template. | ||
val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace) | ||
synchronized(this) { | ||
// Write a newline character to the buffer if it's not empty. | ||
withLockMaybeWriteNewline() | ||
// Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'. | ||
buffer.write(template.prefix) | ||
// Serialize the record data ObjectNode to JSON, writing it to the buffer. | ||
Jsons.writeTree(jsonGenerator, record.data) | ||
jsonGenerator.flush() | ||
// If the record has a AirbyteRecordMessageMeta instance set, | ||
// write ',"meta":' followed by the serialized meta. | ||
val meta: AirbyteRecordMessageMeta? = record.meta | ||
if (meta != null) { | ||
buffer.write(metaPrefixBytes) | ||
sequenceWriter.write(meta) | ||
sequenceWriter.flush() | ||
} | ||
// Write ',"emitted_at":...}}'. | ||
buffer.write(template.suffix) | ||
// Flush the buffer to stdout only once it has reached a certain size. | ||
// Flushing to stdout incurs some overhead (mutex, syscall, etc.) | ||
// which otherwise becomes very apparent when lots of tiny records are involved. | ||
if (buffer.size() >= bufferByteSizeThresholdForFlush) { | ||
withLockFlush() | ||
} | ||
} | ||
} | ||
|
||
private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray() | ||
|
||
private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate { | ||
val streamToTemplateMap: StreamToTemplateMap = | ||
if (namespace == null) { | ||
unNamespacedTemplates | ||
} else { | ||
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() } | ||
} | ||
return streamToTemplateMap.getOrPut(stream) { | ||
RecordTemplate.create(stream, namespace, emittedAt) | ||
} | ||
} | ||
|
||
private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>() | ||
private val unNamespacedTemplates = StreamToTemplateMap() | ||
|
||
companion object { | ||
const val BUFFER_MAX_SIZE = 1024 * 1024 | ||
const val META_PREFIX = ""","meta":""" | ||
} | ||
} | ||
|
||
private typealias StreamToTemplateMap = ConcurrentHashMap<String, RecordTemplate> | ||
|
||
private class RecordTemplate( | ||
/** [prefix] is '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":' */ | ||
val prefix: ByteArray, | ||
/** [suffix] is ',"emitted_at":...}}' */ | ||
val suffix: ByteArray, | ||
) { | ||
companion object { | ||
fun create(stream: String, namespace: String?, emittedAt: Instant): RecordTemplate { | ||
// Generate a dummy AirbyteRecordMessage instance for the given args | ||
// using an empty object (i.e. '{}') for the "data" field value. | ||
val recordMessage = | ||
AirbyteRecordMessage() | ||
.withStream(stream) | ||
.withNamespace(namespace) | ||
.withEmittedAt(emittedAt.toEpochMilli()) | ||
.withData(Jsons.objectNode()) | ||
// Generate the corresponding dummy AirbyteMessage instance. | ||
val airbyteMessage = | ||
AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage) | ||
// Serialize to JSON. | ||
val json: String = Jsons.writeValueAsString(airbyteMessage) | ||
// Split the string in 2 around the '"data":{}' substring. | ||
val parts: List<String> = json.split(splitRegex, limit = 2) | ||
postamar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
require(parts.size == 2) { "expected to find $splitRegex in $json" } | ||
// Re-attach the '"data":' substring to the first part | ||
// and return both parts in a RecordTemplate instance for this stream. | ||
return RecordTemplate( | ||
prefix = (parts.first() + DATA_PREFIX).toByteArray(), | ||
suffix = parts.last().toByteArray() | ||
) | ||
} | ||
|
||
private const val DATA_PREFIX = """"data":""" | ||
private val splitRegex: Regex = Regex.fromLiteral("$DATA_PREFIX{}") | ||
} | ||
} | ||
|
||
@Factory | ||
private class PrintStreamFactory { | ||
|
||
@Singleton @Requires(notEnv = [Environment.TEST]) fun stdout(): PrintStream = System.out | ||
|
||
@Singleton | ||
@Requires(env = [Environment.TEST]) | ||
@Requires(property = CONNECTOR_OUTPUT_FILE) | ||
fun file(@Value("\${$CONNECTOR_OUTPUT_FILE}") filePath: Path): PrintStream = | ||
PrintStream(FileOutputStream(filePath.toFile()), false, Charsets.UTF_8) | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not clear how this is being wired in. Does micronaut default PrintStream to ConsolePrintStream? Couldn't find any doc around that behavior. Not sure how this is related to record serialization opt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Micronaut doesn't make those kinds of assumptions. To inject a non-bean instance it will look for a
@Factory
, in this case PrintStreamFactory
defined further below. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah my bad. Not sure how I missed it...