bulk-cdk-core-base: optimized record serialization #44880

Merged: 5 commits, Aug 29, 2024
@@ -1,4 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk

import io.micronaut.context.annotation.Factory
@@ -10,21 +13,25 @@ import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset

const val OFFSET_CLOCK = "offset-clock"

/** Injects more-or-less fake clocks for testing purposes. */
/** Injects the system clock in production and a fake clock in tests. */
@Factory
@Requires(env = [Environment.TEST])
class TestClockFactory {
class ClockFactory {

@Singleton @Requires(notEnv = [Environment.TEST]) fun system(): Clock = Clock.systemUTC()

@Singleton
@Requires(env = [Environment.TEST])
@Requires(notEnv = [OFFSET_CLOCK])
fun fixed(): Clock = Clock.fixed(fakeNow, ZoneOffset.UTC)

@Singleton
@Requires(env = [Environment.TEST])
@Requires(env = [OFFSET_CLOCK])
fun offset(): Clock = Clock.offset(Clock.systemUTC(), Duration.between(fakeNow, Instant.now()))

companion object {
const val OFFSET_CLOCK = "offset-clock"

/** Some convenient timestamp with an easy-to-read ISO8601 representation. */
val fakeNow: Instant = Instant.ofEpochSecond(3133641600)
}
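As an aside on wiring: bean selection here is driven entirely by Micronaut environments. The system clock is injected outside tests, the fixed clock in tests by default, and the offset clock when the "offset-clock" environment is also active. A minimal sketch of a test opting into the offset clock (the test class and assertion are hypothetical; @MicronautTest with environments and constructor injection are standard Micronaut Test features):

import io.airbyte.cdk.OFFSET_CLOCK
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import java.time.Clock
import java.time.Instant
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

// Hypothetical test: activating the "offset-clock" environment selects the
// offset() bean above, which tracks wall-clock time rebased onto fakeNow.
@MicronautTest(environments = [OFFSET_CLOCK])
class OffsetClockSelectionTest(private val clock: Clock) {
    @Test
    fun offsetClockAdvances() {
        val first = Instant.now(clock)
        Thread.sleep(10)
        // Unlike the default fixed clock, the offset clock moves forward.
        assertTrue(Instant.now(clock) > first)
    }
}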
@@ -1,6 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.output

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SequenceWriter
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
Expand All @@ -11,15 +12,25 @@ import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage
import io.airbyte.protocol.models.v0.AirbyteLogMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import io.airbyte.protocol.models.v0.ConnectorSpecification
import io.micronaut.context.annotation.DefaultImplementation
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Secondary
import io.micronaut.context.annotation.Value
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.io.FileOutputStream
import java.io.PrintStream
import java.nio.file.Path
import java.time.Clock
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer

/** Emits the [AirbyteMessage] instances produced by the connector. */
@@ -90,38 +101,79 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
}
}

/** Configuration properties prefix for [StdoutOutputConsumer]. */
const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"

// Used for integration tests.
const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file"
const val CONNECTOR_OUTPUT_FILE = "$CONNECTOR_OUTPUT_PREFIX.file"

/** Default implementation of [OutputConsumer]. */
@Singleton
@Secondary
private class StdoutOutputConsumer : OutputConsumer {
override val emittedAt: Instant = Instant.now()
private class StdoutOutputConsumer(
val stdout: PrintStream,
Contributor: I'm not clear how this is being wired in. Does Micronaut default PrintStream to ConsolePrintStream? I couldn't find any doc around that behavior. Not sure how this is related to the record serialization optimization.

Contributor (author): Micronaut doesn't make those kinds of assumptions. To inject a non-bean instance it will look for a @Factory, in this case the PrintStreamFactory defined further below.

Contributor: Yeah, my bad. Not sure how I missed it.

clock: Clock,
/**
 * [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
 * buffer's size (in bytes) grows past this value.
 *
 * Flushing the record buffer to stdout is done by calling [println], which is synchronized. The
 * choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender, which
 * concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output.
 *
 * Because calling [println] incurs both a synchronization overhead and a syscall overhead, the
 * connector's performance will noticeably degrade if it's called too often. This happens
 * primarily when emitting lots of tiny RECORD messages, which is typical of source connectors.
 *
 * For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
 * default value of 4 kB is good in this respect. For example, if the average serialized record
 * size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
 *
 * Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
 * Otherwise, the output becomes bursty and this also degrades performance. As of today (and
 * hopefully not for long) the platform still pipes the connector's stdout into socat to emit
 * the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
 * case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
 */
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
val bufferByteSizeThresholdForFlush: Int,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)

private val buffer = ByteArrayOutputStream()
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer)
private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)

override fun accept(airbyteMessage: AirbyteMessage) {
// This method effectively println's its JSON-serialized argument.
// Using println is not particularly efficient, however.
// To improve performance, this method accumulates RECORD messages into a buffer
// before writing them to standard output in a batch.
// Other Airbyte message types are not buffered, instead they trigger an immediate flush.
// Such messages should not linger indefinitely in a buffer.
val isRecord: Boolean = airbyteMessage.type == AirbyteMessage.Type.RECORD
synchronized(this) {
if (buffer.size() > 0) {
buffer.write('\n'.code)
}
sequenceWriter.write(airbyteMessage)
sequenceWriter.flush()
if (!isRecord || buffer.size() >= BUFFER_MAX_SIZE) {
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
Contributor: Why record only? Everything seems to be preserved in the original order in the buffer, so I feel like all AirbyteMessages could be handled here.

Contributor (author): You're correct in that we could apply this to all message types. The reasons I'm doing it just for records are:

  1. If you ignore RECORDs and LOGs, there's really not much left over. Most of the remainder are STATE messages, which are much less frequent than RECORDs, and other than that it's just a few TRACE and CONTROL messages. In other words, they don't matter for throughput.
  2. On the other hand, suppose there are very few or no RECORDs in a stream; we don't want the TRACE and STATE messages to sit in the buffer indefinitely. Instead, we want the platform to know about them immediately: it's going to inform the UI, impact the other connector, etc.

// RECORD messages undergo a different serialization scheme.
accept(airbyteMessage.record)
} else {
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Non-RECORD AirbyteMessage instances are serialized and written to the buffer
// using standard jackson object mapping facilities.
sequenceWriter.write(airbyteMessage)
sequenceWriter.flush()
// Such messages don't linger in the buffer; they are flushed to stdout immediately,
// along with whatever might have already been lingering inside.
// This prints a newline after the message.
withLockFlush()
}
}
}

private fun withLockMaybeWriteNewline() {
if (buffer.size() > 0) {
buffer.write('\n'.code)
}
}

override fun close() {
synchronized(this) {
// Flush any remaining buffer contents to stdout before closing.
@@ -131,12 +183,120 @@ private class StdoutOutputConsumer : OutputConsumer {

private fun withLockFlush() {
if (buffer.size() > 0) {
println(buffer.toString(Charsets.UTF_8))
stdout.println(buffer.toString(Charsets.UTF_8))
stdout.flush()
buffer.reset()
}
}

override fun accept(record: AirbyteRecordMessage) {
// The serialization of RECORD messages can become a performance bottleneck for source
// connectors because they can come in much higher volumes than other message types.
// Specifically, with jackson, the bottleneck is in the object mapping logic.
// As it turns out, this object mapping logic is not particularly useful for RECORD messages
// because within a given stream the only variations occur in the "data" and the "meta"
// fields:
// - the "data" field is already an ObjectNode and is cheap to serialize,
// - the "meta" field is often unset.
// For this reason, this method builds and reuses a JSON template for each stream.
// Then, for each record, it serializes just "data" and "meta" to populate the template.
val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
buffer.write(template.prefix)
// Serialize the record data ObjectNode to JSON, writing it to the buffer.
Jsons.writeTree(jsonGenerator, record.data)
jsonGenerator.flush()
// If the record has an AirbyteRecordMessageMeta instance set,
// write ',"meta":' followed by the serialized meta.
val meta: AirbyteRecordMessageMeta? = record.meta
if (meta != null) {
buffer.write(metaPrefixBytes)
sequenceWriter.write(meta)
sequenceWriter.flush()
}
// Write ',"emitted_at":...}}'.
buffer.write(template.suffix)
// Flush the buffer to stdout only once it has reached a certain size.
// Flushing to stdout incurs some overhead (mutex, syscall, etc.)
// which otherwise becomes very apparent when lots of tiny records are involved.
if (buffer.size() >= bufferByteSizeThresholdForFlush) {
withLockFlush()
}
}
}

private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()

private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
val streamToTemplateMap: StreamToTemplateMap =
if (namespace == null) {
unNamespacedTemplates
} else {
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
}
return streamToTemplateMap.getOrPut(stream) {
RecordTemplate.create(stream, namespace, emittedAt)
}
}

private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
private val unNamespacedTemplates = StreamToTemplateMap()

companion object {
const val BUFFER_MAX_SIZE = 1024 * 1024
const val META_PREFIX = ""","meta":"""
}
}
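For intuition about the accept methods above, here is a simplified standalone model of the buffering discipline (a sketch with made-up names, not the CDK class itself): with the default 4096-byte threshold and an assumed average serialized record size of 100 bytes, roughly 40 records share each synchronized println.

import java.io.ByteArrayOutputStream

// Standalone model: RECORD payloads accumulate in a byte buffer,
// while any other message type forces an immediate flush.
class MiniBufferedWriter(private val thresholdBytes: Int = 4096) {
    private val buffer = ByteArrayOutputStream()

    @Synchronized
    fun write(serialized: ByteArray, isRecord: Boolean) {
        if (buffer.size() > 0) buffer.write('\n'.code)
        buffer.write(serialized)
        // Records flush lazily once the threshold is crossed; everything
        // else flushes right away so it never lingers in the buffer.
        if (!isRecord || buffer.size() >= thresholdBytes) {
            println(buffer.toString(Charsets.UTF_8))
            buffer.reset()
        }
    }
}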

private typealias StreamToTemplateMap = ConcurrentHashMap<String, RecordTemplate>

private class RecordTemplate(
/** [prefix] is '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":' */
val prefix: ByteArray,
/** [suffix] is ',"emitted_at":...}}' */
val suffix: ByteArray,
) {
companion object {
fun create(stream: String, namespace: String?, emittedAt: Instant): RecordTemplate {
// Generate a dummy AirbyteRecordMessage instance for the given args
// using an empty object (i.e. '{}') for the "data" field value.
val recordMessage =
AirbyteRecordMessage()
.withStream(stream)
.withNamespace(namespace)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(Jsons.objectNode())
// Generate the corresponding dummy AirbyteMessage instance.
val airbyteMessage =
AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage)
// Serialize to JSON.
val json: String = Jsons.writeValueAsString(airbyteMessage)
// Split the string in two around the '"data":{}' substring.
val parts: List<String> = json.split(DATA_SPLIT_DELIMITER, limit = 2)
require(parts.size == 2) { "expected to find $DATA_SPLIT_DELIMITER in $json" }
// Re-attach the '"data":' substring to the first part
// and return both parts in a RecordTemplate instance for this stream.
return RecordTemplate(
prefix = (parts.first() + DATA_PREFIX).toByteArray(),
suffix = parts.last().toByteArray()
)
}

private const val DATA_PREFIX = """"data":"""
private const val DATA_SPLIT_DELIMITER = "$DATA_PREFIX{}"
}
}
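To make the template split concrete, a self-contained sketch (the stream name, namespace, and epoch-millis value are hand-picked stand-ins; the splitting mirrors RecordTemplate.create above):

fun main() {
    // Stand-in for the serialized dummy AirbyteMessage with empty data.
    val json =
        """{"type":"RECORD","record":{"namespace":"public","stream":"users","data":{},"emitted_at":1723680000000}}"""
    val parts = json.split(""""data":{}""", limit = 2)
    val prefix = parts.first() + """"data":"""
    val suffix = parts.last()
    // A record whose data is {"id":1} is emitted as prefix + data + suffix:
    println(prefix + """{"id":1}""" + suffix)
    // {"type":"RECORD","record":{"namespace":"public","stream":"users","data":{"id":1},"emitted_at":1723680000000}}
}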

@Factory
private class PrintStreamFactory {

@Singleton @Requires(notEnv = [Environment.TEST]) fun stdout(): PrintStream = System.out

@Singleton
@Requires(env = [Environment.TEST])
@Requires(property = CONNECTOR_OUTPUT_FILE)
fun file(@Value("\${$CONNECTOR_OUTPUT_FILE}") filePath: Path): PrintStream =
PrintStream(FileOutputStream(filePath.toFile()), false, Charsets.UTF_8)
}
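For completeness, a hedged sketch of how an integration test might obtain the file-backed PrintStream (the property key is CONNECTOR_OUTPUT_FILE from above; ApplicationContext.run with properties and environments is standard Micronaut, but this exact wiring is an assumption):

import io.micronaut.context.ApplicationContext
import io.micronaut.context.env.Environment
import java.io.PrintStream
import java.nio.file.Files

fun main() {
    // Hypothetical: boot a test context with the output file property set,
    // so the file() factory method above provides the PrintStream bean.
    val outputFile = Files.createTempFile("connector-output", ".jsonl")
    val ctx = ApplicationContext.run(
        mapOf<String, Any>("airbyte.connector.output.file" to outputFile.toString()),
        Environment.TEST,
    )
    val stdout: PrintStream = ctx.getBean(PrintStream::class.java)
    stdout.println("""{"type":"LOG","log":{"level":"INFO","message":"hello"}}""")
    stdout.flush()
    println(Files.readString(outputFile)) // the message lands in the file
    ctx.close()
}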
@@ -5,7 +5,7 @@ import io.airbyte.cdk.AirbyteConnectorRunnable
import io.airbyte.cdk.AirbyteConnectorRunner
import io.airbyte.cdk.AirbyteDestinationRunner
import io.airbyte.cdk.AirbyteSourceRunner
import io.airbyte.cdk.TestClockFactory
import io.airbyte.cdk.ClockFactory
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -56,7 +56,7 @@ data object CliRunner {
state: List<AirbyteStateMessage>?,
connectorRunnerConstructor: (Array<String>) -> AirbyteConnectorRunner,
): BufferingOutputConsumer {
val result = BufferingOutputConsumer(TestClockFactory().fixed())
val result = BufferingOutputConsumer(ClockFactory().fixed())
val configFile: Path? = inputFile(config)
val catalogFile: Path? = inputFile(catalog)
val stateFile: Path? = inputFile(state)

This file was deleted.

@@ -3,7 +3,7 @@ package io.airbyte.cdk.read

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ArrayNode
import io.airbyte.cdk.TestClockFactory
import io.airbyte.cdk.ClockFactory
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
@@ -109,7 +109,7 @@ class RootReaderIntegrationTest {
fun testAllStreamsNonGlobal() {
val stateManager =
StateManager(initialStreamStates = testCases.associate { it.stream to null })
val testOutputConsumer = BufferingOutputConsumer(TestClockFactory().fixed())
val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
val rootReader =
RootReader(
stateManager,
@@ -152,7 +152,7 @@
initialGlobalState = null,
initialStreamStates = testCases.associate { it.stream to null },
)
val testOutputConsumer = BufferingOutputConsumer(TestClockFactory().fixed())
val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
val rootReader =
RootReader(
stateManager,
@@ -216,7 +216,7 @@ data class TestCase(
)

fun run() {
val testOutputConsumer = BufferingOutputConsumer(TestClockFactory().fixed())
val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
val rootReader =
RootReader(
StateManager(initialStreamStates = mapOf(stream to null)),