Skip to content

Commit c326286

Browse files
debuggability improvements to the CDK (#37746)
A few changes to improve debuggability: 1) We now print the stack trace of the code that created an orphaned thread, in addition to that thread's current stack trace. 2) We add the ability for a connector to tell the integration runner to ignore certain threads (by adding a thread filter). 3) We change the logs of connector containers to include the name of the test that created them (usually each test creates and closes a container). 4) We also log when the timeout timer is triggered, to help understand why some tests still take far longer than their timeout should allow. Another change that was initially included but has since been removed, and might be worth revisiting: today, only Write checks for live threads on shutdown. We might want to do that for other operations as well.
1 parent 45c615f commit c326286

File tree

10 files changed

+146
-74
lines changed

10 files changed

+146
-74
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+62-23
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
2525
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
2626
import io.airbyte.validation.json.JsonSchemaValidator
2727
import java.io.*
28+
import java.lang.reflect.Method
2829
import java.nio.charset.StandardCharsets
2930
import java.nio.file.Path
31+
import java.time.Instant
3032
import java.util.*
3133
import java.util.concurrent.*
3234
import java.util.function.Consumer
33-
import java.util.function.Predicate
34-
import java.util.stream.Collectors
3535
import org.apache.commons.lang3.ThreadUtils
3636
import org.apache.commons.lang3.concurrent.BasicThreadFactory
3737
import org.slf4j.Logger
@@ -84,6 +84,7 @@ internal constructor(
8484
(destination != null) xor (source != null),
8585
"can only pass in a destination or a source"
8686
)
87+
threadCreationInfo.set(ThreadCreationInfo())
8788
this.cliParser = cliParser
8889
this.outputRecordCollector = outputRecordCollector
8990
// integration iface covers the commands that are the same for both source and destination.
@@ -189,17 +190,20 @@ internal constructor(
189190
}
190191
}
191192
Command.WRITE -> {
192-
val config = parseConfig(parsed.getConfigPath())
193-
validateConfig(integration.spec().connectionSpecification, config, "WRITE")
194-
// save config to singleton
195-
DestinationConfig.Companion.initialize(
196-
config,
197-
(integration as Destination).isV2Destination
198-
)
199-
val catalog =
200-
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
201-
202193
try {
194+
val config = parseConfig(parsed.getConfigPath())
195+
validateConfig(integration.spec().connectionSpecification, config, "WRITE")
196+
// save config to singleton
197+
DestinationConfig.Companion.initialize(
198+
config,
199+
(integration as Destination).isV2Destination
200+
)
201+
val catalog =
202+
parseConfig(
203+
parsed.getCatalogPath(),
204+
ConfiguredAirbyteCatalog::class.java
205+
)!!
206+
203207
destination!!
204208
.getSerializedMessageConsumer(config, catalog, outputRecordCollector)
205209
.use { consumer -> consumeWriteStream(consumer!!) }
@@ -339,11 +343,37 @@ internal constructor(
339343
}
340344
}
341345

346+
class ThreadCreationInfo {
347+
val stack: List<StackTraceElement> = Thread.currentThread().stackTrace.asList()
348+
val time: Instant = Instant.now()
349+
override fun toString(): String {
350+
return "creationStack=${stack.joinToString("\n ")}\ncreationTime=$time"
351+
}
352+
}
353+
342354
companion object {
343355
private val LOGGER: Logger = LoggerFactory.getLogger(IntegrationRunner::class.java)
356+
private val threadCreationInfo: InheritableThreadLocal<ThreadCreationInfo> =
357+
object : InheritableThreadLocal<ThreadCreationInfo>() {
358+
override fun childValue(parentValue: ThreadCreationInfo): ThreadCreationInfo {
359+
return ThreadCreationInfo()
360+
}
361+
}
344362

345363
const val TYPE_AND_DEDUPE_THREAD_NAME: String = "type-and-dedupe"
346364

365+
// ThreadLocal.get(Thread) is private. So we open it and keep a reference to the
366+
// opened method
367+
private val getMethod: Method =
368+
ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java).also {
369+
it.isAccessible = true
370+
}
371+
372+
@JvmStatic
373+
fun getThreadCreationInfo(thread: Thread): ThreadCreationInfo {
374+
return getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo
375+
}
376+
347377
/**
348378
* Filters threads that should not be considered when looking for orphaned threads at
349379
* shutdown of the integration runner.
@@ -353,11 +383,12 @@ internal constructor(
353383
* active so long as the database connection pool is open.
354384
*/
355385
@VisibleForTesting
356-
val ORPHANED_THREAD_FILTER: Predicate<Thread> = Predicate { runningThread: Thread ->
357-
(runningThread.name != Thread.currentThread().name &&
358-
!runningThread.isDaemon &&
359-
TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name)
360-
}
386+
private val orphanedThreadPredicates: MutableList<(Thread) -> Boolean> =
387+
mutableListOf({ runningThread: Thread ->
388+
(runningThread.name != Thread.currentThread().name &&
389+
!runningThread.isDaemon &&
390+
TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name)
391+
})
361392

362393
const val INTERRUPT_THREAD_DELAY_MINUTES: Int = 1
363394
const val EXIT_THREAD_DELAY_MINUTES: Int = 2
@@ -398,6 +429,15 @@ internal constructor(
398429
LOGGER.info("Finished buffered read of input stream")
399430
}
400431

432+
@JvmStatic
433+
fun addOrphanedThreadFilter(predicate: (Thread) -> (Boolean)) {
434+
orphanedThreadPredicates.add(predicate)
435+
}
436+
437+
fun filterOrphanedThread(thread: Thread): Boolean {
438+
return orphanedThreadPredicates.all { it(thread) }
439+
}
440+
401441
/**
402442
* Stops any non-daemon threads that could block the JVM from exiting when the main thread
403443
* is done.
@@ -425,11 +465,7 @@ internal constructor(
425465
) {
426466
val currentThread = Thread.currentThread()
427467

428-
val runningThreads =
429-
ThreadUtils.getAllThreads()
430-
.stream()
431-
.filter(ORPHANED_THREAD_FILTER)
432-
.collect(Collectors.toList())
468+
val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread).toList()
433469
if (runningThreads.isNotEmpty()) {
434470
LOGGER.warn(
435471
"""
@@ -450,7 +486,10 @@ internal constructor(
450486
.build()
451487
)
452488
for (runningThread in runningThreads) {
453-
val str = "Active non-daemon thread: " + dumpThread(runningThread)
489+
val str =
490+
"Active non-daemon thread: " +
491+
dumpThread(runningThread) +
492+
"\ncreationStack=${getThreadCreationInfo(runningThread)}"
454493
LOGGER.warn(str)
455494
// even though the main thread is already shutting down, we still leave some
456495
// chances to the children
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.31.5
1+
version=0.31.6

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ ${Jsons.serialize(message2)}""".toByteArray(
477477
val runningThreads =
478478
ThreadUtils.getAllThreads()
479479
.stream()
480-
.filter(IntegrationRunner.ORPHANED_THREAD_FILTER)
480+
.filter(IntegrationRunner::filterOrphanedThread)
481481
.collect(Collectors.toList())
482482
// all threads should be interrupted
483483
Assertions.assertEquals(listOf<Any>(), runningThreads)
@@ -505,7 +505,7 @@ ${Jsons.serialize(message2)}""".toByteArray(
505505
val runningThreads =
506506
ThreadUtils.getAllThreads()
507507
.stream()
508-
.filter(IntegrationRunner.ORPHANED_THREAD_FILTER)
508+
.filter(IntegrationRunner::filterOrphanedThread)
509509
.collect(Collectors.toList())
510510
// a thread that refuses to be interrupted should remain
511511
Assertions.assertEquals(1, runningThreads.size)

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt

+31-19
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
4646
ExtensionContext::class.java
4747
) == null
4848
) {
49-
LOGGER!!.error(
49+
LOGGER.error(
5050
"Junit LoggingInvocationInterceptor executing unknown interception point {}",
5151
method.name
5252
)
53-
return method.invoke(proxy, *(args!!))
53+
return method.invoke(proxy, *(args))
5454
}
55-
val invocation = args!![0] as InvocationInterceptor.Invocation<*>?
56-
val invocationContext = args[1] as ReflectiveInvocationContext<*>?
55+
val invocation = args[0] as InvocationInterceptor.Invocation<*>?
56+
val invocationContext = args[1] as ReflectiveInvocationContext<*>
5757
val extensionContext = args[2] as ExtensionContext?
5858
val methodName = method.name
59-
val logLineSuffix: String?
60-
val methodMatcher = methodPattern!!.matcher(methodName)
59+
val logLineSuffix: String
60+
val methodMatcher = methodPattern.matcher(methodName)
6161
if (methodName == "interceptDynamicTest") {
6262
logLineSuffix =
6363
"execution of DynamicTest %s".formatted(extensionContext!!.displayName)
@@ -66,12 +66,19 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
6666
"instance creation for %s".formatted(invocationContext!!.targetClass)
6767
} else if (methodMatcher.matches()) {
6868
val interceptedEvent = methodMatcher.group(1)
69+
val methodRealClassName = invocationContext!!.executable!!.declaringClass.simpleName
70+
val methodName = invocationContext.executable!!.name
71+
val targetClassName = invocationContext!!.targetClass.simpleName
72+
val methodDisplayName =
73+
if (targetClassName == methodRealClassName) methodName
74+
else "$methodName($methodRealClassName)"
6975
logLineSuffix =
7076
"execution of @%s method %s.%s".formatted(
7177
interceptedEvent,
72-
invocationContext!!.executable!!.declaringClass.simpleName,
73-
invocationContext.executable!!.name
78+
targetClassName,
79+
methodDisplayName
7480
)
81+
TestContext.CURRENT_TEST_NAME.set("$targetClassName.$methodName")
7582
} else {
7683
logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName)
7784
}
@@ -81,15 +88,15 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
8188
try {
8289
val timeout = getTimeout(invocationContext)
8390
if (timeout != null) {
84-
LOGGER!!.info(
91+
LOGGER.info(
8592
"Junit starting {} with timeout of {}",
8693
logLineSuffix,
8794
DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true)
8895
)
8996
Timer("TimeoutTimer-" + currentThread.name, true)
9097
.schedule(timeoutTask, timeout.toMillis())
9198
} else {
92-
LOGGER!!.warn("Junit starting {} with no timeout", logLineSuffix)
99+
LOGGER.warn("Junit starting {} with no timeout", logLineSuffix)
93100
}
94101
val retVal = invocation!!.proceed()
95102
val elapsedMs = Duration.between(start, Instant.now()).toMillis()
@@ -136,7 +143,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
136143
}
137144
}
138145
val stackTrace = StringUtils.join(stackToDisplay, "\n ")
139-
LOGGER!!.error(
146+
LOGGER.error(
140147
"Junit exception throw during {} after {}:\n{}",
141148
logLineSuffix,
142149
DurationFormatUtils.formatDurationWords(elapsedMs, true, true),
@@ -145,24 +152,29 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
145152
throw t1
146153
} finally {
147154
timeoutTask.cancel()
155+
TestContext.CURRENT_TEST_NAME.set(null)
148156
}
149157
}
150158

151-
private class TimeoutInteruptor(private val parentThread: Thread?) : TimerTask() {
159+
private class TimeoutInteruptor(private val parentThread: Thread) : TimerTask() {
152160
@Volatile var wasTriggered: Boolean = false
153161

154162
override fun run() {
163+
LOGGER.info(
164+
"interrupting running task on ${parentThread.name}. Current Stacktrace is ${parentThread.stackTrace.asList()}"
165+
)
155166
wasTriggered = true
156-
parentThread!!.interrupt()
167+
parentThread.interrupt()
157168
}
158169

159170
override fun cancel(): Boolean {
171+
LOGGER.info("cancelling timer task on ${parentThread.name}")
160172
return super.cancel()
161173
}
162174
}
163175

164176
companion object {
165-
private val methodPattern: Pattern? = Pattern.compile("intercept(.*)Method")
177+
private val methodPattern: Pattern = Pattern.compile("intercept(.*)Method")
166178

167179
private val PATTERN: Pattern =
168180
Pattern.compile(
@@ -201,11 +213,11 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
201213
)
202214
}
203215

204-
private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>?): Duration? {
216+
private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>): Duration {
205217
var timeout: Duration? = null
206-
var m = invocationContext!!.executable
218+
var m = invocationContext.executable
207219
if (m is Method) {
208-
var timeoutAnnotation: Timeout? = m.getAnnotation<Timeout?>(Timeout::class.java)
220+
var timeoutAnnotation: Timeout? = m.getAnnotation(Timeout::class.java)
209221
if (timeoutAnnotation == null) {
210222
timeoutAnnotation =
211223
invocationContext.targetClass.getAnnotation(Timeout::class.java)
@@ -328,9 +340,9 @@ class LoggingInvocationInterceptor : InvocationInterceptor {
328340
}
329341

330342
companion object {
331-
private val LOGGER: Logger? =
343+
private val LOGGER: Logger =
332344
LoggerFactory.getLogger(LoggingInvocationInterceptor::class.java)
333-
private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String? =
345+
private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String =
334346
"JunitMethodExecutionTimeout"
335347
}
336348
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.extensions
6+
7+
object TestContext {
8+
val CURRENT_TEST_NAME: ThreadLocal<String?> = ThreadLocal()
9+
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+15-13
Original file line numberDiff line numberDiff line change
@@ -1469,7 +1469,7 @@ abstract class DestinationAcceptanceTest {
14691469
}
14701470

14711471
/** Whether the destination should be tested against different namespaces. */
1472-
protected open fun supportNamespaceTest(): Boolean {
1472+
open protected fun supportNamespaceTest(): Boolean {
14731473
return false
14741474
}
14751475

@@ -1571,19 +1571,21 @@ abstract class DestinationAcceptanceTest {
15711571
}
15721572

15731573
protected val destination: AirbyteDestination
1574-
get() =
1575-
DefaultAirbyteDestination(
1576-
AirbyteIntegrationLauncher(
1577-
JOB_ID,
1578-
JOB_ATTEMPT,
1579-
imageName,
1580-
processFactory,
1581-
null,
1582-
null,
1583-
false,
1584-
EnvVariableFeatureFlags()
1585-
)
1574+
get() {
1575+
return DefaultAirbyteDestination(
1576+
integrationLauncher =
1577+
AirbyteIntegrationLauncher(
1578+
JOB_ID,
1579+
JOB_ATTEMPT,
1580+
imageName,
1581+
processFactory,
1582+
null,
1583+
null,
1584+
false,
1585+
EnvVariableFeatureFlags()
1586+
)
15861587
)
1588+
}
15871589

15881590
@Throws(Exception::class)
15891591
protected fun runSyncAndVerifyStateOutput(

0 commit comments

Comments
 (0)