Skip to content

Commit 70732ca

Browse files
author
Marius Posta
authored
bulk-cdk: add more exception classifier implementations, add extra checks (#44824)
1 parent 67e82e1 commit 70732ca

File tree

20 files changed

+690
-94
lines changed

20 files changed

+690
-94
lines changed

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunnable.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
package io.airbyte.cdk
33

44
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
5-
import io.airbyte.cdk.output.ExceptionClassifier
5+
import io.airbyte.cdk.output.ExceptionHandler
66
import io.airbyte.cdk.output.OutputConsumer
77
import io.github.oshai.kotlinlogging.KotlinLogging
88
import io.micronaut.context.annotation.Value
@@ -20,7 +20,7 @@ class AirbyteConnectorRunnable : Runnable {
2020

2121
@Inject lateinit var outputConsumer: OutputConsumer
2222

23-
@Inject lateinit var exceptionClassifier: ExceptionClassifier
23+
@Inject lateinit var exceptionHandler: ExceptionHandler
2424

2525
override fun run() {
2626
var operation: Operation? = null
@@ -40,7 +40,7 @@ class AirbyteConnectorRunnable : Runnable {
4040
"Failed ${operation::class} operation execution."
4141
}
4242
}
43-
outputConsumer.accept(exceptionClassifier.handle(e))
43+
outputConsumer.accept(exceptionHandler.handle(e))
4444
throw e
4545
} finally {
4646
log.info { "Flushing output consumer prior to shutdown." }

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorErrorException.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ class TransientErrorException(
2525
) : ConnectorErrorException(displayMessage, exception)
2626

2727
/** See [io.airbyte.cdk.output.SystemError]. */
28-
class SystemErrorException
29-
private constructor(
28+
class SystemErrorException(
3029
displayMessage: String?,
3130
exception: Throwable? = null,
3231
) : ConnectorErrorException(displayMessage, exception)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.output
6+
7+
import io.airbyte.cdk.ConfigErrorException
8+
import io.airbyte.cdk.ConnectorErrorException
9+
import io.airbyte.cdk.SystemErrorException
10+
import io.airbyte.cdk.TransientErrorException
11+
import io.micronaut.context.annotation.ConfigurationProperties
12+
import io.micronaut.context.annotation.Value
13+
import jakarta.inject.Singleton
14+
15+
const val DEFAULT_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.default"
16+
17+
/** Default implementation of [ExceptionClassifier]. */
18+
@Singleton
19+
@ConfigurationProperties(DEFAULT_CLASSIFIER_PREFIX)
20+
class DefaultExceptionClassifier(
21+
@Value("\${$DEFAULT_CLASSIFIER_PREFIX.order:1}") override val orderValue: Int
22+
) : ExceptionClassifier {
23+
24+
override fun classify(e: Throwable): ConnectorError? {
25+
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
26+
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
27+
is TransientErrorException -> TransientError(connectorErrorException.message!!)
28+
is SystemErrorException -> SystemError(connectorErrorException.message)
29+
null -> null
30+
}
31+
}
32+
33+
/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
34+
fun unwind(e: Throwable): ConnectorErrorException? {
35+
var connectorErrorException: ConnectorErrorException? = null
36+
var unwound: Throwable? = e
37+
while (unwound != null) {
38+
if (unwound is ConnectorErrorException) {
39+
connectorErrorException = unwound
40+
}
41+
unwound = unwound.cause
42+
}
43+
return connectorErrorException
44+
}
45+
}

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt

+59-58
Original file line numberDiff line numberDiff line change
@@ -3,51 +3,18 @@
33
*/
44
package io.airbyte.cdk.output
55

6-
import io.airbyte.cdk.ConfigErrorException
7-
import io.airbyte.cdk.ConnectorErrorException
8-
import io.airbyte.cdk.SystemErrorException
9-
import io.airbyte.cdk.TransientErrorException
10-
import io.airbyte.cdk.util.ApmTraceUtils
116
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
12-
import io.micronaut.context.annotation.DefaultImplementation
13-
import jakarta.inject.Singleton
14-
import org.apache.commons.lang3.exception.ExceptionUtils
7+
import io.micronaut.core.order.Ordered
158

16-
@Singleton
17-
@DefaultImplementation(DefaultExceptionClassifier::class)
18-
fun interface ExceptionClassifier {
9+
interface ExceptionClassifier : Ordered {
1910

2011
/** Classifies [e] into a [ConnectorError] if possible, null otherwise. */
2112
fun classify(e: Throwable): ConnectorError?
2213

23-
/** [SystemError] display message for [e] in case it can't be classified. */
24-
fun fallbackDisplayMessage(e: Throwable): String? = e.message
25-
26-
/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
27-
fun handle(e: Throwable): AirbyteErrorTraceMessage {
28-
ApmTraceUtils.addExceptionToTrace(e)
29-
val connectorError: ConnectorError =
30-
DefaultExceptionClassifier().classify(e)
31-
?: classify(e) ?: SystemError(fallbackDisplayMessage(e) ?: e.message)
32-
val errorTraceMessage =
33-
AirbyteErrorTraceMessage()
34-
.withInternalMessage(e.toString())
35-
.withStackTrace(ExceptionUtils.getStackTrace(e))
36-
return when (connectorError) {
37-
is ConfigError ->
38-
errorTraceMessage
39-
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
40-
.withMessage(connectorError.displayMessage)
41-
is TransientError ->
42-
errorTraceMessage
43-
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
44-
.withMessage(connectorError.displayMessage)
45-
is SystemError ->
46-
errorTraceMessage
47-
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
48-
.withMessage(connectorError.displayMessage ?: e.message)
49-
}
50-
}
14+
/** Convenience val for [getOrder]. */
15+
val orderValue: Int
16+
17+
override fun getOrder(): Int = orderValue
5118
}
5219

5320
/** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */
@@ -75,29 +42,63 @@ data class TransientError(val displayMessage: String) : ConnectorError
7542
*/
7643
data class SystemError(val displayMessage: String?) : ConnectorError
7744

78-
/** Default implementation of [ExceptionClassifier]. */
79-
@Singleton
80-
class DefaultExceptionClassifier : ExceptionClassifier {
45+
/** Common Micronaut property prefix for all exception classifiers. */
46+
const val EXCEPTION_CLASSIFIER_PREFIX = "airbyte.connector.exception-classifiers"
47+
48+
/** Convenience interface for rules-based [ExceptionClassifier] implementations. */
49+
interface RuleBasedExceptionClassifier<T : RuleBasedExceptionClassifier.Rule> :
50+
ExceptionClassifier {
51+
52+
/** List of rules to match for. */
53+
val rules: List<T>
8154

8255
override fun classify(e: Throwable): ConnectorError? {
83-
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
84-
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
85-
is TransientErrorException -> TransientError(connectorErrorException.message!!)
86-
is SystemErrorException -> SystemError(connectorErrorException.message)
87-
null -> null
56+
for (rule in rules) {
57+
if (!rule.matches(e)) {
58+
continue
59+
}
60+
val message: String = rule.output ?: e.message ?: e.toString()
61+
val firstLine: String = if (rule.group == null) message else "${rule.group}: $message"
62+
val lines: List<String> = listOf(firstLine) + rule.referenceLinks
63+
val displayMessage: String = lines.joinToString(separator = "\n")
64+
return when (rule.error) {
65+
ErrorKind.CONFIG -> ConfigError(displayMessage)
66+
ErrorKind.TRANSIENT -> TransientError(displayMessage)
67+
ErrorKind.SYSTEM -> SystemError(displayMessage)
68+
}
8869
}
70+
return null
8971
}
9072

91-
/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
92-
fun unwind(e: Throwable): ConnectorErrorException? {
93-
var connectorErrorException: ConnectorErrorException? = null
94-
var unwound: Throwable? = e
95-
while (unwound != null) {
96-
if (unwound is ConnectorErrorException) {
97-
connectorErrorException = unwound
98-
}
99-
unwound = unwound.cause
100-
}
101-
return connectorErrorException
73+
interface Rule : Ordered {
74+
75+
/** Rule ordinal in the rule set. */
76+
val ordinal: Int
77+
78+
/** If the rule matches, the kind of [ConnectorError] to produce. */
79+
val error: ErrorKind
80+
81+
/** Optional display message prefix. */
82+
val group: String?
83+
84+
/** Optional display message. */
85+
val output: String?
86+
87+
/** Optional list of reference links to display. */
88+
val referenceLinks: List<String>
89+
90+
/** Rule predicate. */
91+
fun matches(e: Throwable): Boolean
92+
93+
override fun getOrder(): Int = ordinal
94+
95+
/** Validates rule definition correctness. */
96+
fun validate()
97+
}
98+
99+
enum class ErrorKind {
100+
CONFIG,
101+
TRANSIENT,
102+
SYSTEM,
102103
}
103104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.output
6+
7+
import io.airbyte.cdk.util.ApmTraceUtils
8+
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
9+
import jakarta.inject.Singleton
10+
import org.apache.commons.lang3.exception.ExceptionUtils
11+
12+
/** [ExceptionHandler] applies all available [ExceptionClassifier] implementations in sequence. */
13+
@Singleton
14+
class ExceptionHandler(val classifiers: List<ExceptionClassifier>) {
15+
16+
fun classify(e: Throwable): ConnectorError {
17+
for (classifier in classifiers) {
18+
val classified: ConnectorError? = classifier.classify(e)
19+
if (classified != null) {
20+
return classified
21+
}
22+
}
23+
return SystemError(e.message)
24+
}
25+
26+
/** Maps [e] to a [AirbyteErrorTraceMessage] to be passed to the [OutputConsumer]. */
27+
fun handle(e: Throwable): AirbyteErrorTraceMessage {
28+
ApmTraceUtils.addExceptionToTrace(e)
29+
val errorTraceMessage =
30+
AirbyteErrorTraceMessage()
31+
.withInternalMessage(e.toString())
32+
.withStackTrace(ExceptionUtils.getStackTrace(e))
33+
return when (val classified: ConnectorError = classify(e)) {
34+
is ConfigError ->
35+
errorTraceMessage
36+
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
37+
.withMessage(classified.displayMessage)
38+
is TransientError ->
39+
errorTraceMessage
40+
.withFailureType(AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
41+
.withMessage(classified.displayMessage)
42+
is SystemError ->
43+
errorTraceMessage
44+
.withFailureType(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
45+
.withMessage(classified.displayMessage ?: e.message)
46+
}
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.output
6+
7+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
8+
import io.micronaut.context.annotation.EachProperty
9+
import io.micronaut.context.annotation.Parameter
10+
import io.micronaut.context.annotation.Requires
11+
import io.micronaut.context.annotation.Value
12+
import jakarta.inject.Singleton
13+
14+
const val REGEX_CLASSIFIER_PREFIX = "${EXCEPTION_CLASSIFIER_PREFIX}.regex"
15+
16+
/** [ExceptionClassifier] implementation based on regexes applied to the exception message. */
17+
@Singleton
18+
@Requires(property = "${REGEX_CLASSIFIER_PREFIX}.rules")
19+
class RegexExceptionClassifier(
20+
@Value("\${${REGEX_CLASSIFIER_PREFIX}.order:10}") override val orderValue: Int,
21+
override val rules: List<RegexExceptionClassifierRule>,
22+
) : RuleBasedExceptionClassifier<RegexExceptionClassifierRule> {
23+
24+
init {
25+
for (rule in rules) {
26+
rule.validate()
27+
}
28+
}
29+
}
30+
31+
/** Micronaut configuration object for [RuleBasedExceptionClassifier] rules. */
32+
@EachProperty("${REGEX_CLASSIFIER_PREFIX}.rules", list = true)
33+
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
34+
class RegexExceptionClassifierRule(
35+
@param:Parameter override val ordinal: Int,
36+
) : RuleBasedExceptionClassifier.Rule {
37+
38+
// Micronaut configuration objects work better with mutable properties.
39+
override lateinit var error: RuleBasedExceptionClassifier.ErrorKind
40+
lateinit var pattern: String
41+
lateinit var inputExample: String
42+
override var group: String? = null
43+
override var output: String? = null
44+
override var referenceLinks: List<String> = emptyList()
45+
46+
val regex: Regex by lazy {
47+
pattern.toRegex(setOf(RegexOption.MULTILINE, RegexOption.IGNORE_CASE))
48+
}
49+
50+
override fun matches(e: Throwable): Boolean =
51+
e.message?.let { regex.containsMatchIn(it) } ?: false
52+
53+
override fun validate() {
54+
require(runCatching { error }.isSuccess) { "error kind must be set" }
55+
require(runCatching { pattern }.isSuccess) { "regex pattern must be set" }
56+
require(runCatching { inputExample }.isSuccess) {
57+
"input exception message example must be set"
58+
}
59+
val compileResult: Result<Regex> = runCatching { regex }
60+
require(compileResult.isSuccess) {
61+
"regex pattern error: ${compileResult.exceptionOrNull()?.message}"
62+
}
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.output
6+
7+
import io.airbyte.cdk.ConfigErrorException
8+
import io.airbyte.cdk.SystemErrorException
9+
import io.airbyte.cdk.TransientErrorException
10+
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
11+
import jakarta.inject.Inject
12+
import org.junit.jupiter.api.Assertions
13+
import org.junit.jupiter.api.Test
14+
15+
@MicronautTest
16+
class DefaultExceptionClassifierTest {
17+
18+
@Inject lateinit var classifier: DefaultExceptionClassifier
19+
20+
@Test
21+
fun testConfigError() {
22+
Assertions.assertEquals(
23+
ConfigError("foo"),
24+
classifier.classify(ConfigErrorException("foo")),
25+
)
26+
}
27+
28+
@Test
29+
fun testTransientError() {
30+
Assertions.assertEquals(
31+
TransientError("bar"),
32+
classifier.classify(TransientErrorException("bar")),
33+
)
34+
}
35+
36+
@Test
37+
fun testSystemError() {
38+
Assertions.assertEquals(
39+
SystemError("baz"),
40+
classifier.classify(SystemErrorException("baz")),
41+
)
42+
}
43+
44+
@Test
45+
fun testUnclassified() {
46+
Assertions.assertNull(classifier.classify(RuntimeException("quux")))
47+
}
48+
}

0 commit comments

Comments
 (0)