Skip to content

Commit 2b260de

Browse files
authored
[DB source error messages] : Better error messages when switching between global/per-stream modes (#37507)
1 parent 994e8c9 commit 2b260de

File tree

8 files changed

+15
-12
lines changed

8 files changed

+15
-12
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
176+
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.31.2 | 2024-04-30 | [\#37507](https://github.com/airbytehq/airbyte/pull/37507) | Better error messages when switching between global/per-stream modes. |
177178
| 0.31.0 | 2024-04-26 | [\#37584](https://github.com/airbytehq/airbyte/pull/37584) | Update S3 destination deps to exclude zookeeper and hadoop-yarn-common |
178179
| 0.30.11 | 2024-04-25 | [\#36899](https://github.com/airbytehq/airbyte/pull/36899) | changes for bigQuery destination. |
179180
| 0.30.10 | 2024-04-24 | [\#37541](https://github.com/airbytehq/airbyte/pull/37541) | remove excessive logging |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.31.0
1+
version=0.31.2

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManagerFactory.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.airbyte.cdk.integrations.source.relationaldb.state
55

66
import io.airbyte.cdk.integrations.source.relationaldb.models.DbState
7+
import io.airbyte.commons.exceptions.ConfigErrorException
78
import io.airbyte.commons.json.Jsons
89
import io.airbyte.protocol.models.v0.AirbyteStateMessage
910
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
@@ -95,8 +96,8 @@ object StateManagerFactory {
9596

9697
when (airbyteStateMessage.type) {
9798
AirbyteStateMessage.AirbyteStateType.STREAM ->
98-
throw IllegalArgumentException(
99-
"Unable to convert connector state from stream to global. Please reset the connection to continue."
99+
throw ConfigErrorException(
100+
"You've changed replication modes - please reset the streams in this connector"
100101
)
101102
AirbyteStateMessage.AirbyteStateType.LEGACY -> {
102103
globalStateMessage =
@@ -127,8 +128,8 @@ object StateManagerFactory {
127128
val streamStates: MutableList<AirbyteStateMessage> = ArrayList()
128129
when (airbyteStateMessage.type) {
129130
AirbyteStateMessage.AirbyteStateType.GLOBAL ->
130-
throw IllegalArgumentException(
131-
"Unable to convert connector state from global to stream. Please reset the connection to continue."
131+
throw ConfigErrorException(
132+
"You've changed replication modes - please reset the streams in this connector"
132133
)
133134
AirbyteStateMessage.AirbyteStateType.LEGACY ->
134135
streamStates.addAll(

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManagerFactoryTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.source.relationaldb.state
66
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState
77
import io.airbyte.cdk.integrations.source.relationaldb.models.DbState
88
import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState
9+
import io.airbyte.commons.exceptions.ConfigErrorException
910
import io.airbyte.commons.json.Jsons
1011
import io.airbyte.protocol.models.v0.*
1112
import java.util.List
@@ -160,7 +161,7 @@ class StateManagerFactoryTest {
160161
.withStreamState(Jsons.jsonNode(DbStreamState()))
161162
)
162163

163-
Assertions.assertThrows(IllegalArgumentException::class.java) {
164+
Assertions.assertThrows(ConfigErrorException::class.java) {
164165
StateManagerFactory.createStateManager(
165166
AirbyteStateMessage.AirbyteStateType.GLOBAL,
166167
List.of(airbyteStateMessage),
@@ -280,7 +281,7 @@ class StateManagerFactoryTest {
280281
.withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
281282
.withGlobal(globalState)
282283

283-
Assertions.assertThrows(IllegalArgumentException::class.java) {
284+
Assertions.assertThrows(ConfigErrorException::class.java) {
284285
StateManagerFactory.createStateManager(
285286
AirbyteStateMessage.AirbyteStateType.STREAM,
286287
List.of(airbyteStateMessage),

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.30.9'
9+
cdkVersionRequired = '0.31.2'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.3.22
12+
dockerImageTag: 3.3.23
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
8989
final String quoteString) {
9090
final JsonNode sourceConfig = database.getSourceConfig();
9191
final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig);
92-
final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(sourceConfig);
9392
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
9493
// Determine the streams that need to be loaded via primary key sync.
9594
final List<AutoCloseableIterator<AirbyteMessage>> initialLoadIterator = new ArrayList<>();
@@ -160,7 +159,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
160159
}
161160

162161
// Build the incremental CDC iterators.
163-
final AirbyteDebeziumHandler<MySqlCdcPosition> handler = new AirbyteDebeziumHandler<>(
162+
final AirbyteDebeziumHandler<MySqlCdcPosition> handler = new AirbyteDebeziumHandler<MySqlCdcPosition>(
164163
sourceConfig,
165164
MySqlCdcTargetPosition.targetPosition(database),
166165
true,

docs/integrations/sources/mysql.md

+1
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported
223223

224224
| Version | Date | Pull Request | Subject |
225225
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
226+
| 3.3.23 | 2024-04-23 | [37507](https://github.com/airbytehq/airbyte/pull/37507) | Better errors when user switches from CDC to non-CDC mode. |
226227
| 3.3.22 | 2024-04-22 | [37541](https://github.com/airbytehq/airbyte/pull/37541) | Adopt latest CDK. reduce excessive logs. |
227228
| 3.3.21 | 2024-04-22 | [37476](https://github.com/airbytehq/airbyte/pull/37476) | Adopt latest CDK. |
228229
| 3.3.20 | 2024-04-16 | [37111](https://github.com/airbytehq/airbyte/pull/37111) | Populate null values in record message. |

0 commit comments

Comments
 (0)