Skip to content

Commit e4fec50

Browse files
author
Marius Posta
authored
bulk-cdk: improve AirbyteConnectorRunner and CliRunner (#45374)
1 parent 03584d5 commit e4fec50

File tree

9 files changed

+164
-70
lines changed

9 files changed

+164
-70
lines changed

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt

+13-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.airbyte.cdk.command.ConnectorCommandLinePropertySource
55
import io.airbyte.cdk.command.MetadataYamlPropertySource
66
import io.micronaut.configuration.picocli.MicronautFactory
77
import io.micronaut.context.ApplicationContext
8+
import io.micronaut.context.RuntimeBeanDefinition
89
import io.micronaut.context.env.CommandLinePropertySource
910
import io.micronaut.context.env.Environment
1011
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
@@ -17,8 +18,11 @@ import picocli.CommandLine.Model.UsageMessageSpec
1718

1819
/** Source connector entry point. */
1920
class AirbyteSourceRunner(
21+
/** CLI args. */
2022
args: Array<out String>,
21-
) : AirbyteConnectorRunner("source", args) {
23+
/** Micronaut bean definition overrides, used only for tests. */
24+
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
25+
) : AirbyteConnectorRunner("source", args, testBeanDefinitions) {
2226
companion object {
2327
@JvmStatic
2428
fun run(vararg args: String) {
@@ -29,8 +33,11 @@ class AirbyteSourceRunner(
2933

3034
/** Destination connector entry point. */
3135
class AirbyteDestinationRunner(
36+
/** CLI args. */
3237
args: Array<out String>,
33-
) : AirbyteConnectorRunner("destination", args) {
38+
/** Micronaut bean definition overrides, used only for tests. */
39+
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
40+
) : AirbyteConnectorRunner("destination", args, testBeanDefinitions) {
3441
companion object {
3542
@JvmStatic
3643
fun run(vararg args: String) {
@@ -46,6 +53,7 @@ class AirbyteDestinationRunner(
4653
sealed class AirbyteConnectorRunner(
4754
val connectorType: String,
4855
val args: Array<out String>,
56+
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
4957
) {
5058
val envs: Array<String> = arrayOf(Environment.CLI, connectorType)
5159

@@ -65,11 +73,12 @@ sealed class AirbyteConnectorRunner(
6573
commandLinePropertySource,
6674
MetadataYamlPropertySource(),
6775
)
76+
.beanDefinitions(*testBeanDefinitions)
6877
.start()
6978
val isTest: Boolean = ctx.environment.activeNames.contains(Environment.TEST)
7079
val picocliFactory: CommandLine.IFactory = MicronautFactory(ctx)
7180
val picocliCommandLine: CommandLine =
72-
picocliCommandLineFactory.build<AirbyteConnectorRunnable>(picocliFactory, isTest)
81+
picocliCommandLineFactory.build<AirbyteConnectorRunnable>(picocliFactory)
7382
val exitCode: Int = picocliCommandLine.execute(*args)
7483
if (!isTest) {
7584
// Required by the platform, otherwise syncs may hang.
@@ -82,10 +91,7 @@ sealed class AirbyteConnectorRunner(
8291
class PicocliCommandLineFactory(
8392
val runner: AirbyteConnectorRunner,
8493
) {
85-
inline fun <reified R : Runnable> build(
86-
factory: CommandLine.IFactory,
87-
isTest: Boolean,
88-
): CommandLine {
94+
inline fun <reified R : Runnable> build(factory: CommandLine.IFactory): CommandLine {
8995
val commandSpec: CommandLine.Model.CommandSpec =
9096
CommandLine.Model.CommandSpec.wrapWithoutInspection(R::class.java, factory)
9197
.name("airbyte-${runner.connectorType}-connector")
@@ -95,10 +101,6 @@ class PicocliCommandLineFactory(
95101
.addOption(config)
96102
.addOption(catalog)
97103
.addOption(state)
98-
99-
if (isTest) {
100-
commandSpec.addOption(output)
101-
}
102104
return CommandLine(commandSpec, factory)
103105
}
104106

@@ -168,10 +170,4 @@ class PicocliCommandLineFactory(
168170
"path to the json-encoded state file",
169171
"Required by the following commands: read",
170172
)
171-
val output: OptionSpec =
172-
fileOption(
173-
"output",
174-
"path to the output file",
175-
"When present, the connector writes to this file instead of stdout",
176-
)
177173
}

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/command/ConnectorCommandLinePropertySource.kt

-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ class ConnectorCommandLinePropertySource(
2323
const val CONNECTOR_CONFIG_PREFIX: String = "airbyte.connector.config"
2424
const val CONNECTOR_CATALOG_PREFIX: String = "airbyte.connector.catalog"
2525
const val CONNECTOR_STATE_PREFIX: String = "airbyte.connector.state"
26-
const val CONNECTOR_OUTPUT_FILE = "airbyte.connector.output.file"
2726

2827
private fun resolveValues(
2928
commandLine: CommandLine,
@@ -39,7 +38,6 @@ private fun resolveValues(
3938
}
4039
val values: MutableMap<String, Any> = mutableMapOf()
4140
values[Operation.PROPERTY] = ops.first()
42-
commandLine.optionValue("output")?.let { values[CONNECTOR_OUTPUT_FILE] = it }
4341
for ((cliOptionKey, prefix) in
4442
mapOf(
4543
"config" to CONNECTOR_CONFIG_PREFIX,

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt

-11
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ import io.micronaut.context.annotation.Value
2525
import io.micronaut.context.env.Environment
2626
import jakarta.inject.Singleton
2727
import java.io.ByteArrayOutputStream
28-
import java.io.FileOutputStream
2928
import java.io.PrintStream
30-
import java.nio.file.Path
3129
import java.time.Clock
3230
import java.time.Instant
3331
import java.util.concurrent.ConcurrentHashMap
@@ -104,9 +102,6 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
104102
/** Configuration properties prefix for [StdoutOutputConsumer]. */
105103
const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
106104

107-
// Used for integration tests.
108-
const val CONNECTOR_OUTPUT_FILE = "$CONNECTOR_OUTPUT_PREFIX.file"
109-
110105
/** Default implementation of [OutputConsumer]. */
111106
@Singleton
112107
@Secondary
@@ -293,10 +288,4 @@ private class RecordTemplate(
293288
private class PrintStreamFactory {
294289

295290
@Singleton @Requires(notEnv = [Environment.TEST]) fun stdout(): PrintStream = System.out
296-
297-
@Singleton
298-
@Requires(env = [Environment.TEST])
299-
@Requires(property = CONNECTOR_OUTPUT_FILE)
300-
fun file(@Value("\${$CONNECTOR_OUTPUT_FILE}") filePath: Path): PrintStream =
301-
PrintStream(FileOutputStream(filePath.toFile()), false, Charsets.UTF_8)
302291
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.command
6+
7+
import io.airbyte.cdk.output.BufferingOutputConsumer
8+
import io.airbyte.protocol.models.v0.AirbyteMessage
9+
10+
/** Convenience object for return values in [CliRunner]. */
11+
data class CliRunnable(
12+
val runnable: Runnable,
13+
val results: BufferingOutputConsumer,
14+
) {
15+
16+
/** Decorates the [BufferingOutputConsumer] with a callback, which should return quickly. */
17+
fun withCallback(nonBlockingFn: (AirbyteMessage) -> Unit): CliRunnable {
18+
results.callback = nonBlockingFn
19+
return this
20+
}
21+
22+
/** Runs the [runnable], then returns [results]. */
23+
fun run(): BufferingOutputConsumer {
24+
runnable.run()
25+
return results
26+
}
27+
}

airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt

+61-31
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,22 @@ import io.airbyte.cdk.AirbyteConnectorRunnable
55
import io.airbyte.cdk.AirbyteConnectorRunner
66
import io.airbyte.cdk.AirbyteDestinationRunner
77
import io.airbyte.cdk.AirbyteSourceRunner
8-
import io.airbyte.cdk.ClockFactory
98
import io.airbyte.cdk.output.BufferingOutputConsumer
109
import io.airbyte.cdk.util.Jsons
1110
import io.airbyte.protocol.models.v0.AirbyteMessage
1211
import io.airbyte.protocol.models.v0.AirbyteStateMessage
1312
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
13+
import io.micronaut.context.RuntimeBeanDefinition
14+
import java.io.ByteArrayInputStream
15+
import java.io.ByteArrayOutputStream
16+
import java.io.InputStream
1417
import java.nio.file.Files
1518
import java.nio.file.Path
1619
import kotlin.io.path.deleteIfExists
1720

1821
data object CliRunner {
1922
/**
20-
* Runs a source connector with the given arguments and returns the results.
23+
* Builds a [CliRunnable] which runs a source connector with the given arguments.
2124
*
2225
* This is useful for writing connector integration tests:
2326
* - the [config], [catalog] and [state] get written to temporary files;
@@ -26,61 +29,88 @@ data object CliRunner {
2629
* - a test-only [PrintStream] bean definition is passed to the runner to capture the connector's output;
2730
* - [AirbyteSourceRunner] takes the CLI arguments and runs them in a new Micronaut context;
2831
* - while it runs, each line of output is parsed into an [AirbyteMessage];
29-
* - those are stored in a [BufferingOutputConsumer] which is returned.
32+
* - those are stored in the [BufferingOutputConsumer] which is returned in the [CliRunnable].
3033
*/
31-
fun runSource(
34+
fun source(
3235
op: String,
3336
config: ConfigurationJsonObjectBase? = null,
3437
catalog: ConfiguredAirbyteCatalog? = null,
3538
state: List<AirbyteStateMessage>? = null,
36-
): BufferingOutputConsumer =
37-
runConnector(op, config, catalog, state) { args: Array<String> ->
38-
AirbyteSourceRunner(args)
39-
}
39+
): CliRunnable {
40+
val out = CliRunnerOutputStream()
41+
val runnable: Runnable =
42+
makeRunnable(op, config, catalog, state) { args: Array<String> ->
43+
AirbyteSourceRunner(args, out.beanDefinition)
44+
}
45+
return CliRunnable(runnable, out.results)
46+
}
4047

41-
/** Same as [runSource] but for destinations. */
42-
fun runDestination(
48+
/** Same as [source] but for destinations. */
49+
fun destination(
4350
op: String,
4451
config: ConfigurationJsonObjectBase? = null,
4552
catalog: ConfiguredAirbyteCatalog? = null,
4653
state: List<AirbyteStateMessage>? = null,
47-
): BufferingOutputConsumer =
48-
runConnector(op, config, catalog, state) { args: Array<String> ->
49-
AirbyteDestinationRunner(args)
50-
}
54+
inputStream: InputStream,
55+
): CliRunnable {
56+
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
57+
RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream }
58+
.singleton(true)
59+
.build()
60+
val out = CliRunnerOutputStream()
61+
val runnable: Runnable =
62+
makeRunnable(op, config, catalog, state) { args: Array<String> ->
63+
AirbyteDestinationRunner(args, inputBeanDefinition, out.beanDefinition)
64+
}
65+
return CliRunnable(runnable, out.results)
66+
}
67+
68+
/** Same as the other [destination] but with [AirbyteMessage] input. */
69+
fun destination(
70+
op: String,
71+
config: ConfigurationJsonObjectBase? = null,
72+
catalog: ConfiguredAirbyteCatalog? = null,
73+
state: List<AirbyteStateMessage>? = null,
74+
vararg input: AirbyteMessage,
75+
): CliRunnable {
76+
val inputJsonBytes: ByteArray =
77+
ByteArrayOutputStream().use { baos ->
78+
for (msg in input) {
79+
Jsons.writeValue(baos, msg)
80+
baos.write('\n'.code)
81+
}
82+
baos.toByteArray()
83+
}
84+
val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes)
85+
return destination(op, config, catalog, state, inputStream)
86+
}
5187

52-
private fun runConnector(
88+
private fun makeRunnable(
5389
op: String,
5490
config: ConfigurationJsonObjectBase?,
5591
catalog: ConfiguredAirbyteCatalog?,
5692
state: List<AirbyteStateMessage>?,
5793
connectorRunnerConstructor: (Array<String>) -> AirbyteConnectorRunner,
58-
): BufferingOutputConsumer {
59-
val result = BufferingOutputConsumer(ClockFactory().fixed())
94+
): Runnable {
6095
val configFile: Path? = inputFile(config)
6196
val catalogFile: Path? = inputFile(catalog)
6297
val stateFile: Path? = inputFile(state)
63-
val outputFile: Path = Files.createTempFile(null, null)
6498
val args: List<String> =
6599
listOfNotNull(
66100
"--$op",
67101
configFile?.let { "--config=$it" },
68102
catalogFile?.let { "--catalog=$it" },
69103
stateFile?.let { "--state=$it" },
70-
"--output=$outputFile",
71104
)
72-
try {
73-
connectorRunnerConstructor(args.toTypedArray()).run<AirbyteConnectorRunnable>()
74-
Files.readAllLines(outputFile)
75-
.filter { it.isNotBlank() }
76-
.map { Jsons.readValue(it, AirbyteMessage::class.java) }
77-
.forEach { result.accept(it) }
78-
return result
79-
} finally {
80-
configFile?.deleteIfExists()
81-
catalogFile?.deleteIfExists()
82-
stateFile?.deleteIfExists()
83-
outputFile.deleteIfExists()
105+
val runner: AirbyteConnectorRunner = connectorRunnerConstructor(args.toTypedArray())
106+
return Runnable {
107+
try {
108+
runner.run<AirbyteConnectorRunnable>()
109+
} finally {
110+
configFile?.deleteIfExists()
111+
catalogFile?.deleteIfExists()
112+
stateFile?.deleteIfExists()
113+
}
84114
}
85115
}
86116

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.command
6+
7+
import io.airbyte.cdk.ClockFactory
8+
import io.airbyte.cdk.output.BufferingOutputConsumer
9+
import io.airbyte.cdk.util.Jsons
10+
import io.airbyte.protocol.models.v0.AirbyteMessage
11+
import io.micronaut.context.RuntimeBeanDefinition
12+
import java.io.ByteArrayOutputStream
13+
import java.io.OutputStream
14+
import java.io.PrintStream
15+
16+
/** Used by [CliRunner] to populate a [BufferingOutputConsumer] instance. */
17+
class CliRunnerOutputStream : OutputStream() {
18+
19+
val results = BufferingOutputConsumer(ClockFactory().fixed())
20+
private val lineStream = ByteArrayOutputStream()
21+
private val printStream = PrintStream(this, true, Charsets.UTF_8)
22+
23+
val beanDefinition: RuntimeBeanDefinition<PrintStream> =
24+
RuntimeBeanDefinition.builder(PrintStream::class.java) { printStream }
25+
.singleton(true)
26+
.build()
27+
28+
override fun write(b: Int) {
29+
if (b == '\n'.code) {
30+
readLine()
31+
} else {
32+
lineStream.write(b)
33+
}
34+
}
35+
36+
override fun close() {
37+
readLine()
38+
lineStream.close()
39+
results.close()
40+
super.close()
41+
}
42+
43+
private fun readLine() {
44+
val line: String = lineStream.toString(Charsets.UTF_8).trim()
45+
lineStream.reset()
46+
if (line.isNotBlank()) {
47+
results.accept(Jsons.readValue(line, AirbyteMessage::class.java))
48+
}
49+
}
50+
}

airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import java.time.Instant
2020
/** [OutputConsumer] implementation for unit tests. Collects everything into thread-safe buffers. */
2121
@Singleton
2222
@Requires(notEnv = [Environment.CLI])
23-
@Requires(missingProperty = CONNECTOR_OUTPUT_FILE)
2423
@Replaces(OutputConsumer::class)
2524
class BufferingOutputConsumer(
2625
clock: Clock,
@@ -36,6 +35,11 @@ class BufferingOutputConsumer(
3635
private val traces = mutableListOf<AirbyteTraceMessage>()
3736
private val messages = mutableListOf<AirbyteMessage>()
3837

38+
var callback: (AirbyteMessage) -> Unit = {}
39+
set(value) {
40+
synchronized(this) { field = value }
41+
}
42+
3943
override fun accept(input: AirbyteMessage) {
4044
// Deep copy the input, which may be reused and mutated later on.
4145
val m: AirbyteMessage =
@@ -52,6 +56,7 @@ class BufferingOutputConsumer(
5256
AirbyteMessage.Type.TRACE -> traces.add(m.trace)
5357
else -> TODO("${m.type} not supported")
5458
}
59+
callback(m)
5560
}
5661
}
5762

0 commit comments

Comments
 (0)