Skip to content

Commit d3864c2

Browse files
authored
[source-mysql] Fix NPE on cursor based full refresh (#37824)
1 parent 0311db0 commit d3864c2

File tree

9 files changed

+59
-5
lines changed

9 files changed

+59
-5
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
176+
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.33.1 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | Add a unit test for cursor based sync |
177178
| 0.33.0 | 2024-05-03 | [\#36935](https://github.com/airbytehq/airbyte/pull/36935) | Destinations: Enable non-safe-casting DV2 tests |
178179
| 0.32.0 | 2024-05-03 | [\#36929](https://github.com/airbytehq/airbyte/pull/36929) | Destinations: Assorted DV2 changes for mysql |
179180
| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.33.0
1+
version=0.33.1

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt

+40
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,46 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
479479
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
480480
}
481481

482+
@Test
483+
@Throws(Exception::class)
484+
protected fun testReadBothIncrementalAndFullRefreshStreams() {
485+
val catalog = getConfiguredCatalogWithOneStream(defaultNamespace)
486+
val expectedMessages: MutableList<AirbyteMessage> = ArrayList(testMessages)
487+
488+
val streamName2 = streamName() + 2
489+
val tableName = getFullyQualifiedTableName(TABLE_NAME + 2)
490+
testdb!!
491+
.with(createTableQuery(tableName, "id INTEGER, name VARCHAR(200)", ""))
492+
.with("INSERT INTO %s(id, name) VALUES (1,'picard')", tableName)
493+
.with("INSERT INTO %s(id, name) VALUES (2, 'crusher')", tableName)
494+
.with("INSERT INTO %s(id, name) VALUES (3, 'vash')", tableName)
495+
496+
val airbyteStream2 =
497+
CatalogHelpers.createConfiguredAirbyteStream(
498+
streamName2,
499+
defaultNamespace,
500+
Field.of(COL_ID, JsonSchemaType.NUMBER),
501+
Field.of(COL_NAME, JsonSchemaType.STRING)
502+
)
503+
airbyteStream2.syncMode = SyncMode.INCREMENTAL
504+
airbyteStream2.cursorField = java.util.List.of(COL_ID)
505+
airbyteStream2.destinationSyncMode = DestinationSyncMode.APPEND
506+
catalog.streams.add(airbyteStream2)
507+
508+
expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2))
509+
510+
System.out.println("catalog: " + catalog)
511+
512+
val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null))
513+
val actualRecordMessages = filterRecords(actualMessages)
514+
515+
setEmittedAtToNull(actualMessages)
516+
517+
Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size)
518+
Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages))
519+
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
520+
}
521+
482522
protected open fun getAirbyteMessagesSecondSync(streamName: String?): List<AirbyteMessage> {
483523
return testMessages
484524
.stream()

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.31.8'
9+
cdkVersionRequired = '0.33.1'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.4.0
12+
dockerImageTag: 3.4.1
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlQueryUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,
175175
}
176176

177177
LOGGER.info("Querying max cursor value for {}.{}", namespace, name);
178+
178179
final String cursorField = cursorInfoOptional.get().getCursorField();
180+
LOGGER.info("cursor field", cursorField);
179181
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString);
180182
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY,
181183
quotedCursorField,

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
1515
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.logStreamSyncStatus;
1616
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.convertNameNamespacePairFromV0;
17+
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.filterStreamInIncrementalMode;
1718
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.getMySqlFullRefreshInitialLoadHandler;
1819
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.getMySqlInitialLoadGlobalStateManager;
1920
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.initPairToPrimaryKeyInfoMap;
@@ -467,7 +468,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
467468
if (isAnyStreamIncrementalSyncMode(catalog)) {
468469
LOGGER.info("Syncing via Primary Key");
469470
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
470-
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
471+
final InitialLoadStreams initialLoadStreams =
472+
filterStreamInIncrementalMode(streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog));
471473
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
472474
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, getQuoteString());
473475
final CursorBasedStreams cursorBasedStreams =

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java

+8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
5151
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
5252
import io.airbyte.protocol.models.v0.StreamDescriptor;
53+
import io.airbyte.protocol.models.v0.SyncMode;
5354
import java.time.Duration;
5455
import java.time.Instant;
5556
import java.util.ArrayList;
@@ -368,6 +369,13 @@ private static boolean streamHasPrimaryKey(final ConfiguredAirbyteStream stream)
368369
return stream.getStream().getSourceDefinedPrimaryKey().size() > 0;
369370
}
370371

372+
public static InitialLoadStreams filterStreamInIncrementalMode(final InitialLoadStreams stream) {
373+
return new InitialLoadStreams(
374+
stream.streamsForInitialLoad.stream().filter(airbyteStream -> airbyteStream.getSyncMode() == SyncMode.INCREMENTAL)
375+
.collect(Collectors.toList()),
376+
stream.pairToInitialLoadStatus);
377+
}
378+
371379
public static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog,
372380
final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams) {
373381
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);

docs/integrations/sources/mysql.md

+1
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported
226226

227227
| Version | Date | Pull Request | Subject |
228228
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
229+
| 3.4.1 | 2024-05-03 | [37824](https://github.com/airbytehq/airbyte/pull/37824) | Fixed a bug on Resumeable full refresh where cursor based source throw NPE. |
229230
| 3.4.0 | 2024-05-02 | [36932](https://github.com/airbytehq/airbyte/pull/36932) | Resumeable full refresh. Note please upgrade your platform - minimum platform version is 0.58.0. |
230231
| 3.3.25 | 2024-05-02 | [37781](https://github.com/airbytehq/airbyte/pull/37781) | Adopt latest CDK. |
231232
| 3.3.24 | 2024-05-01 | [37742](https://github.com/airbytehq/airbyte/pull/37742) | Adopt latest CDK. Remove Debezium retries. |

0 commit comments

Comments
 (0)