diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 8fe186d4e5186..327bf2795170b 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator there is an underlying stream failure. | | 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. | | 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. | | 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 5ca3b8c87d355..09d1c8ab8e97d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.4 \ No newline at end of file +version=0.20.5 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java index 203244800b421..5166ae2898ae0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.java @@ -34,15 +34,19 @@ public SourceStateIterator(final Iterator messageIterator, @CheckForNull @Override protected AirbyteMessage computeNext() { + boolean iteratorHasNextValue = false; try { iteratorHasNextValue = messageIterator.hasNext(); - } catch (Exception ex) { - LOGGER.info("Caught exception while trying to get the next from message iterator. Treating hasNext to false. ", ex); + } catch (final Exception ex) { + // If the initial snapshot is incomplete for this stream, throw an exception failing the sync. This + // will ensure the platform retry logic + // kicks in and keeps retrying the sync until the initial snapshot is complete. + throw new RuntimeException(ex); } if (iteratorHasNextValue) { if (sourceStateIteratorManager.shouldEmitStateMessage(recordCount, lastCheckpoint)) { - AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint(); + final AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint(); stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); recordCount = 0L; @@ -62,12 +66,12 @@ protected AirbyteMessage computeNext() { } } else if (!hasEmittedFinalState) { hasEmittedFinalState = true; - final AirbyteStateMessage finalStateMessage = sourceStateIteratorManager.createFinalStateMessage(); - finalStateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); + final AirbyteStateMessage finalStateMessageForStream = sourceStateIteratorManager.createFinalStateMessage(); + finalStateMessageForStream.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount)); recordCount = 0L; return new AirbyteMessage() .withType(Type.STATE) - .withState(finalStateMessage); + .withState(finalStateMessageForStream); } else { return endOfData(); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java index 7a32b03607b3d..34560be119d98 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.java @@ -5,11 +5,13 @@ package io.airbyte.cdk.integrations.source.relationaldb.state; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -89,4 +91,11 @@ void testShouldSendEndOfData() { assertEquals(null, sourceStateIterator.computeNext()); } + @Test + void testShouldRethrowExceptions() { + processRecordMessage(); + doThrow(new ArrayIndexOutOfBoundsException("unexpected error")).when(messageIterator).hasNext(); + assertThrows(RuntimeException.class, () -> sourceStateIterator.computeNext()); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 24c52f470102f..11b9d5b4e156f 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.3' + cdkVersionRequired = '0.20.5' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 16b607af14ba5..a50e4a1846552 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.5 + dockerImageTag: 3.3.6 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 64f8de10e4041..7ad708fe1736c 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,10 +223,11 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | -| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 | -| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. | -| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage +| 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. | +| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | +| 3.3.4 | 2024-02-08 | [34750](https://github.com/airbytehq/airbyte/pull/34750) | Adopt CDK 0.19.0 | +| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. | +| 3.3.2 | 2024-01-08 | [33005](https://github.com/airbytehq/airbyte/pull/33005) | Adding count stats for incremental sync in AirbyteStateMessage | | 3.3.1 | 2024-01-03 | [33312](https://github.com/airbytehq/airbyte/pull/33312) | Adding count stats in AirbyteStateMessage | | 3.3.0 | 2023-12-19 | [33436](https://github.com/airbytehq/airbyte/pull/33436) | Remove LEGACY state flag | | 3.2.4 | 2023-12-12 | [33356](https://github.com/airbytehq/airbyte/pull/33210) | Support for better debugging tools.. |