Skip to content

Commit 6d74db7

Browse files
change signature of orphanedThreadFilter (#45469)
- Introduced a new `OrphanedThreadInfo` data class to encapsulate thread information to avoid data races when getting thread-specific information (like stacktrace, or threadLocal values) - Updated thread filtering and logging to use `OrphanedThreadInfo` - Enhanced logging for timeout tasks in `LoggingInvocationInterceptor` - fixed race condition in `AirbyteTraceMessageUtilityTest` - improved test logging to add the running tests for the destination containers - Modified `TestContext` to use a default value for `CURRENT_TEST_NAME`
1 parent 6469111 commit 6d74db7

File tree

9 files changed

+126
-58
lines changed

9 files changed

+126
-58
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------| :----------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.45.0 | 2024-09-16 | [\#45469](https://github.com/airbytehq/airbyte/pull/45469) | Fix some race conditions, improve thread filtering, improve test logging |
177178
| 0.44.22 | 2024-09-10 | [\#45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging |
178179
| 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3-destination: don't overwrite existing files, skip those file indexes instead |
179180
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt

+22-13
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,28 @@ constructor(
116116
recordTransform: CheckedFunction<ResultSet, T, SQLException>
117117
): Stream<T> {
118118
val connection = dataSource.connection
119-
return JdbcDatabase.Companion.toUnsafeStream<T>(
120-
statementCreator.apply(connection).executeQuery(),
121-
recordTransform
122-
)
123-
.onClose(
124-
Runnable {
125-
try {
126-
LOGGER.info { "closing connection" }
127-
connection.close()
128-
} catch (e: SQLException) {
129-
throw RuntimeException(e)
119+
try {
120+
return JdbcDatabase.Companion.toUnsafeStream<T>(
121+
statementCreator.apply(connection).executeQuery(),
122+
recordTransform
123+
)
124+
.onClose(
125+
Runnable {
126+
try {
127+
LOGGER.info { "closing connection" }
128+
connection.close()
129+
} catch (e: SQLException) {
130+
throw RuntimeException(e)
131+
}
130132
}
131-
}
132-
)
133+
)
134+
} catch (e: Throwable) {
135+
// this is ugly because we usually don't close the connection here.
136+
// We expect the caller to close the returned stream, which will call the onClose
137+
// but if the executeQuery threw an exception, we still need to close the connection
138+
LOGGER.warn(e) { "closing connection because of an Exception" }
139+
connection.close()
140+
throw e
141+
}
133142
}
134143
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+54-29
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,45 @@ internal constructor(
316316
}
317317
}
318318

319+
data class OrphanedThreadInfo
320+
private constructor(
321+
val thread: Thread,
322+
val threadCreationInfo: ThreadCreationInfo,
323+
val lastStackTrace: List<StackTraceElement>
324+
) {
325+
fun getLogString(): String {
326+
return String.format(
327+
"%s (%s)\n Thread stacktrace: %s",
328+
thread.name,
329+
thread.state,
330+
lastStackTrace.joinToString("\n at ")
331+
)
332+
}
333+
334+
companion object {
335+
fun getAll(): List<OrphanedThreadInfo> {
336+
return ThreadUtils.getAllThreads().mapNotNull { getForThread(it) }
337+
}
338+
339+
fun getForThread(thread: Thread): OrphanedThreadInfo? {
340+
val threadCreationInfo =
341+
getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo?
342+
val stack = thread.stackTrace.asList()
343+
if (threadCreationInfo == null) {
344+
return null
345+
}
346+
return OrphanedThreadInfo(thread, threadCreationInfo, stack)
347+
}
348+
349+
// ThreadLocal.get(Thread) is private. So we open it and keep a reference to the
350+
// opened method
351+
private val getMethod: Method =
352+
ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java).also {
353+
it.isAccessible = true
354+
}
355+
}
356+
}
357+
319358
class ThreadCreationInfo {
320359
val stack: List<StackTraceElement> = Thread.currentThread().stackTrace.asList()
321360
val time: Instant = Instant.now()
@@ -327,25 +366,13 @@ internal constructor(
327366
companion object {
328367
private val threadCreationInfo: InheritableThreadLocal<ThreadCreationInfo> =
329368
object : InheritableThreadLocal<ThreadCreationInfo>() {
330-
override fun childValue(parentValue: ThreadCreationInfo): ThreadCreationInfo {
369+
override fun childValue(parentValue: ThreadCreationInfo?): ThreadCreationInfo {
331370
return ThreadCreationInfo()
332371
}
333372
}
334373

335374
const val TYPE_AND_DEDUPE_THREAD_NAME: String = "type-and-dedupe"
336375

337-
// ThreadLocal.get(Thread) is private. So we open it and keep a reference to the
338-
// opened method
339-
private val getMethod: Method =
340-
ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java).also {
341-
it.isAccessible = true
342-
}
343-
344-
@JvmStatic
345-
fun getThreadCreationInfo(thread: Thread): ThreadCreationInfo? {
346-
return getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo?
347-
}
348-
349376
/**
350377
* Filters threads that should not be considered when looking for orphaned threads at
351378
* shutdown of the integration runner.
@@ -355,11 +382,11 @@ internal constructor(
355382
* active so long as the database connection pool is open.
356383
*/
357384
@VisibleForTesting
358-
private val orphanedThreadPredicates: MutableList<(Thread) -> Boolean> =
359-
mutableListOf({ runningThread: Thread ->
360-
(runningThread.name != Thread.currentThread().name &&
361-
!runningThread.isDaemon &&
362-
TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name)
385+
private val orphanedThreadPredicates: MutableList<(OrphanedThreadInfo) -> Boolean> =
386+
mutableListOf({ runningThreadInfo: OrphanedThreadInfo ->
387+
(runningThreadInfo.thread.name != Thread.currentThread().name &&
388+
!runningThreadInfo.thread.isDaemon &&
389+
TYPE_AND_DEDUPE_THREAD_NAME != runningThreadInfo.thread.name)
363390
})
364391

365392
const val INTERRUPT_THREAD_DELAY_MINUTES: Int = 1
@@ -402,12 +429,12 @@ internal constructor(
402429
}
403430

404431
@JvmStatic
405-
fun addOrphanedThreadFilter(predicate: (Thread) -> (Boolean)) {
432+
fun addOrphanedThreadFilter(predicate: (OrphanedThreadInfo) -> (Boolean)) {
406433
orphanedThreadPredicates.add(predicate)
407434
}
408435

409-
fun filterOrphanedThread(thread: Thread): Boolean {
410-
return orphanedThreadPredicates.all { it(thread) }
436+
fun filterOrphanedThread(threadInfo: OrphanedThreadInfo): Boolean {
437+
return orphanedThreadPredicates.all { it(threadInfo) }
411438
}
412439

413440
/**
@@ -437,8 +464,8 @@ internal constructor(
437464
) {
438465
val currentThread = Thread.currentThread()
439466

440-
val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread)
441-
if (runningThreads.isNotEmpty()) {
467+
val runningThreadInfos = OrphanedThreadInfo.getAll().filter(::filterOrphanedThread)
468+
if (runningThreadInfos.isNotEmpty()) {
442469
LOGGER.warn {
443470
"""
444471
The main thread is exiting while children non-daemon threads from a connector are still active.
@@ -457,18 +484,15 @@ internal constructor(
457484
.daemon(true)
458485
.build()
459486
)
460-
for (runningThread in runningThreads) {
461-
val str =
462-
"Active non-daemon thread: " +
463-
dumpThread(runningThread) +
464-
"\ncreationStack=${getThreadCreationInfo(runningThread)}"
487+
for (runningThreadInfo in runningThreadInfos) {
488+
val str = "Active non-daemon thread info: ${runningThreadInfo.getLogString()}"
465489
LOGGER.warn { str }
466490
// even though the main thread is already shutting down, we still leave some
467491
// chances to the children
468492
// threads to close properly on their own.
469493
// So, we schedule an interrupt hook after a fixed time delay instead...
470494
scheduledExecutorService.schedule(
471-
{ runningThread.interrupt() },
495+
{ runningThreadInfo.thread.interrupt() },
472496
interruptTimeDelay.toLong(),
473497
interruptTimeUnit
474498
)
@@ -493,6 +517,7 @@ internal constructor(
493517
}
494518

495519
private fun dumpThread(thread: Thread): String {
520+
OrphanedThreadInfo.getForThread(thread)
496521
return String.format(
497522
"%s (%s)\n Thread stacktrace: %s",
498523
thread.name,
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.44.23
1+
version=0.45.0

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,18 @@ class AirbyteTraceMessageUtilityTest {
4848
Mockito.mock(RuntimeException::class.java),
4949
"this is a config error"
5050
)
51-
val outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8))
52-
assertJsonNodeIsTraceMessage(outJson)
51+
val outCt = outContent.toString(StandardCharsets.UTF_8)
52+
var outJson: JsonNode? = null
53+
// because we are running tests in parallel, it's possible that another test is writing to
54+
// stdout while we run this test, in which case we'd see their messages.
55+
// we filter through the messages to find an error (hopefully ours)
56+
for (line in outCt.split('\n')) {
57+
if (line.contains("\"error\"")) {
58+
outJson = Jsons.deserialize(line)
59+
break
60+
}
61+
}
62+
assertJsonNodeIsTraceMessage(outJson!!)
5363
Assertions.assertEquals("config_error", outJson["trace"]["error"]["failure_type"].asText())
5464
}
5565

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -442,11 +442,12 @@ ${Jsons.serialize(message2)}""".toByteArray(
442442
} catch (e: Exception) {
443443
throw RuntimeException(e)
444444
}
445-
val runningThreads =
446-
ThreadUtils.getAllThreads().filter(IntegrationRunner::filterOrphanedThread)
445+
val runningThreadInfos =
446+
IntegrationRunner.OrphanedThreadInfo.getAll()
447+
.filter(IntegrationRunner::filterOrphanedThread)
447448

448449
// all threads should be interrupted
449-
Assertions.assertEquals(listOf<Any>(), runningThreads)
450+
Assertions.assertEquals(listOf<Any>(), runningThreadInfos)
450451
Assertions.assertEquals(1, caughtExceptions.size)
451452
}
452453

@@ -468,11 +469,12 @@ ${Jsons.serialize(message2)}""".toByteArray(
468469
throw RuntimeException(e)
469470
}
470471

471-
val runningThreads =
472-
ThreadUtils.getAllThreads().filter(IntegrationRunner::filterOrphanedThread)
472+
val runningThreadInfos =
473+
IntegrationRunner.OrphanedThreadInfo.getAll()
474+
.filter(IntegrationRunner::filterOrphanedThread)
473475

474476
// a thread that refuses to be interrupted should remain
475-
Assertions.assertEquals(1, runningThreads.size)
477+
Assertions.assertEquals(1, runningThreadInfos.size)
476478
Assertions.assertEquals(1, caughtExceptions.size)
477479
Assertions.assertTrue(exitCalled.get())
478480
}

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt

+19-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import java.time.format.DateTimeParseException
1111
import java.util.*
1212
import java.util.concurrent.TimeUnit
1313
import java.util.concurrent.TimeoutException
14+
import java.util.concurrent.atomic.AtomicLong
1415
import java.util.regex.Pattern
1516
import kotlin.concurrent.Volatile
1617
import org.apache.commons.lang3.StringUtils
@@ -88,7 +89,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
8889
logLineSuffix = "execution of unknown intercepted call $methodName"
8990
}
9091
val currentThread = Thread.currentThread()
91-
val timeoutTask = TimeoutInteruptor(currentThread)
92+
val timeoutTask = TimeoutInteruptor(currentThread, logLineSuffix)
9293
val start = Instant.now()
9394
try {
9495
val timeout = reflectiveInvocationContext?.let(::getTimeout)
@@ -116,6 +117,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
116117
val elapsedMs = Duration.between(start, Instant.now()).toMillis()
117118
val t1: Throwable
118119
if (timeoutTask.wasTriggered) {
120+
LOGGER.info { "timeoutTask ${timeoutTask.id} was triggered." }
119121
val timeoutAsString =
120122
DurationFormatUtils.formatDurationWords(elapsedMs, true, true)
121123
t1 =
@@ -126,6 +128,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
126128
)
127129
t1.initCause(throwable)
128130
} else {
131+
LOGGER.info { "timeoutTask ${timeoutTask.id} was not triggered." }
129132
t1 = throwable
130133
}
131134
var belowCurrentCall = false
@@ -157,25 +160,36 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
157160
throw t1
158161
} finally {
159162
timeoutTask.cancel()
160-
TestContext.CURRENT_TEST_NAME.set(null)
163+
TestContext.CURRENT_TEST_NAME.set(TestContext.NO_RUNNING_TEST)
161164
}
162165
}
163166

164-
private class TimeoutInteruptor(private val parentThread: Thread) : TimerTask() {
167+
private class TimeoutInteruptor(
168+
private val parentThread: Thread,
169+
private val context: String
170+
) : TimerTask() {
165171
@Volatile var wasTriggered: Boolean = false
172+
val id = timerIdentifier.incrementAndGet()
166173

167174
override fun run() {
168175
LOGGER.info(
169-
"interrupting running task on ${parentThread.name}. Current Stacktrace is ${parentThread.stackTrace.asList()}"
176+
"interrupting running task on ${parentThread.name}. " +
177+
"Current Stacktrace is ${parentThread.stackTrace.asList()}" +
178+
"TimeoutIterruptor $id interrupting running task on ${parentThread.name}: $context. " +
179+
"Current Stacktrace is ${parentThread.stackTrace.asList()}"
170180
)
171181
wasTriggered = true
172182
parentThread.interrupt()
173183
}
174184

175185
override fun cancel(): Boolean {
176-
LOGGER.info("cancelling timer task on ${parentThread.name}")
186+
LOGGER.info("cancelling TimeoutIterruptor $id on ${parentThread.name}")
177187
return super.cancel()
178188
}
189+
190+
companion object {
191+
private val timerIdentifier = AtomicLong(1)
192+
}
179193
}
180194

181195
companion object {

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,11 @@
55
package io.airbyte.cdk.extensions
66

77
object TestContext {
8-
val CURRENT_TEST_NAME: ThreadLocal<String?> = ThreadLocal()
8+
const val NO_RUNNING_TEST = "NONE"
9+
val CURRENT_TEST_NAME: ThreadLocal<String> =
10+
object : ThreadLocal<String>() {
11+
override fun initialValue(): String {
12+
return NO_RUNNING_TEST
13+
}
14+
}
915
}

airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package io.airbyte.workers.internal
55

66
import com.google.common.base.Charsets
77
import com.google.common.base.Preconditions
8+
import io.airbyte.cdk.extensions.TestContext
89
import io.airbyte.commons.io.IOs
910
import io.airbyte.commons.io.LineGobbler
1011
import io.airbyte.commons.json.Jsons
@@ -182,7 +183,7 @@ constructor(
182183

183184
fun createContainerLogMdcBuilder(): MdcScope.Builder =
184185
MdcScope.Builder()
185-
.setLogPrefix("destination")
186+
.setLogPrefix("destination-${TestContext.CURRENT_TEST_NAME.get()}")
186187
.setPrefixColor(LoggingHelper.Color.YELLOW_BACKGROUND)
187188
val IGNORED_EXIT_CODES: Set<Int> =
188189
setOf(

0 commit comments

Comments
 (0)