@@ -17,11 +17,18 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage
17
17
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
18
18
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
19
19
import io.airbyte.protocol.models.v0.ConnectorSpecification
20
- import io.micronaut.context.annotation.ConfigurationProperties
21
20
import io.micronaut.context.annotation.DefaultImplementation
21
+ import io.micronaut.context.annotation.Factory
22
+ import io.micronaut.context.annotation.Requires
22
23
import io.micronaut.context.annotation.Secondary
24
+ import io.micronaut.context.annotation.Value
25
+ import io.micronaut.context.env.Environment
23
26
import jakarta.inject.Singleton
24
27
import java.io.ByteArrayOutputStream
28
+ import java.io.FileOutputStream
29
+ import java.io.PrintStream
30
+ import java.nio.file.Path
31
+ import java.time.Clock
25
32
import java.time.Instant
26
33
import java.util.concurrent.ConcurrentHashMap
27
34
import java.util.function.Consumer
@@ -94,17 +101,18 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
94
101
}
95
102
}
96
103
97
- // Used for integration tests.
98
- const val CONNECTOR_OUTPUT_FILE = " airbyte.connector.output.file"
99
-
100
104
/* * Configuration properties prefix for [StdoutOutputConsumer]. */
101
- const val CONNECTOR_OUTPUT_STDOUT_PREFIX = " airbyte.connector.output.stdout"
105
+ const val CONNECTOR_OUTPUT_PREFIX = " airbyte.connector.output"
106
+
107
+ // Used for integration tests.
108
+ const val CONNECTOR_OUTPUT_FILE = " $CONNECTOR_OUTPUT_PREFIX .file"
102
109
103
110
/* * Default implementation of [OutputConsumer]. */
104
111
@Singleton
105
112
@Secondary
106
- @ConfigurationProperties(CONNECTOR_OUTPUT_STDOUT_PREFIX )
107
113
private class StdoutOutputConsumer (
114
+ val stdout : PrintStream ,
115
+ clock : Clock ,
108
116
/* *
109
117
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
110
118
* buffer's size (in bytes) grows past this value.
@@ -127,9 +135,11 @@ private class StdoutOutputConsumer(
127
135
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
128
136
* case, TCP packet sized (capped by the MTU) are also in the low kilobytes.
129
137
*/
130
- val bufferByteSizeThresholdForFlush : Int = 4 * 1024
138
+ @Value(" \$ {$CONNECTOR_OUTPUT_PREFIX .buffer-byte-size-threshold-for-flush:4096}" )
139
+ val bufferByteSizeThresholdForFlush : Int ,
131
140
) : OutputConsumer {
132
- override val emittedAt: Instant = Instant .now()
141
+ override val emittedAt: Instant = Instant .now(clock)
142
+
133
143
private val buffer = ByteArrayOutputStream ()
134
144
private val jsonGenerator: JsonGenerator = Jsons .createGenerator(buffer)
135
145
private val sequenceWriter: SequenceWriter = Jsons .writer().writeValues(jsonGenerator)
@@ -173,7 +183,8 @@ private class StdoutOutputConsumer(
173
183
174
184
private fun withLockFlush () {
175
185
if (buffer.size() > 0 ) {
176
- println (buffer.toString(Charsets .UTF_8 ))
186
+ stdout.println (buffer.toString(Charsets .UTF_8 ))
187
+ stdout.flush()
177
188
buffer.reset()
178
189
}
179
190
}
@@ -277,3 +288,15 @@ private class RecordTemplate(
277
288
private val splitRegex: Regex = Regex .fromLiteral(" $DATA_PREFIX {}" )
278
289
}
279
290
}
291
+
292
+ @Factory
293
+ private class PrintStreamFactory {
294
+
295
+ @Singleton @Requires(notEnv = [Environment .TEST ]) fun stdout (): PrintStream = System .out
296
+
297
+ @Singleton
298
+ @Requires(env = [Environment .TEST ])
299
+ @Requires(property = CONNECTOR_OUTPUT_FILE )
300
+ fun file (@Value(" \$ {$CONNECTOR_OUTPUT_FILE }" ) filePath : Path ): PrintStream =
301
+ PrintStream (FileOutputStream (filePath.toFile()), false , Charsets .UTF_8 )
302
+ }
0 commit comments