bulk-cdk-core-base: optimized record serialization #44880
Merged: 5 commits, Aug 29, 2024

Changes from 2 commits
@@ -1,6 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.output

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SequenceWriter
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
@@ -11,15 +12,18 @@ import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage
import io.airbyte.protocol.models.v0.AirbyteLogMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import io.airbyte.protocol.models.v0.ConnectorSpecification
import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.DefaultImplementation
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer

/** Emits the [AirbyteMessage] instances produced by the connector. */
@@ -93,35 +97,73 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
// Used for integration tests.
const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file"

/** Configuration properties prefix for [StdoutOutputConsumer]. */
const val CONNECTOR_OUTPUT_STDOUT_PREFIX = "airbyte.connector.output.stdout"

/** Default implementation of [OutputConsumer]. */
@Singleton
@Secondary
private class StdoutOutputConsumer : OutputConsumer {
@ConfigurationProperties(CONNECTOR_OUTPUT_STDOUT_PREFIX)
private class StdoutOutputConsumer(
/**
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
* buffer's size (in bytes) grows past this value.
*
* Flushing the record buffer to stdout is done by calling [println] which is synchronized. The
choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender, which
* concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output.
*
* Because calling [println] incurs both a synchronization overhead and a syscall overhead, the
* connector's performance will noticeably degrade if it's called too often. This happens
* primarily when emitting lots of tiny RECORD messages, which is typical of source connectors.
*
* For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
* default value of 4kB is good in this respect. For example, if the average serialized record
* size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
*
* Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
* Otherwise, the output becomes bursty and this also degrades performance. As of today (and
* hopefully not for long) the platform still pipes the connector's stdout into socat to emit
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
*/
val bufferByteSizeThresholdForFlush: Int = 4 * 1024
Contributor Author:
Note that the default used to be 1MB instead of 4kB. I used to think this value didn't matter much, but I've changed my mind after looking at profiles from connectors in Airbyte Cloud.

@xiaohansong (Contributor, Aug 29, 2024):
Curious: what is the average/50th percentile size of a record message?

Contributor Author:
Good question! It depends on the stream. I figure there are enough "thin" tables out there to make this optimization worth it. By "thin" I mean fewer than 5 columns, which are numbers/short strings/uuids/timestamps.

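To make the arithmetic above concrete, here is a minimal standalone sketch of this flushing policy, not the CDK code itself; the `BufferedStdoutWriter` name is hypothetical, and only the 4 kB default and the reliance on a synchronized `println` are taken from the PR:

```kotlin
import java.io.ByteArrayOutputStream

// Minimal sketch of threshold-based flushing; not the CDK implementation.
class BufferedStdoutWriter(private val thresholdBytes: Int = 4 * 1024) {
    private val buffer = ByteArrayOutputStream()

    @Synchronized
    fun write(serialized: ByteArray) {
        // Newline-separate messages within the buffer.
        if (buffer.size() > 0) buffer.write('\n'.code)
        buffer.write(serialized)
        // With ~100-byte records, ~40 of them accumulate before each flush,
        // cutting the number of println calls by a factor of ~40.
        if (buffer.size() >= thresholdBytes) flush()
    }

    @Synchronized
    fun flush() {
        if (buffer.size() == 0) return
        // println is synchronized and incurs a syscall; batching amortizes it.
        println(buffer.toString(Charsets.UTF_8))
        buffer.reset()
    }
}
```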
) : OutputConsumer {
override val emittedAt: Instant = Instant.now()

private val buffer = ByteArrayOutputStream()
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer)
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)

override fun accept(airbyteMessage: AirbyteMessage) {
// This method effectively println's its JSON-serialized argument.
// Using println is not particularly efficient, however.
// To improve performance, this method accumulates RECORD messages into a buffer
// before writing them to standard output in a batch.
// Other Airbyte message types are not buffered, instead they trigger an immediate flush.
// Such messages should not linger indefinitely in a buffer.
val isRecord: Boolean = airbyteMessage.type == AirbyteMessage.Type.RECORD
synchronized(this) {
if (buffer.size() > 0) {
buffer.write('\n'.code)
}
sequenceWriter.write(airbyteMessage)
sequenceWriter.flush()
if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) {
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
Contributor:
Why record only? Everything seems to be preserved in the original order in the buffer, so I feel like all airbyteMessages could be handled here.

Contributor Author:
You're correct that we could apply this to all message types. The reasons I'm doing it just for records are:

  1. If you ignore RECORDs and LOGs, there's really not much left over! Most of the remainder are STATE messages, which are much less frequent than RECORDs; other than that it's just a few TRACE and CONTROL messages. In other words, they don't matter.
  2. On the other hand, suppose there are very few or no RECORDs in a stream: we don't want the TRACE and STATE messages to sit in the buffer indefinitely; instead, we want the platform to know about them immediately. They inform the UI, affect the other connector, etc.

// RECORD messages undergo a different serialization scheme.
accept(airbyteMessage.record)
} else {
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Non-RECORD AirbyteMessage instances are serialized and written to the buffer
// using standard jackson object mapping facilities.
sequenceWriter.write(airbyteMessage)
sequenceWriter.flush()
// Such messages don't linger in the buffer; they are flushed to stdout immediately,
// along with whatever might have already been lingering inside.
// This prints a newline after the message.
withLockFlush()
}
}
}

private fun withLockMaybeWriteNewline() {
if (buffer.size() > 0) {
buffer.write('\n'.code)
}
}

override fun close() {
synchronized(this) {
// Flush any remaining buffer contents to stdout before closing.
@@ -136,7 +178,102 @@ private class StdoutOutputConsumer : OutputConsumer {
}
}

override fun accept(record: AirbyteRecordMessage) {
// The serialization of RECORD messages can become a performance bottleneck for source
// connectors because they can come in much higher volumes than other message types.
// Specifically, with jackson, the bottleneck is in the object mapping logic.
// As it turns out, this object mapping logic is not particularly useful for RECORD messages
// because within a given stream the only variations occur in the "data" and the "meta"
// fields:
// - the "data" field is already an ObjectNode and is cheap to serialize,
// - the "meta" field is often unset.
// For this reason, this method builds and reuses a JSON template for each stream.
// Then, for each record, it serializes just "data" and "meta" to populate the template.
val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
buffer.write(template.prefix)
// Serialize the record data ObjectNode to JSON, writing it to the buffer.
Jsons.writeTree(jsonGenerator, record.data)
jsonGenerator.flush()
// If the record has an AirbyteRecordMessageMeta instance set,
// write ',"meta":' followed by the serialized meta.
val meta: AirbyteRecordMessageMeta? = record.meta
if (meta != null) {
buffer.write(metaPrefixBytes)
sequenceWriter.write(meta)
sequenceWriter.flush()
}
// Write ',"emitted_at":...}}'.
buffer.write(template.suffix)
// Flush the buffer to stdout only once it has reached a certain size.
// Flushing to stdout incurs some overhead (mutex, syscall, etc.)
// which otherwise becomes very apparent when lots of tiny records are involved.
if (buffer.size() >= bufferByteSizeThresholdForFlush) {
withLockFlush()
}
}
}

private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()

private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
val streamToTemplateMap: StreamToTemplateMap =
if (namespace == null) {
unNamespacedTemplates
} else {
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
}
return streamToTemplateMap.getOrPut(stream) {
RecordTemplate.create(stream, namespace, emittedAt)
}
}

private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
private val unNamespacedTemplates = StreamToTemplateMap()

companion object {
const val META_PREFIX = ""","meta":"""
}
}

private typealias StreamToTemplateMap = ConcurrentHashMap<String, RecordTemplate>

private class RecordTemplate(
/** [prefix] is '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":' */
val prefix: ByteArray,
/** [suffix] is ',"emitted_at":...}}' */
val suffix: ByteArray,
) {
companion object {
const val BUFFER_MAX_SIZE = 1024 * 1024
fun create(stream: String, namespace: String?, emittedAt: Instant): RecordTemplate {
// Generate a dummy AirbyteRecordMessage instance for the given args
// using an empty object (i.e. '{}') for the "data" field value.
val recordMessage =
AirbyteRecordMessage()
.withStream(stream)
.withNamespace(namespace)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(Jsons.objectNode())
// Generate the corresponding dummy AirbyteMessage instance.
val airbyteMessage =
AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage)
// Serialize to JSON.
val json: String = Jsons.writeValueAsString(airbyteMessage)
// Split the string in 2 around the '"data":{}' substring.
val parts: List<String> = json.split(splitRegex, limit = 2)
require(parts.size == 2) { "expected to find $splitRegex in $json" }
// Re-attach the '"data":' substring to the first part
// and return both parts in a RecordTemplate instance for this stream.
return RecordTemplate(
prefix = (parts.first() + DATA_PREFIX).toByteArray(),
suffix = parts.last().toByteArray()
)
}

private const val DATA_PREFIX = """"data":"""
private val splitRegex: Regex = Regex.fromLiteral("$DATA_PREFIX{}")
}
}
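The template trick above can be demonstrated outside the CDK. Below is a rough sketch using plain Jackson, with stand-in `Envelope`/`Record` data classes in place of the real protocol types (all names here are illustrative): serialize a placeholder message whose `data` is an empty object, split the JSON once around `"data":{}`, and reuse the two halves for every record of that stream.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

// Stand-ins for AirbyteMessage/AirbyteRecordMessage; illustrative only.
data class Record(val namespace: String?, val stream: String, val data: ObjectNode, val emitted_at: Long)
data class Envelope(val type: String, val record: Record)

fun main() {
    val mapper = ObjectMapper()
    // 1. Serialize a placeholder message whose "data" is an empty object.
    val placeholder = Envelope("RECORD", Record("public", "users", mapper.createObjectNode(), 0L))
    val json = mapper.writeValueAsString(placeholder)
    // 2. Split once around '"data":{}' to get the per-stream template halves.
    val parts = json.split(""""data":{}""", limit = 2)
    check(parts.size == 2)
    val prefix = (parts[0] + """"data":""").toByteArray()
    val suffix = parts[1].toByteArray()
    // 3. Per record, only the "data" ObjectNode goes through jackson;
    //    the envelope is a cheap byte-array copy of the cached template.
    val data = mapper.createObjectNode().put("id", 1).put("name", "ada")
    val out = prefix + mapper.writeValueAsBytes(data) + suffix
    println(String(out, Charsets.UTF_8))
}
```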
@@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
@@ -48,15 +47,10 @@ sealed class JdbcPartitionReader<P : JdbcPartition<*>>(
private val outData: ObjectNode = Jsons.objectNode()

private val msg =
AirbyteMessage()
Contributor:
Nit: Any way to more strongly discourage the use of this object in the framework?

Contributor Author:
Can you elaborate, please? I'm not sure what you mean.

Contributor:
A nit, and it might not matter if this is an internal concept, but if the partitionReaders we make in the toolkit should use AirbyteRecordMessage rather than AirbyteMessage, is there a way to mark it deprecated or something like that?

Contributor Author:
Ah, I see. No, it's not quite like that. You can use either one and the outcome is the same! Given that, let's use the more specific AirbyteRecordMessage.

.withType(AirbyteMessage.Type.RECORD)
.withRecord(
AirbyteRecordMessage()
.withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
.withStream(stream.name)
.withNamespace(stream.namespace)
.withData(outData)
)
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withData(outData)

val streamFieldNames: List<String> = stream.fields.map { it.id }

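A final note on the JdbcPartitionReader change: as the thread above concludes, both call shapes below produce identical output, assuming the `accept(AirbyteRecordMessage)` overload that this PR adds to `OutputConsumer`. The sketch is illustrative, not code from the PR:

```kotlin
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage

// Both calls emit the same JSON; the second also skips the envelope
// allocation and takes the template-based fast path for RECORDs.
fun emitEitherWay(consumer: OutputConsumer, record: AirbyteRecordMessage) {
    // Before this PR's change:
    consumer.accept(AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(record))
    // After:
    consumer.accept(record)
}
```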