
Commit 130bff8

Author: Marius Posta
bulk-cdk-core-base: optimized record serialization (#44880)
1 parent 7c2e51e commit 130bff8

File tree: 7 files changed, +202 −83 lines

airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/TestClockFactory.kt → airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ClockFactory.kt
+13 −6

@@ -1,4 +1,7 @@
-/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
 package io.airbyte.cdk
 
 import io.micronaut.context.annotation.Factory
@@ -10,21 +13,25 @@ import java.time.Duration
 import java.time.Instant
 import java.time.ZoneOffset
 
-const val OFFSET_CLOCK = "offset-clock"
-
-/** Injects more-or-less fake clocks for testing purposes. */
+/** Injects the system clock in production and a fake clock in tests. */
 @Factory
-@Requires(env = [Environment.TEST])
-class TestClockFactory {
+class ClockFactory {
+
+    @Singleton @Requires(notEnv = [Environment.TEST]) fun system(): Clock = Clock.systemUTC()
+
     @Singleton
+    @Requires(env = [Environment.TEST])
     @Requires(notEnv = [OFFSET_CLOCK])
     fun fixed(): Clock = Clock.fixed(fakeNow, ZoneOffset.UTC)
 
     @Singleton
+    @Requires(env = [Environment.TEST])
     @Requires(env = [OFFSET_CLOCK])
     fun offset(): Clock = Clock.offset(Clock.systemUTC(), Duration.between(fakeNow, Instant.now()))
 
     companion object {
+        const val OFFSET_CLOCK = "offset-clock"
+
         /** Some convenient timestamp with an easy-to-read ISO8601 representation. */
         val fakeNow: Instant = Instant.ofEpochSecond(3133641600)
     }

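The renamed ClockFactory now serves both environments: production code gets Clock.systemUTC(), while the TEST environment gets a clock pinned at fakeNow (or an offset clock when the "offset-clock" environment is active), which makes emitted_at timestamps deterministic in tests. A minimal standalone sketch of that behavior, using only java.time and the fakeNow value from the companion object above:

import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset

fun main() {
    // Same timestamp as ClockFactory.fakeNow: 2069-04-20T00:00:00Z.
    val fakeNow: Instant = Instant.ofEpochSecond(3133641600)
    // Mirrors the TEST-environment fixed() bean above.
    val fixedClock: Clock = Clock.fixed(fakeNow, ZoneOffset.UTC)
    // Anything stamping messages with Instant.now(clock) becomes deterministic under this clock.
    val emittedAt: Instant = Instant.now(fixedClock)
    check(emittedAt == fakeNow) // always true with the fixed clock
    println(emittedAt) // prints 2069-04-20T00:00:00Z
}
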
airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt
+177 −17

@@ -1,6 +1,7 @@
 /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
 package io.airbyte.cdk.output
 
+import com.fasterxml.jackson.core.JsonGenerator
 import com.fasterxml.jackson.databind.SequenceWriter
 import io.airbyte.cdk.util.Jsons
 import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
@@ -11,15 +12,25 @@ import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage
 import io.airbyte.protocol.models.v0.AirbyteLogMessage
 import io.airbyte.protocol.models.v0.AirbyteMessage
 import io.airbyte.protocol.models.v0.AirbyteRecordMessage
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
 import io.airbyte.protocol.models.v0.AirbyteStateMessage
 import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
 import io.airbyte.protocol.models.v0.AirbyteTraceMessage
 import io.airbyte.protocol.models.v0.ConnectorSpecification
 import io.micronaut.context.annotation.DefaultImplementation
+import io.micronaut.context.annotation.Factory
+import io.micronaut.context.annotation.Requires
 import io.micronaut.context.annotation.Secondary
+import io.micronaut.context.annotation.Value
+import io.micronaut.context.env.Environment
 import jakarta.inject.Singleton
 import java.io.ByteArrayOutputStream
+import java.io.FileOutputStream
+import java.io.PrintStream
+import java.nio.file.Path
+import java.time.Clock
 import java.time.Instant
+import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Consumer
 
 /** Emits the [AirbyteMessage] instances produced by the connector. */
@@ -90,38 +101,79 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
     }
 }
 
+/** Configuration properties prefix for [StdoutOutputConsumer]. */
+const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
+
 // Used for integration tests.
-const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file"
+const val CONNECTOR_OUTPUT_FILE = "$CONNECTOR_OUTPUT_PREFIX.file"
 
 /** Default implementation of [OutputConsumer]. */
 @Singleton
 @Secondary
-private class StdoutOutputConsumer : OutputConsumer {
-    override val emittedAt: Instant = Instant.now()
+private class StdoutOutputConsumer(
+    val stdout: PrintStream,
+    clock: Clock,
+    /**
+     * [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
+     * buffer's size (in bytes) grows past this value.
+     *
+     * Flushing the record buffer to stdout is done by calling [println] which is synchronized. The
+     * choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender which
+     * concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output.
+     *
+     * Because calling [println] incurs both a synchronization overhead and a syscall overhead, the
+     * connector's performance will noticeably degrade if it's called too often. This happens
+     * primarily when emitting lots of tiny RECORD messages, which is typical of source connectors.
+     *
+     * For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
+     * default value of 4kB is good in this respect. For example, if the average serialized record
+     * size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
+     *
+     * Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
+     * Otherwise, the output becomes bursty and this also degrades performance. As of today (and
+     * hopefully not for long) the platform still pipes the connector's stdout into socat to emit
+     * the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
+     * case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
+     */
+    @Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
+    val bufferByteSizeThresholdForFlush: Int,
+) : OutputConsumer {
+    override val emittedAt: Instant = Instant.now(clock)
 
-    private val buffer = ByteArrayOutputStream()
-    private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer)
+    private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
+    private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
+    private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
 
     override fun accept(airbyteMessage: AirbyteMessage) {
         // This method effectively println's its JSON-serialized argument.
         // Using println is not particularly efficient, however.
         // To improve performance, this method accumulates RECORD messages into a buffer
         // before writing them to standard output in a batch.
-        // Other Airbyte message types are not buffered, instead they trigger an immediate flush.
-        // Such messages should not linger indefinitely in a buffer.
-        val isRecord: Boolean = airbyteMessage.type == AirbyteMessage.Type.RECORD
-        synchronized(this) {
-            if (buffer.size() > 0) {
-                buffer.write('\n'.code)
-            }
-            sequenceWriter.write(airbyteMessage)
-            sequenceWriter.flush()
-            if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) {
+        if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
+            // RECORD messages undergo a different serialization scheme.
+            accept(airbyteMessage.record)
+        } else {
+            synchronized(this) {
+                // Write a newline character to the buffer if it's not empty.
+                withLockMaybeWriteNewline()
+                // Non-RECORD AirbyteMessage instances are serialized and written to the buffer
+                // using standard jackson object mapping facilities.
+                sequenceWriter.write(airbyteMessage)
+                sequenceWriter.flush()
+                // Such messages don't linger in the buffer, they are flushed to stdout immediately,
+                // along with whatever might have already been lingering inside.
                // This prints a newline after the message.
                 withLockFlush()
             }
         }
     }
 
+    private fun withLockMaybeWriteNewline() {
+        if (buffer.size() > 0) {
+            buffer.write('\n'.code)
+        }
+    }
+
     override fun close() {
         synchronized(this) {
             // Flush any remaining buffer contents to stdout before closing.
@@ -131,12 +183,120 @@ private class StdoutOutputConsumer : OutputConsumer {
 
     private fun withLockFlush() {
         if (buffer.size() > 0) {
-            println(buffer.toString(Charsets.UTF_8))
+            stdout.println(buffer.toString(Charsets.UTF_8))
+            stdout.flush()
             buffer.reset()
         }
     }
 
+    override fun accept(record: AirbyteRecordMessage) {
+        // The serialization of RECORD messages can become a performance bottleneck for source
+        // connectors because they can come in much higher volumes than other message types.
+        // Specifically, with jackson, the bottleneck is in the object mapping logic.
+        // As it turns out, this object mapping logic is not particularly useful for RECORD messages
+        // because within a given stream the only variations occur in the "data" and the "meta"
+        // fields:
+        // - the "data" field is already an ObjectNode and is cheap to serialize,
+        // - the "meta" field is often unset.
+        // For this reason, this method builds and reuses a JSON template for each stream.
+        // Then, for each record, it serializes just "data" and "meta" to populate the template.
+        val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
+        synchronized(this) {
+            // Write a newline character to the buffer if it's not empty.
+            withLockMaybeWriteNewline()
+            // Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
+            buffer.write(template.prefix)
+            // Serialize the record data ObjectNode to JSON, writing it to the buffer.
+            Jsons.writeTree(jsonGenerator, record.data)
+            jsonGenerator.flush()
+            // If the record has an AirbyteRecordMessageMeta instance set,
+            // write ',"meta":' followed by the serialized meta.
+            val meta: AirbyteRecordMessageMeta? = record.meta
+            if (meta != null) {
+                buffer.write(metaPrefixBytes)
+                sequenceWriter.write(meta)
+                sequenceWriter.flush()
+            }
+            // Write ',"emitted_at":...}}'.
+            buffer.write(template.suffix)
+            // Flush the buffer to stdout only once it has reached a certain size.
+            // Flushing to stdout incurs some overhead (mutex, syscall, etc.)
+            // which otherwise becomes very apparent when lots of tiny records are involved.
+            if (buffer.size() >= bufferByteSizeThresholdForFlush) {
+                withLockFlush()
+            }
+        }
+    }
+
+    private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()
+
+    private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
+        val streamToTemplateMap: StreamToTemplateMap =
+            if (namespace == null) {
+                unNamespacedTemplates
+            } else {
+                namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
+            }
+        return streamToTemplateMap.getOrPut(stream) {
+            RecordTemplate.create(stream, namespace, emittedAt)
+        }
+    }
+
+    private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
+    private val unNamespacedTemplates = StreamToTemplateMap()
+
     companion object {
-        const val BUFFER_MAX_SIZE = 1024 * 1024
+        const val META_PREFIX = ""","meta":"""
     }
 }
+
+private typealias StreamToTemplateMap = ConcurrentHashMap<String, RecordTemplate>
+
+private class RecordTemplate(
+    /** [prefix] is '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":' */
+    val prefix: ByteArray,
+    /** [suffix] is ',"emitted_at":...}}' */
+    val suffix: ByteArray,
+) {
+    companion object {
+        fun create(stream: String, namespace: String?, emittedAt: Instant): RecordTemplate {
+            // Generate a dummy AirbyteRecordMessage instance for the given args
+            // using an empty object (i.e. '{}') for the "data" field value.
+            val recordMessage =
+                AirbyteRecordMessage()
+                    .withStream(stream)
+                    .withNamespace(namespace)
+                    .withEmittedAt(emittedAt.toEpochMilli())
+                    .withData(Jsons.objectNode())
+            // Generate the corresponding dummy AirbyteMessage instance.
+            val airbyteMessage =
+                AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage)
+            // Serialize to JSON.
+            val json: String = Jsons.writeValueAsString(airbyteMessage)
+            // Split the string in 2 around the '"data":{}' substring.
+            val parts: List<String> = json.split(DATA_SPLIT_DELIMITER, limit = 2)
+            require(parts.size == 2) { "expected to find $DATA_SPLIT_DELIMITER in $json" }
+            // Re-attach the '"data":' substring to the first part
+            // and return both parts in a RecordTemplate instance for this stream.
+            return RecordTemplate(
+                prefix = (parts.first() + DATA_PREFIX).toByteArray(),
+                suffix = parts.last().toByteArray()
+            )
+        }
+
+        private const val DATA_PREFIX = """"data":"""
+        private const val DATA_SPLIT_DELIMITER = "$DATA_PREFIX{}"
+    }
+}
+
+@Factory
+private class PrintStreamFactory {
+
+    @Singleton @Requires(notEnv = [Environment.TEST]) fun stdout(): PrintStream = System.out
+
+    @Singleton
+    @Requires(env = [Environment.TEST])
+    @Requires(property = CONNECTOR_OUTPUT_FILE)
+    fun file(@Value("\${$CONNECTOR_OUTPUT_FILE}") filePath: Path): PrintStream =
+        PrintStream(FileOutputStream(filePath.toFile()), false, Charsets.UTF_8)
+}
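
The heart of this commit is the per-stream RecordTemplate above: the AirbyteMessage envelope is serialized once per stream with an empty '{}' data placeholder, split around the '"data":{}' marker, and the cached prefix/suffix bytes are then spliced around each record's serialized "data" node (plus the optional "meta"), bypassing jackson object mapping on the hot path. The following self-contained sketch shows the same splitting technique using a plain Jackson ObjectMapper and a Map standing in for the Airbyte protocol classes; the stream, namespace and emitted_at values are illustrative only:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

fun main() {
    val mapper = ObjectMapper()

    // 1. Serialize a dummy envelope once per stream, with '{}' as the "data" placeholder.
    val dummy = mapOf(
        "type" to "RECORD",
        "record" to mapOf(
            "namespace" to "public",
            "stream" to "users",
            "data" to emptyMap<String, Any>(),
            "emitted_at" to 1700000000000L,
        ),
    )
    val json: String = mapper.writeValueAsString(dummy)

    // 2. Split around the '"data":{}' marker and cache the prefix/suffix bytes for the stream.
    val delimiter = """"data":{}"""
    val parts: List<String> = json.split(delimiter, limit = 2)
    require(parts.size == 2) { "expected to find $delimiter in $json" }
    val prefix: ByteArray = (parts[0] + """"data":""").toByteArray()
    val suffix: ByteArray = parts[1].toByteArray()

    // 3. Per record, only the "data" ObjectNode is serialized; the envelope comes from the template.
    val data: ObjectNode = mapper.createObjectNode().put("id", 1).put("name", "alice")
    val line = String(prefix) + mapper.writeValueAsString(data) + String(suffix)
    println(line)
    // {"type":"RECORD","record":{"namespace":"public","stream":"users","data":{"id":1,"name":"alice"},"emitted_at":1700000000000}}
}

Within a stream only the "data" (and occasionally "meta") bytes vary, so the template is computed once per stream and reused for every record, which is where the serialization speedup in this commit comes from.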

airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt
+2 −2

@@ -5,7 +5,7 @@ import io.airbyte.cdk.AirbyteConnectorRunnable
 import io.airbyte.cdk.AirbyteConnectorRunner
 import io.airbyte.cdk.AirbyteDestinationRunner
 import io.airbyte.cdk.AirbyteSourceRunner
-import io.airbyte.cdk.TestClockFactory
+import io.airbyte.cdk.ClockFactory
 import io.airbyte.cdk.output.BufferingOutputConsumer
 import io.airbyte.cdk.util.Jsons
 import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -56,7 +56,7 @@ data object CliRunner {
         state: List<AirbyteStateMessage>?,
         connectorRunnerConstructor: (Array<String>) -> AirbyteConnectorRunner,
     ): BufferingOutputConsumer {
-        val result = BufferingOutputConsumer(TestClockFactory().fixed())
+        val result = BufferingOutputConsumer(ClockFactory().fixed())
         val configFile: Path? = inputFile(config)
         val catalogFile: Path? = inputFile(catalog)
         val stateFile: Path? = inputFile(state)

airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/FileOutputConsumer.kt

−42
This file was deleted.
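
The deleted FileOutputConsumer test fixture is superseded by the PrintStreamFactory added to OutputConsumer.kt above: in the TEST environment, setting the airbyte.connector.output.file property swaps the System.out bean for a PrintStream over that file. A rough sketch of how a test might exercise that wiring with a bare Micronaut ApplicationContext; the CDK's own fixtures (e.g. CliRunner) may wire this differently, and the file name and message here are only examples:

import io.micronaut.context.ApplicationContext
import java.io.PrintStream
import java.nio.file.Files

fun main() {
    val outputFile = Files.createTempFile("connector-output", ".jsonl")
    // Assumption: the "test" environment plus the output-file property activates
    // PrintStreamFactory.file(...) rather than the production System.out bean.
    ApplicationContext.run(
            mapOf<String, Any>("airbyte.connector.output.file" to outputFile.toString()),
            "test",
        )
        .use { ctx ->
            val stream: PrintStream = ctx.getBean(PrintStream::class.java)
            stream.println("""{"type":"LOG","log":{"level":"INFO","message":"hello"}}""")
            stream.flush()
        }
    println(Files.readString(outputFile)) // the message lands in the file, not on stdout
}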

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/RootReaderIntegrationTest.kt
+4 −4

@@ -3,7 +3,7 @@ package io.airbyte.cdk.read
 
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node.ArrayNode
-import io.airbyte.cdk.TestClockFactory
+import io.airbyte.cdk.ClockFactory
 import io.airbyte.cdk.command.OpaqueStateValue
 import io.airbyte.cdk.output.BufferingOutputConsumer
 import io.airbyte.cdk.util.Jsons
@@ -109,7 +109,7 @@ class RootReaderIntegrationTest {
     fun testAllStreamsNonGlobal() {
         val stateManager =
             StateManager(initialStreamStates = testCases.associate { it.stream to null })
-        val testOutputConsumer = BufferingOutputConsumer(TestClockFactory().fixed())
+        val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
         val rootReader =
             RootReader(
                 stateManager,
@@ -152,7 +152,7 @@ class RootReaderIntegrationTest {
             initialGlobalState = null,
             initialStreamStates = testCases.associate { it.stream to null },
         )
-        val testOutputConsumer = BufferingOutputConsumer(TestClockFactory().fixed())
+        val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
         val rootReader =
             RootReader(
                 stateManager,
@@ -216,7 +216,7 @@ data class TestCase(
     )
 
     fun run() {
-        val testOutputConsumer = BufferingOutputConsumer(TestClockFactory().fixed())
+        val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
        val rootReader =
             RootReader(
                 StateManager(initialStreamStates = mapOf(stream to null)),
