Skip to content

Commit 62e5528

Browse files
Don't emit final state if there is an underlying stream failure (#34869)
Co-authored-by: Xiaohan Song <[email protected]>
1 parent fa66dc8 commit 62e5528

File tree

7 files changed

+28
-13
lines changed

7 files changed

+28
-13
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator there is an underlying stream failure. |
169170
| 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. |
170171
| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. |
171172
| 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.20.4
1+
version=0.20.5

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,19 @@ public SourceStateIterator(final Iterator<T> messageIterator,
3434
@CheckForNull
3535
@Override
3636
protected AirbyteMessage computeNext() {
37+
3738
boolean iteratorHasNextValue = false;
3839
try {
3940
iteratorHasNextValue = messageIterator.hasNext();
40-
} catch (Exception ex) {
41-
LOGGER.info("Caught exception while trying to get the next from message iterator. Treating hasNext to false. ", ex);
41+
} catch (final Exception ex) {
42+
// If the initial snapshot is incomplete for this stream, throw an exception failing the sync. This
43+
// will ensure the platform retry logic
44+
// kicks in and keeps retrying the sync until the initial snapshot is complete.
45+
throw new RuntimeException(ex);
4246
}
4347
if (iteratorHasNextValue) {
4448
if (sourceStateIteratorManager.shouldEmitStateMessage(recordCount, lastCheckpoint)) {
45-
AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint();
49+
final AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint();
4650
stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
4751

4852
recordCount = 0L;
@@ -62,12 +66,12 @@ protected AirbyteMessage computeNext() {
6266
}
6367
} else if (!hasEmittedFinalState) {
6468
hasEmittedFinalState = true;
65-
final AirbyteStateMessage finalStateMessage = sourceStateIteratorManager.createFinalStateMessage();
66-
finalStateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
69+
final AirbyteStateMessage finalStateMessageForStream = sourceStateIteratorManager.createFinalStateMessage();
70+
finalStateMessageForStream.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
6771
recordCount = 0L;
6872
return new AirbyteMessage()
6973
.withType(Type.STATE)
70-
.withState(finalStateMessage);
74+
.withState(finalStateMessageForStream);
7175
} else {
7276
return endOfData();
7377
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
package io.airbyte.cdk.integrations.source.relationaldb.state;
66

77
import static org.junit.Assert.assertEquals;
8+
import static org.junit.Assert.assertThrows;
89
import static org.mockito.ArgumentMatchers.any;
910
import static org.mockito.ArgumentMatchers.anyLong;
1011
import static org.mockito.ArgumentMatchers.eq;
1112
import static org.mockito.Mockito.atLeastOnce;
1213
import static org.mockito.Mockito.doReturn;
14+
import static org.mockito.Mockito.doThrow;
1315
import static org.mockito.Mockito.mock;
1416
import static org.mockito.Mockito.verify;
1517

@@ -89,4 +91,11 @@ void testShouldSendEndOfData() {
8991
assertEquals(null, sourceStateIterator.computeNext());
9092
}
9193

94+
@Test
95+
void testShouldRethrowExceptions() {
96+
processRecordMessage();
97+
doThrow(new ArrayIndexOutOfBoundsException("unexpected error")).when(messageIterator).hasNext();
98+
assertThrows(RuntimeException.class, () -> sourceStateIterator.computeNext());
99+
}
100+
92101
}

airbyte-integrations/connectors/source-mysql/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.20.3'
9+
cdkVersionRequired = '0.20.5'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.3.5
12+
dockerImageTag: 3.3.6
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

docs/integrations/sources/mysql.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,11 @@ Any database or table encoding combination of charset and collation is supported
223223

224224
| Version | Date | Pull Request | Subject |
225225
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
226-
| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name |
227-
| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 |
228-
| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. |
229-
| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage
226+
| 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. |
227+
| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name |
228+
| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 |
229+
| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. |
230+
| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage |
230231
| 3.3.1 | 2024-01-03 | [33312](https://github.com/airbytehq/airbyte/pull/33312) | Adding count stats in AirbyteStateMessage |
231232
| 3.3.0 | 2023-12-19 | [33436](https://github.com/airbytehq/airbyte/pull/33436) | Remove LEGACY state flag |
232233
| 3.2.4 | 2023-12-12 | [33356](https://github.com/airbytehq/airbyte/pull/33210) | Support for better debugging tools.. |

0 commit comments

Comments
 (0)