Skip to content

Commit e297fc0

Browse files
author
Marius Posta
authored
bulk-cdk: add ExceptionClassifier, revamp exception handling (#44608)
1 parent c7a6731 commit e297fc0

File tree

8 files changed

+208
-74
lines changed

8 files changed

+208
-74
lines changed

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunnable.kt

+20-6
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
package io.airbyte.cdk
33

44
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
5+
import io.airbyte.cdk.output.ExceptionClassifier
56
import io.airbyte.cdk.output.OutputConsumer
6-
import io.airbyte.cdk.util.ApmTraceUtils
77
import io.github.oshai.kotlinlogging.KotlinLogging
88
import io.micronaut.context.annotation.Value
99
import jakarta.inject.Inject
10+
import jakarta.inject.Provider
1011

1112
private val log = KotlinLogging.logger {}
1213

@@ -15,18 +16,31 @@ private val log = KotlinLogging.logger {}
1516
class AirbyteConnectorRunnable : Runnable {
1617
@Value("\${airbyte.connector.metadata.docker-repository}") lateinit var connectorName: String
1718

18-
@Inject lateinit var operation: Operation
19+
@Inject lateinit var operationProvider: Provider<Operation>
1920

2021
@Inject lateinit var outputConsumer: OutputConsumer
2122

23+
@Inject lateinit var exceptionClassifier: ExceptionClassifier
24+
2225
override fun run() {
23-
log.info { "Executing ${operation::class} operation." }
26+
var operation: Operation? = null
2427
try {
28+
try {
29+
operation = operationProvider.get()!!
30+
} catch (e: Throwable) {
31+
throw ConfigErrorException("Failed to initialize connector operation", e)
32+
}
33+
log.info { "Executing ${operation::class} operation." }
2534
operation.execute()
2635
} catch (e: Throwable) {
27-
log.error(e) { "Failed ${operation::class} operation execution." }
28-
ApmTraceUtils.addExceptionToTrace(e)
29-
outputConsumer.acceptTraceOnConfigError(e)
36+
log.error(e) {
37+
if (operation == null) {
38+
"Failed connector operation initialization."
39+
} else {
40+
"Failed ${operation::class} operation execution."
41+
}
42+
}
43+
outputConsumer.accept(exceptionClassifier.handle(e))
3044
throw e
3145
} finally {
3246
log.info { "Flushing output consumer prior to shutdown." }

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConfigErrorException.kt

-21
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.cdk
5+
6+
/**
7+
* A [ConnectorErrorException] is an exception which readily maps to an
8+
* [io.airbyte.cdk.output.ConnectorError] object.
9+
*/
10+
sealed class ConnectorErrorException(
11+
displayMessage: String?,
12+
exception: Throwable?,
13+
) : RuntimeException(displayMessage, exception)
14+
15+
/** See [io.airbyte.cdk.output.ConfigError]. */
16+
class ConfigErrorException(
17+
displayMessage: String,
18+
exception: Throwable? = null,
19+
) : ConnectorErrorException(displayMessage, exception)
20+
21+
/** See [io.airbyte.cdk.output.TransientError]. */
22+
class TransientErrorException(
23+
displayMessage: String,
24+
exception: Throwable? = null,
25+
) : ConnectorErrorException(displayMessage, exception)
26+
27+
/** See [io.airbyte.cdk.output.SystemError]. */
28+
class SystemErrorException
29+
private constructor(
30+
displayMessage: String?,
31+
exception: Throwable? = null,
32+
) : ConnectorErrorException(displayMessage, exception)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.cdk.output
5+
6+
import io.airbyte.cdk.ConfigErrorException
7+
import io.airbyte.cdk.ConnectorErrorException
8+
import io.airbyte.cdk.SystemErrorException
9+
import io.airbyte.cdk.TransientErrorException
10+
import io.airbyte.cdk.util.ApmTraceUtils
11+
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
12+
import io.micronaut.context.annotation.DefaultImplementation
13+
import jakarta.inject.Singleton
14+
import org.apache.commons.lang3.exception.ExceptionUtils
15+
16+
@Singleton
17+
@DefaultImplementation(DefaultExceptionClassifier::class)
18+
fun interface ExceptionClassifier {
19+
20+
/** Classifies [e] into a [ConnectorError] if possible, null otherwise. */
21+
fun classify(e: Throwable): ConnectorError?
22+
23+
/** [SystemError] display message for [e] in case it can't be classified. */
24+
fun fallbackDisplayMessage(e: Throwable): String? = e.message
25+
26+
/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
27+
fun handle(e: Throwable): AirbyteErrorTraceMessage {
28+
ApmTraceUtils.addExceptionToTrace(e)
29+
val connectorError: ConnectorError =
30+
DefaultExceptionClassifier().classify(e)
31+
?: classify(e) ?: SystemError(fallbackDisplayMessage(e) ?: e.message)
32+
val errorTraceMessage =
33+
AirbyteErrorTraceMessage()
34+
.withInternalMessage(e.toString())
35+
.withStackTrace(ExceptionUtils.getStackTrace(e))
36+
return when (connectorError) {
37+
is ConfigError ->
38+
errorTraceMessage
39+
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
40+
.withMessage(connectorError.displayMessage)
41+
is TransientError ->
42+
errorTraceMessage
43+
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
44+
.withMessage(connectorError.displayMessage)
45+
is SystemError ->
46+
errorTraceMessage
47+
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
48+
.withMessage(connectorError.displayMessage ?: e.message)
49+
}
50+
}
51+
}
52+
53+
/** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */
54+
sealed interface ConnectorError
55+
56+
/**
57+
* A [ConfigError] means there is something wrong with the user's connector configuration or any
58+
* other error for which the connector informs the platform that the error is not transient in
59+
* nature and thus to not bother retrying.
60+
*/
61+
data class ConfigError(val displayMessage: String) : ConnectorError
62+
63+
/**
64+
* A [TransientError] means there is something wrong with the user's source or any other error for
65+
* which the connector informs the platform that the error is transient in nature.
66+
*/
67+
data class TransientError(val displayMessage: String) : ConnectorError
68+
69+
/**
70+
* A [SystemError] means there is something wrong with the connector.
71+
*
72+
* In practice these are also ll errors that are neither [ConfigError] or [TransientError]. This is
73+
* whatever falls through the cracks of the [ExceptionClassifier], as such there is a standing goal
74+
* to minimize occurrences of these instances.
75+
*/
76+
data class SystemError(val displayMessage: String?) : ConnectorError
77+
78+
/** Default implementation of [ExceptionClassifier]. */
79+
@Singleton
80+
class DefaultExceptionClassifier : ExceptionClassifier {
81+
82+
override fun classify(e: Throwable): ConnectorError? {
83+
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
84+
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
85+
is TransientErrorException -> TransientError(connectorErrorException.message!!)
86+
is SystemErrorException -> SystemError(connectorErrorException.message)
87+
null -> null
88+
}
89+
}
90+
91+
/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
92+
fun unwind(e: Throwable): ConnectorErrorException? {
93+
var connectorErrorException: ConnectorErrorException? = null
94+
var unwound: Throwable? = e
95+
while (unwound != null) {
96+
if (unwound is ConnectorErrorException) {
97+
connectorErrorException = unwound
98+
}
99+
unwound = unwound.cause
100+
}
101+
return connectorErrorException
102+
}
103+
}

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/OutputConsumer.kt

-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.cdk.output
33

4-
import io.airbyte.cdk.ConfigErrorException
54
import io.airbyte.cdk.util.Jsons
65
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
76
import io.airbyte.protocol.models.v0.AirbyteCatalog
@@ -21,7 +20,6 @@ import jakarta.inject.Singleton
2120
import java.io.ByteArrayOutputStream
2221
import java.time.Instant
2322
import java.util.function.Consumer
24-
import org.apache.commons.lang3.exception.ExceptionUtils
2523

2624
/** Emits the [AirbyteMessage] instances produced by the connector. */
2725
@DefaultImplementation(StdoutOutputConsumer::class)
@@ -89,17 +87,6 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
8987
.withAnalytics(analytics),
9088
)
9189
}
92-
93-
fun acceptTraceOnConfigError(e: Throwable) {
94-
val configErrorException: ConfigErrorException = ConfigErrorException.unwind(e) ?: return
95-
accept(
96-
AirbyteErrorTraceMessage()
97-
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
98-
.withMessage(configErrorException.message)
99-
.withInternalMessage(e.toString())
100-
.withStackTrace(ExceptionUtils.getStackTrace(e)),
101-
)
102-
}
10390
}
10491

10592
// Used for integration tests.

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt

+14-33
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.cdk.check
33

4+
import io.airbyte.cdk.ConfigErrorException
45
import io.airbyte.cdk.Operation
5-
import io.airbyte.cdk.command.*
6+
import io.airbyte.cdk.command.ConfigurationJsonObjectBase
7+
import io.airbyte.cdk.command.ConfigurationJsonObjectSupplier
8+
import io.airbyte.cdk.command.SourceConfiguration
9+
import io.airbyte.cdk.command.SourceConfigurationFactory
610
import io.airbyte.cdk.discover.MetadataQuerier
11+
import io.airbyte.cdk.output.ExceptionClassifier
712
import io.airbyte.cdk.output.OutputConsumer
8-
import io.airbyte.cdk.util.ApmTraceUtils
913
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
1014
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
1115
import io.github.oshai.kotlinlogging.KotlinLogging
1216
import io.micronaut.context.annotation.Requires
1317
import jakarta.inject.Singleton
14-
import java.sql.SQLException
15-
import org.apache.commons.lang3.exception.ExceptionUtils
1618

1719
@Singleton
1820
@Requires(property = Operation.PROPERTY, value = "check")
@@ -22,6 +24,7 @@ class CheckOperation<T : ConfigurationJsonObjectBase>(
2224
val configFactory: SourceConfigurationFactory<T, out SourceConfiguration>,
2325
val metadataQuerierFactory: MetadataQuerier.Factory<SourceConfiguration>,
2426
val outputConsumer: OutputConsumer,
27+
val exceptionClassifier: ExceptionClassifier,
2528
) : Operation {
2629
private val log = KotlinLogging.logger {}
2730

@@ -34,37 +37,15 @@ class CheckOperation<T : ConfigurationJsonObjectBase>(
3437
val config: SourceConfiguration = configFactory.make(pojo)
3538
log.info { "Connecting for config check." }
3639
metadataQuerierFactory.session(config).use { connectionCheck(it) }
37-
} catch (e: SQLException) {
38-
log.debug(e) { "SQLException while checking config." }
39-
val message: String =
40-
listOfNotNull(
41-
e.sqlState?.let { "State code: $it" },
42-
e.errorCode.takeIf { it != 0 }?.let { "Error code: $it" },
43-
e.message?.let { "Message: $it" },
44-
)
45-
.joinToString(separator = "; ")
46-
ApmTraceUtils.addExceptionToTrace(e)
47-
outputConsumer.accept(
48-
AirbyteErrorTraceMessage()
49-
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
50-
.withMessage(message)
51-
.withInternalMessage(e.toString())
52-
.withStackTrace(ExceptionUtils.getStackTrace(e)),
53-
)
54-
outputConsumer.accept(
55-
AirbyteConnectionStatus()
56-
.withMessage(message)
57-
.withStatus(AirbyteConnectionStatus.Status.FAILED),
58-
)
59-
log.info { "Config check failed." }
60-
return
6140
} catch (e: Exception) {
6241
log.debug(e) { "Exception while checking config." }
63-
ApmTraceUtils.addExceptionToTrace(e)
64-
outputConsumer.acceptTraceOnConfigError(e)
42+
val errorTraceMessage: AirbyteErrorTraceMessage = exceptionClassifier.handle(e)
43+
outputConsumer.accept(errorTraceMessage)
44+
val connectionStatusMessage: String =
45+
String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, errorTraceMessage.message)
6546
outputConsumer.accept(
6647
AirbyteConnectionStatus()
67-
.withMessage(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, e.message))
48+
.withMessage(connectionStatusMessage)
6849
.withStatus(AirbyteConnectionStatus.Status.FAILED),
6950
)
7051
log.info { "Config check failed." }
@@ -101,9 +82,9 @@ class CheckOperation<T : ConfigurationJsonObjectBase>(
10182
}
10283
}
10384
if (n == 0) {
104-
throw RuntimeException("Discovered zero tables.")
85+
throw ConfigErrorException("Discovered zero tables.")
10586
} else {
106-
throw RuntimeException("Unable to query any of the $n discovered table(s).")
87+
throw ConfigErrorException("Unable to query any of the $n discovered table(s).")
10788
}
10889
}
10990

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.output
6+
7+
import io.airbyte.cdk.Operation
8+
import io.micronaut.context.annotation.Replaces
9+
import io.micronaut.context.annotation.Value
10+
import java.sql.SQLException
11+
12+
@Replaces(ExceptionClassifier::class)
13+
class JdbcExceptionClassifier(@Value("\${${Operation.PROPERTY}}") operationName: String) :
14+
ExceptionClassifier {
15+
16+
val isCheck: Boolean = operationName == "check"
17+
18+
override fun classify(e: Throwable): ConnectorError? =
19+
if (isCheck && e is SQLException) {
20+
ConfigError(sqlExceptionDisplayMessage(e))
21+
} else {
22+
null
23+
}
24+
25+
override fun fallbackDisplayMessage(e: Throwable): String? =
26+
when (e) {
27+
is SQLException -> sqlExceptionDisplayMessage(e)
28+
else -> null
29+
}
30+
31+
fun sqlExceptionDisplayMessage(e: SQLException): String =
32+
listOfNotNull(
33+
e.sqlState?.let { "State code: $it" },
34+
e.errorCode.takeIf { it != 0 }?.let { "Error code: $it" },
35+
e.message?.let { "Message: $it" },
36+
)
37+
.joinToString(separator = "; ")
38+
}

airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class H2SourceIntegrationTest {
3535
port = h2.port
3636
database = h2.database + "_garbage"
3737
}
38-
SyncsTestFixture.testCheck(configPojo, "Error code: 90149")
38+
SyncsTestFixture.testCheck(configPojo, "Database \"mem:.*_garbage\" not found")
3939
}
4040
}
4141

0 commit comments

Comments
 (0)