Skip to content

Commit 0d4adbb

Browse files
authored
Remove the custom implementation of buffered input stream reader (#36673)
1 parent 7c21c1b commit 0d4adbb

File tree

4 files changed

+32
-68
lines changed

4 files changed

+32
-68
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th
144144

145145
| Version | Date | Pull Request | Subject |
146146
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
147+
| 0.28.21 | 2024-04-02 | [\#36673](https://github.com/airbytehq/airbyte/pull/36673) | Change the destination message parsing to use standard java/kotlin classes. Adds logging to catch empty lines. |
147148
| 0.28.20 | 2024-04-01 | [\#36584](https://github.com/airbytehq/airbyte/pull/36584) | Changes to make source-postgres compileable |
148149
| 0.28.19 | 2024-03-29 | [\#36619](https://github.com/airbytehq/airbyte/pull/36619) | Changes to make destination-postgres compileable |
149150
| 0.28.19 | 2024-03-29 | [\#36588](https://github.com/airbytehq/airbyte/pull/36588) | Changes to make destination-redshift compileable |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+28-61
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ internal constructor(
261261
}
262262
messageIterator.forEachRemaining(recordCollector)
263263
messageIterator.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
264-
LOGGER.debug("Finished producing messages for stream {}...")
264+
LOGGER.debug("Finished producing messages for stream {}...", s)
265265
}
266266
}
267267

@@ -373,67 +373,34 @@ internal constructor(
373373

374374
private lateinit var validator: JsonSchemaValidator
375375

376-
@VisibleForTesting
377376
@Throws(Exception::class)
378-
fun consumeWriteStream(consumer: SerializedAirbyteMessageConsumer) {
379-
BufferedInputStream(System.`in`).use { bis ->
380-
ByteArrayOutputStream().use { baos -> consumeWriteStream(consumer, bis, baos) }
381-
}
382-
}
383-
384-
@VisibleForTesting
385-
@Throws(Exception::class)
386-
fun consumeWriteStream(
377+
internal fun consumeWriteStream(
387378
consumer: SerializedAirbyteMessageConsumer,
388-
bis: BufferedInputStream,
389-
baos: ByteArrayOutputStream
379+
inputStream: InputStream = System.`in`
390380
) {
381+
LOGGER.info("Starting buffered read of input stream")
391382
consumer.start()
392-
393-
val buffer = ByteArray(8192) // 8K buffer
394-
var bytesRead: Int
395-
var lastWasNewLine = false
396-
397-
while ((bis.read(buffer).also { bytesRead = it }) != -1) {
398-
for (i in 0 until bytesRead) {
399-
val b = buffer[i]
400-
if (b == '\n'.code.toByte() || b == '\r'.code.toByte()) {
401-
if (!lastWasNewLine && baos.size() > 0) {
402-
consumer.accept(baos.toString(StandardCharsets.UTF_8), baos.size())
403-
baos.reset()
404-
}
405-
lastWasNewLine = true
383+
inputStream.bufferedReader(StandardCharsets.UTF_8).use {
384+
var emptyLines = 0
385+
it.lines().forEach { line: String ->
386+
if (line.isNotEmpty()) {
387+
consumer.accept(line, line.toByteArray(StandardCharsets.UTF_8).size)
406388
} else {
407-
baos.write(b.toInt())
408-
lastWasNewLine = false
389+
emptyLines++
390+
// We've occasionally seen this loop not exit
391+
// maybe it's because we keep getting streams of empty lines?
392+
// TODO: Monitor the logs for occurrences of this log line and if this isn't
393+
// an issue, remove it.
394+
if (emptyLines % 1_000 == 0 && emptyLines < 10_000) {
395+
LOGGER.warn("Encountered $emptyLines empty lines during execution")
396+
}
409397
}
410398
}
399+
if (emptyLines > 0) {
400+
LOGGER.warn("Encountered $emptyLines empty lines in the input stream.")
401+
}
411402
}
412-
413-
// Handle last line if there's one
414-
if (baos.size() > 0) {
415-
consumer.accept(baos.toString(StandardCharsets.UTF_8), baos.size())
416-
}
417-
}
418-
419-
/**
420-
* Stops any non-daemon threads that could block the JVM from exiting when the main thread
421-
* is done.
422-
*
423-
* If any active non-daemon threads would be left as orphans, this method will schedule some
424-
* interrupt/exit hooks after giving it some time delay to close up properly. It is
425-
* generally preferred to have a proper closing sequence from children threads instead of
426-
* interrupting or force exiting the process, so this mechanism serve as a fallback while
427-
* surfacing warnings in logs for maintainers to fix the code behavior instead.
428-
*/
429-
fun stopOrphanedThreads() {
430-
stopOrphanedThreads(
431-
EXIT_HOOK,
432-
INTERRUPT_THREAD_DELAY_MINUTES,
433-
TimeUnit.MINUTES,
434-
EXIT_THREAD_DELAY_MINUTES,
435-
TimeUnit.MINUTES
436-
)
403+
LOGGER.info("Finished buffered read of input stream")
437404
}
438405

439406
/**
@@ -455,11 +422,11 @@ internal constructor(
455422
*/
456423
@VisibleForTesting
457424
fun stopOrphanedThreads(
458-
exitHook: Runnable,
459-
interruptTimeDelay: Int,
460-
interruptTimeUnit: TimeUnit?,
461-
exitTimeDelay: Int,
462-
exitTimeUnit: TimeUnit?
425+
exitHook: Runnable = EXIT_HOOK,
426+
interruptTimeDelay: Int = INTERRUPT_THREAD_DELAY_MINUTES,
427+
interruptTimeUnit: TimeUnit = TimeUnit.MINUTES,
428+
exitTimeDelay: Int = EXIT_THREAD_DELAY_MINUTES,
429+
exitTimeUnit: TimeUnit = TimeUnit.MINUTES
463430
) {
464431
val currentThread = Thread.currentThread()
465432

@@ -468,7 +435,7 @@ internal constructor(
468435
.stream()
469436
.filter(ORPHANED_THREAD_FILTER)
470437
.collect(Collectors.toList())
471-
if (!runningThreads.isEmpty()) {
438+
if (runningThreads.isNotEmpty()) {
472439
LOGGER.warn(
473440
"""
474441
The main thread is exiting while children non-daemon threads from a connector are still active.
@@ -535,7 +502,7 @@ internal constructor(
535502
operationType: String
536503
) {
537504
val validationResult = validator.validate(schemaJson, objectJson)
538-
if (!validationResult.isEmpty()) {
505+
if (validationResult.isNotEmpty()) {
539506
throw Exception(
540507
String.format(
541508
"Verification error(s) occurred for %s. Errors: %s ",
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.28.20
1+
version=0.28.21
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import java.util.*
99
import org.junit.jupiter.api.Assertions
1010
import org.junit.jupiter.api.Test
1111

12-
class IntegrationRunnerBackwardsCompatbilityTest {
12+
class IntegrationRunnerBackwardsCompatabilityTest {
1313
@Test
1414
@Throws(Exception::class)
1515
fun testByteArrayInputStreamVersusScanner() {
@@ -31,11 +31,7 @@ class IntegrationRunnerBackwardsCompatbilityTest {
3131
val stream1: InputStream =
3232
ByteArrayInputStream(testInput.toByteArray(StandardCharsets.UTF_8))
3333
val consumer2 = MockConsumer()
34-
BufferedInputStream(stream1).use { bis ->
35-
ByteArrayOutputStream().use { baos ->
36-
IntegrationRunner.consumeWriteStream(consumer2, bis, baos)
37-
}
38-
}
34+
IntegrationRunner.consumeWriteStream(consumer2, stream1)
3935
val newOutput = consumer2.getOutput()
4036

4137
// get old output

0 commit comments

Comments
 (0)