Skip to content

Commit 44a24ef

Browse files
make sure AirbyteExceptionHandler always terminates (#38122)
1 parent 1188221 commit 44a24ef

File tree

3 files changed

+52
-44
lines changed

3 files changed

+52
-44
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
176+
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
177178
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
178179
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
179180
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt

+49-42
Original file line numberDiff line numberDiff line change
@@ -26,51 +26,58 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
2626
// the sync."
2727
// from the spec:
2828
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
29-
LOGGER.error(logMessage, throwable)
30-
// Attempt to deinterpolate the error message before emitting a trace message
31-
val mangledMessage: String?
32-
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate
33-
// its
34-
// message.
35-
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
36-
val deinterpolatableException =
37-
ExceptionUtils.getThrowableList(throwable)
38-
.stream()
39-
.filter { t: Throwable ->
40-
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
41-
deinterpolatableClass: Class<out Throwable> ->
42-
deinterpolatableClass.isAssignableFrom(t.javaClass)
29+
try {
30+
LOGGER.error(logMessage, throwable)
31+
// Attempt to deinterpolate the error message before emitting a trace message
32+
val mangledMessage: String?
33+
// If any exception in the chain is of a deinterpolatable type, find it and
34+
// deinterpolate
35+
// its
36+
// message.
37+
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
38+
val deinterpolatableException =
39+
ExceptionUtils.getThrowableList(throwable)
40+
.stream()
41+
.filter { t: Throwable ->
42+
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
43+
deinterpolatableClass: Class<out Throwable> ->
44+
deinterpolatableClass.isAssignableFrom(t.javaClass)
45+
}
4346
}
44-
}
45-
.findFirst()
46-
val messageWasMangled: Boolean
47-
if (deinterpolatableException.isPresent) {
48-
val originalMessage = deinterpolatableException.get().message
49-
mangledMessage =
50-
STRINGS_TO_DEINTERPOLATE
51-
.stream() // Sort the strings longest to shortest, in case any target string is
52-
// a substring of another
53-
// e.g. "airbyte_internal" should be swapped out before "airbyte"
54-
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
55-
.reduce(originalMessage) { message: String?, targetString: String? ->
56-
deinterpolate(message, targetString)
57-
}
58-
messageWasMangled = mangledMessage != originalMessage
59-
} else {
60-
mangledMessage = throwable.message
61-
messageWasMangled = false
62-
}
47+
.findFirst()
48+
val messageWasMangled: Boolean
49+
if (deinterpolatableException.isPresent) {
50+
val originalMessage = deinterpolatableException.get().message
51+
mangledMessage =
52+
STRINGS_TO_DEINTERPOLATE
53+
.stream() // Sort the strings longest to shortest, in case any target string
54+
// is
55+
// a substring of another
56+
// e.g. "airbyte_internal" should be swapped out before "airbyte"
57+
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
58+
.reduce(originalMessage) { message: String?, targetString: String? ->
59+
deinterpolate(message, targetString)
60+
}
61+
messageWasMangled = mangledMessage != originalMessage
62+
} else {
63+
mangledMessage = throwable.message
64+
messageWasMangled = false
65+
}
6366

64-
if (!messageWasMangled) {
65-
// If we did not modify the message (either not a deinterpolatable class, or we tried to
66-
// deinterpolate but made no changes) then emit our default trace message
67-
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
68-
} else {
69-
// If we did modify the message, then emit a custom trace message
70-
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
67+
if (!messageWasMangled) {
68+
// If we did not modify the message (either not a deinterpolatable class, or we
69+
// tried to
70+
// deinterpolate but made no changes) then emit our default trace message
71+
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
72+
} else {
73+
// If we did modify the message, then emit a custom trace message
74+
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
75+
}
76+
} catch (t: Throwable) {
77+
LOGGER.error("exception in the exception handler", t)
78+
} finally {
79+
terminate()
7180
}
72-
73-
terminate()
7481
}
7582

7683
// by doing this in a separate method we can mock it to avoid closing the jvm and therefore test
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.34.3
1+
version=0.34.4

0 commit comments

Comments
 (0)