Skip to content

Commit d26bd10

Browse files
authored
[DB sources] : Add plumbing for adding transient errors (#38030)
1 parent 49bb246 commit d26bd10

File tree

13 files changed

+77
-23
lines changed

13 files changed

+77
-23
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
176+
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
177178
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
178179
| 0.33.2 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | improve source acceptance tests |
179180
| 0.33.1 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | Add a unit test for cursor based sync |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt

-8
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.base
55

66
import com.fasterxml.jackson.databind.JsonNode
77
import com.google.common.annotations.VisibleForTesting
8-
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
98
import java.util.*
109
import java.util.regex.Pattern
1110
import javax.validation.constraints.NotNull
@@ -28,13 +27,6 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
2827
// from the spec:
2928
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
3029
LOGGER.error(logMessage, throwable)
31-
32-
val rootThrowable = ConnectorExceptionUtil.getRootConfigError(Exception(throwable))
33-
34-
if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
35-
terminate()
36-
}
37-
3830
// Attempt to deinterpolate the error message before emitting a trace message
3931
val mangledMessage: String?
4032
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ object AirbyteTraceMessageUtility {
2121

2222
@JvmStatic
2323
fun emitTransientErrorTrace(e: Throwable, displayMessage: String?) {
24-
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
24+
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
2525
}
2626

2727
fun emitCustomErrorTrace(displayMessage: String?, internalMessage: String?) {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+22-6
Original file line numberDiff line numberDiff line change
@@ -220,14 +220,11 @@ internal constructor(
220220
// exist, we
221221
// just return the original exception.
222222
ApmTraceUtils.addExceptionToTrace(e)
223-
val rootThrowable = ConnectorExceptionUtil.getRootConfigError(e)
224-
val displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable)
223+
val rootConfigErrorThrowable = ConnectorExceptionUtil.getRootConfigError(e)
224+
val rootTransientErrorThrowable = ConnectorExceptionUtil.getRootTransientError(e)
225225
// If the source connector throws a config error, a trace message with the relevant
226226
// message should
227227
// be surfaced.
228-
if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
229-
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage)
230-
}
231228
if (parsed.command == Command.CHECK) {
232229
// Currently, special handling is required for the CHECK case since the user display
233230
// information in
@@ -240,11 +237,30 @@ internal constructor(
240237
.withConnectionStatus(
241238
AirbyteConnectionStatus()
242239
.withStatus(AirbyteConnectionStatus.Status.FAILED)
243-
.withMessage(displayMessage)
240+
.withMessage(
241+
ConnectorExceptionUtil.getDisplayMessage(
242+
rootConfigErrorThrowable
243+
)
244+
)
244245
)
245246
)
246247
return
247248
}
249+
250+
if (ConnectorExceptionUtil.isConfigError(rootConfigErrorThrowable)) {
251+
AirbyteTraceMessageUtility.emitConfigErrorTrace(
252+
e,
253+
ConnectorExceptionUtil.getDisplayMessage(rootConfigErrorThrowable),
254+
)
255+
// On receiving a config error, the container should be immediately shut down.
256+
} else if (ConnectorExceptionUtil.isTransientError(rootTransientErrorThrowable)) {
257+
AirbyteTraceMessageUtility.emitTransientErrorTrace(
258+
e,
259+
ConnectorExceptionUtil.getDisplayMessage(rootTransientErrorThrowable)
260+
)
261+
// On receiving a transient error, the container should be immediately shut down.
262+
System.exit(1)
263+
}
248264
throw e
249265
}
250266

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt

+27-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.google.common.collect.ImmutableList
77
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage
88
import io.airbyte.commons.exceptions.ConfigErrorException
99
import io.airbyte.commons.exceptions.ConnectionErrorException
10+
import io.airbyte.commons.exceptions.TransientErrorException
1011
import io.airbyte.commons.functional.Either
1112
import java.sql.SQLException
1213
import java.sql.SQLSyntaxErrorException
@@ -30,13 +31,18 @@ object ConnectorExceptionUtil {
3031
fun isConfigError(e: Throwable?): Boolean {
3132
return isConfigErrorException(e) ||
3233
isConnectionError(e) ||
33-
isRecoveryConnectionException(e) ||
3434
isUnknownColumnInFieldListException(e)
3535
}
3636

37+
fun isTransientError(e: Throwable?): Boolean {
38+
return isTransientErrorException(e) || isRecoveryConnectionException(e)
39+
}
40+
3741
fun getDisplayMessage(e: Throwable?): String? {
3842
return if (e is ConfigErrorException) {
3943
e.displayMessage
44+
} else if (e is TransientErrorException) {
45+
e.message
4046
} else if (e is ConnectionErrorException) {
4147
ErrorMessage.getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e)
4248
} else if (isRecoveryConnectionException(e)) {
@@ -67,6 +73,22 @@ object ConnectorExceptionUtil {
6773
return e
6874
}
6975

76+
/**
77+
* Returns the first instance of an exception associated with a configuration error (if it
78+
* exists). Otherwise, the original exception is returned.
79+
*/
80+
fun getRootTransientError(e: Exception?): Throwable? {
81+
var current: Throwable? = e
82+
while (current != null) {
83+
if (isTransientError(current)) {
84+
return current
85+
} else {
86+
current = current.cause
87+
}
88+
}
89+
return e
90+
}
91+
7092
/**
7193
* Log all the exceptions, and rethrow the first. This is useful for e.g. running multiple
7294
* futures and waiting for them to complete/fail. Rather than combining them into a single
@@ -103,6 +125,10 @@ object ConnectorExceptionUtil {
103125
return eithers.stream().map { obj: Either<out T, Result> -> obj.right!! }.toList()
104126
}
105127

128+
private fun isTransientErrorException(e: Throwable?): Boolean {
129+
return e is TransientErrorException
130+
}
131+
106132
private fun isConfigErrorException(e: Throwable?): Boolean {
107133
return e is ConfigErrorException
108134
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.34.0
1+
version=0.34.1

airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ dependencies {
1717
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
1818
api 'com.google.guava:guava:33.0.0-jre'
1919
api 'commons-io:commons-io:2.15.1'
20-
api ('io.airbyte.airbyte-protocol:protocol-models:0.7.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
20+
api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
2121
api 'javax.annotation:javax.annotation-api:1.3.2'
2222
api 'org.apache.commons:commons-compress:1.25.0'
2323
api 'org.apache.commons:commons-lang3:3.14.0'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.exceptions
6+
7+
/**
8+
* An exception that indicates a transient error was encountered. This exception is caught and emits
9+
* an AirbyteTraceMessage.
10+
*/
11+
class TransientErrorException : RuntimeException {
12+
13+
constructor(displayMessage: String) : super(displayMessage)
14+
15+
constructor(displayMessage: String, exception: Throwable?) : super(displayMessage, exception)
16+
}

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.31.5'
15+
cdkVersionRequired = '0.34.1'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.3.32
12+
dockerImageTag: 3.3.33
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.junit.jupiter.api.Test;
7575

7676
@Order(1)
77+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH")
7778
public class CdcPostgresSourceTest extends CdcSourceTest<PostgresSource, PostgresTestDatabase> {
7879

7980
protected BaseImage postgresImage;

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.junit.jupiter.api.BeforeEach;
4949
import org.junit.jupiter.api.Test;
5050

51+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH")
5152
class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<PostgresSource, PostgresTestDatabase> {
5253

5354
private static final String DATABASE = "new_db";

docs/integrations/sources/postgres.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
304304

305305
## Changelog
306306

307-
| Version | Date | Pull Request | Subject |
308-
|---------|------------|----------------------------------------------------------|-----------------------------------------------------------|
307+
| Version | Date | Pull Request | Subject |
308+
| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --- |
309+
| 3.3.33 | 2024-05-07 | [38030](https://github.com/airbytehq/airbyte/pull/38030) | Mark PG hot standby error as transient. |
309310
| 3.3.32 | 2024-04-30 | [37758](https://github.com/airbytehq/airbyte/pull/37758) | Correct previous release to disable debezium retries |
310311
| 3.3.31 | 2024-04-30 | [37754](https://github.com/airbytehq/airbyte/pull/37754) | Add CDC logs |
311312
| 3.3.30 | 2024-04-30 | [37726](https://github.com/airbytehq/airbyte/pull/37726) | Remove debezium retries |

0 commit comments

Comments
 (0)