Skip to content

Commit b3103b3

Browse files
author
Marius Posta
committed
bulk-cdk-core-base: savagely optimized record serialization
1 parent 18a8e5f commit b3103b3

File tree

1 file changed

+151
-14
lines changed

1 file changed

+151
-14
lines changed

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt

+151-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.cdk.output
33

4+
import com.fasterxml.jackson.core.JsonGenerator
45
import com.fasterxml.jackson.databind.SequenceWriter
56
import io.airbyte.cdk.util.Jsons
67
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
@@ -11,15 +12,18 @@ import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage
1112
import io.airbyte.protocol.models.v0.AirbyteLogMessage
1213
import io.airbyte.protocol.models.v0.AirbyteMessage
1314
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
15+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
1416
import io.airbyte.protocol.models.v0.AirbyteStateMessage
1517
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
1618
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
1719
import io.airbyte.protocol.models.v0.ConnectorSpecification
20+
import io.micronaut.context.annotation.ConfigurationProperties
1821
import io.micronaut.context.annotation.DefaultImplementation
1922
import io.micronaut.context.annotation.Secondary
2023
import jakarta.inject.Singleton
2124
import java.io.ByteArrayOutputStream
2225
import java.time.Instant
26+
import java.util.concurrent.ConcurrentHashMap
2327
import java.util.function.Consumer
2428

2529
/** Emits the [AirbyteMessage] instances produced by the connector. */
@@ -93,35 +97,73 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
9397
// Used for integration tests.
9498
const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file"
9599

100+
/** Configuration properties prefix for [StdoutOutputConsumer]. */
101+
const val CONNECTOR_OUTPUT_STDOUT_PREFIX = "airbyte.connector.output.stdout"
102+
96103
/** Default implementation of [OutputConsumer]. */
97104
@Singleton
98105
@Secondary
99-
private class StdoutOutputConsumer : OutputConsumer {
106+
@ConfigurationProperties(CONNECTOR_OUTPUT_STDOUT_PREFIX)
107+
private class StdoutOutputConsumer(
108+
/**
109+
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
110+
* buffer's size (in bytes) grows past this value.
111+
*
112+
* Flushing the record buffer to stdout is done by calling [println] which is synchronized. The
113+
* choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender which
114+
* concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output.
115+
*
116+
* Because calling [println] incurs both a synchronization overhead and a syscall overhead, the
117+
* connector's performance will noticeably degrade if it's called too often. This happens
118+
* primarily when emitting lots of tiny RECORD messages, which is typical of source connectors.
119+
*
120+
* For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
121+
* default value of 4kB is good in this respect. For example, if the average serialized record
122+
* size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
123+
*
124+
* Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
125+
* Otherwise, the output becomes bursty and this also degrades performance. As of today (and
126+
* hopefully not for long) the platform still pipes the connector's stdout into socat to emit
127+
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
128+
* case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
129+
*/
130+
val bufferByteSizeThresholdForFlush: Int = 4 * 1024
131+
) : OutputConsumer {
100132
override val emittedAt: Instant = Instant.now()
101-
102133
private val buffer = ByteArrayOutputStream()
103-
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer)
134+
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
135+
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
104136

105137
override fun accept(airbyteMessage: AirbyteMessage) {
106138
// This method effectively println's its JSON-serialized argument.
107139
// Using println is not particularly efficient, however.
108140
// To improve performance, this method accumulates RECORD messages into a buffer
109141
// before writing them to standard output in a batch.
110-
// Other Airbyte message types are not buffered, instead they trigger an immediate flush.
111-
// Such messages should not linger indefinitely in a buffer.
112-
val isRecord: Boolean = airbyteMessage.type == AirbyteMessage.Type.RECORD
113-
synchronized(this) {
114-
if (buffer.size() > 0) {
115-
buffer.write('\n'.code)
116-
}
117-
sequenceWriter.write(airbyteMessage)
118-
sequenceWriter.flush()
119-
if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) {
142+
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
143+
// RECORD messages undergo a different serialization scheme.
144+
accept(airbyteMessage.record)
145+
} else {
146+
synchronized(this) {
147+
// Write a newline character to the buffer if it's not empty.
148+
withLockMaybeWriteNewline()
149+
// Non-RECORD AirbyteMessage instances are serialized and written to the buffer
150+
// using standard jackson object mapping facilities.
151+
sequenceWriter.write(airbyteMessage)
152+
sequenceWriter.flush()
153+
// Such messages don't linger in the buffer, they are flushed to stdout immediately,
154+
// along with whatever might have already been lingering inside.
155+
// This prints a newline after the message.
120156
withLockFlush()
121157
}
122158
}
123159
}
124160

161+
private fun withLockMaybeWriteNewline() {
162+
if (buffer.size() > 0) {
163+
buffer.write('\n'.code)
164+
}
165+
}
166+
125167
override fun close() {
126168
synchronized(this) {
127169
// Flush any remaining buffer contents to stdout before closing.
@@ -136,7 +178,102 @@ private class StdoutOutputConsumer : OutputConsumer {
136178
}
137179
}
138180

181+
override fun accept(record: AirbyteRecordMessage) {
182+
// The serialization of RECORD messages can become a performance bottleneck for source
183+
// connectors because they can come in much higher volumes than other message types.
184+
// Specifically, with jackson, the bottleneck is in the object mapping logic.
185+
// As it turns out, this object mapping logic is not particularly useful for RECORD messages
186+
// because within a given stream the only variations occur in the "data" and the "meta"
187+
// fields:
188+
// - the "data" field is already an ObjectNode and is cheap to serialize,
189+
// - the "meta" field is often unset.
190+
// For this reason, this method builds and reuses a JSON template for each stream.
191+
// Then, for each record, it serializes just "data" and "meta" to populate the template.
192+
val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
193+
synchronized(this) {
194+
// Write a newline character to the buffer if it's not empty.
195+
withLockMaybeWriteNewline()
196+
// Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
197+
buffer.write(template.prefix)
198+
// Serialize the record data ObjectNode to JSON, writing it to the buffer.
199+
Jsons.writeTree(jsonGenerator, record.data)
200+
jsonGenerator.flush()
201+
// If the record has an AirbyteRecordMessageMeta instance set,
202+
// write ',"meta":' followed by the serialized meta.
203+
val meta: AirbyteRecordMessageMeta? = record.meta
204+
if (meta != null) {
205+
buffer.write(metaPrefixBytes)
206+
sequenceWriter.write(meta)
207+
sequenceWriter.flush()
208+
}
209+
// Write ',"emitted_at":...}}'.
210+
buffer.write(template.suffix)
211+
// Flush the buffer to stdout only once it has reached a certain size.
212+
// Flushing to stdout incurs some overhead (mutex, syscall, etc.)
213+
// which otherwise becomes very apparent when lots of tiny records are involved.
214+
if (buffer.size() >= bufferByteSizeThresholdForFlush) {
215+
withLockFlush()
216+
}
217+
}
218+
}
219+
220+
private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()
221+
222+
private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
223+
val streamToTemplateMap: StreamToTemplateMap =
224+
if (namespace == null) {
225+
unNamespacedTemplates
226+
} else {
227+
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
228+
}
229+
return streamToTemplateMap.getOrPut(stream) {
230+
RecordTemplate.create(stream, namespace, emittedAt)
231+
}
232+
}
233+
234+
private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
235+
private val unNamespacedTemplates = StreamToTemplateMap()
236+
237+
companion object {
238+
const val META_PREFIX = ""","meta":"""
239+
}
240+
}
241+
242+
private typealias StreamToTemplateMap = ConcurrentHashMap<String, RecordTemplate>
243+
244+
private class RecordTemplate(
245+
/** [prefix] is '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":' */
246+
val prefix: ByteArray,
247+
/** [suffix] is ',"emitted_at":...}}' */
248+
val suffix: ByteArray,
249+
) {
139250
companion object {
140-
const val BUFFER_MAX_SIZE = 1024 * 1024
251+
fun create(stream: String, namespace: String?, emittedAt: Instant): RecordTemplate {
252+
// Generate a dummy AirbyteRecordMessage instance for the given args
253+
// using an empty object (i.e. '{}') for the "data" field value.
254+
val recordMessage =
255+
AirbyteRecordMessage()
256+
.withStream(stream)
257+
.withNamespace(namespace)
258+
.withEmittedAt(emittedAt.toEpochMilli())
259+
.withData(Jsons.objectNode())
260+
// Generate the corresponding dummy AirbyteMessage instance.
261+
val airbyteMessage =
262+
AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage)
263+
// Serialize to JSON.
264+
val json: String = Jsons.writeValueAsString(airbyteMessage)
265+
// Split the string in 2 around the '"data":{}' substring.
266+
val parts: List<String> = json.split(splitRegex, limit = 2)
267+
require(parts.size == 2) { "expected to find $splitRegex in $json" }
268+
// Re-attach the '"data":' substring to the first part
269+
// and return both parts in a RecordTemplate instance for this stream.
270+
return RecordTemplate(
271+
prefix = (parts.first() + DATA_PREFIX).toByteArray(),
272+
suffix = parts.last().toByteArray()
273+
)
274+
}
275+
276+
private const val DATA_PREFIX = """"data":"""
277+
private val splitRegex: Regex = Regex.fromLiteral("$DATA_PREFIX{}")
141278
}
142279
}

0 commit comments

Comments
 (0)