From 0810cfad3825885c6ff5de6ed4fb2e7281989947 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 15 Apr 2024 12:53:38 -0700 Subject: [PATCH 1/4] [Source-mysql] : Add meta error handling in initial load path --- .../connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/MySqlSourceOperations.java | 10 ++++++---- .../mysql/initialsync/MySqlInitialLoadHandler.java | 13 ++++++++++--- .../initialsync/MySqlInitialLoadRecordIterator.java | 13 +++++++------ 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 8a1857bb9bd11..6cf2ddd3f27d0 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.18 + dockerImageTag: 3.3.19 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index e64dc8927323f..c716a26ffcf41 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -46,6 +46,7 @@ import com.mysql.cj.result.Field; import io.airbyte.cdk.db.SourceOperations; import io.airbyte.cdk.db.jdbc.AbstractJdbcCompatibleSourceOperations; +import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.integrations.source.mysql.initialsync.CdcMetadataInjector; import io.airbyte.protocol.models.JsonSchemaType; import java.sql.PreparedStatement; @@ -81,13 +82,14 @@ public MySqlSourceOperations(final Optional metadataInjecto } @Override - public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { - final ObjectNode jsonNode = (ObjectNode) super.rowToJson(queryContext); + public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException { + final AirbyteRecordData recordData = super.convertDatabaseRowToAirbyteRecordData(queryContext); + final ObjectNode jsonNode = (ObjectNode) recordData.rawRowData(); if (!metadataInjector.isPresent()) { - return jsonNode; + return recordData; } metadataInjector.get().inject(jsonNode); - return jsonNode; + return new AirbyteRecordData(jsonNode, recordData.meta()); } /** diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java index 6b03ff28128f0..2457bc5924fe5 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mysql.cj.MysqlType; +import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants; import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil; @@ -27,6 +28,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -110,7 +112,7 @@ public List> getIncrementalIterators( } }); - final AutoCloseableIterator queryStream = + final AutoCloseableIterator queryStream = new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair, calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream)); final AutoCloseableIterator recordIterator = @@ -144,7 +146,7 @@ public static long calculateChunkSize(final TableSizeInfo tableSizeInfo, final A // Transforms the given iterator to create an {@link AirbyteRecordMessage} private AutoCloseableIterator getRecordIterator( - final AutoCloseableIterator recordIterator, + final AutoCloseableIterator recordIterator, final String streamName, final String namespace, final long emittedAt) { @@ -154,7 +156,12 @@ private AutoCloseableIterator getRecordIterator( .withStream(streamName) .withNamespace(namespace) .withEmittedAt(emittedAt) - .withData(r))); + .withData(r.rawRowData()) + .withMeta(isMetaChangesEmptyOrNull(r.meta()) ? null : r.meta()))); + } + + private boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) { + return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty(); } // Augments the given iterator with record count logs. diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java index be4bb4d6739b8..28aebfe96eae2 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java @@ -8,6 +8,7 @@ import com.google.common.collect.AbstractIterator; import com.mysql.cj.MysqlType; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; +import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.commons.util.AutoCloseableIterator; @@ -37,8 +38,8 @@ * records processed here. */ @SuppressWarnings("try") -public class MySqlInitialLoadRecordIterator extends AbstractIterator - implements AutoCloseableIterator { +public class MySqlInitialLoadRecordIterator extends AbstractIterator + implements AutoCloseableIterator { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadRecordIterator.class); @@ -54,7 +55,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator private final PrimaryKeyInfo pkInfo; private final boolean isCompositeKeyLoad; private int numSubqueries = 0; - private AutoCloseableIterator currentIterator; + private AutoCloseableIterator currentIterator; MySqlInitialLoadRecordIterator( final JdbcDatabase database, @@ -78,7 +79,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator @CheckForNull @Override - protected JsonNode computeNext() { + protected AirbyteRecordData computeNext() { if (shouldBuildNextSubquery()) { try { // We will only issue one query for a composite key load. If we have already processed all the data @@ -93,8 +94,8 @@ protected JsonNode computeNext() { } LOGGER.info("Subquery number : {}", numSubqueries); - final Stream stream = database.unsafeQuery( - this::getPkPreparedStatement, sourceOperations::rowToJson); + final Stream stream = database.unsafeQuery( + this::getPkPreparedStatement, sourceOperations::convertDatabaseRowToAirbyteRecordData); currentIterator = AutoCloseableIterators.fromStream(stream, pair); numSubqueries++; From c8e6644b083903a97f49de21a71bc62d3724f1d4 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 15 Apr 2024 12:57:33 -0700 Subject: [PATCH 2/4] Fix formatting + docs --- .../mysql/initialsync/MySqlInitialLoadRecordIterator.java | 1 - docs/integrations/sources/mysql.md | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java index 28aebfe96eae2..7c1c600766a8a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.source.mysql.initialsync; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.AbstractIterator; import com.mysql.cj.MysqlType; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index ce2209e8295d3..c76cadb31c02a 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.18 | 2024-04-15 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Refactor source operations. | +| 3.3.18 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes | +| 3.3.18 | 2024-04-15 | [37324](https://github.com/airbytehq/airbyte/pull/37324) | Refactor source operations. | | 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. | | 3.3.16 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | | 3.3.15 | 2024-04-05 | [36577](https://github.com/airbytehq/airbyte/pull/36577) | Config error will not send out system trace message | From 5801eb175ea4ae8c96d23da7452c3d429271feaf Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 15 Apr 2024 13:34:54 -0700 Subject: [PATCH 3/4] Fix changelog --- docs/integrations/sources/mysql.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index c76cadb31c02a..c3343beab04af 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,7 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.18 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes | +| 3.3.19 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes | | 3.3.18 | 2024-04-15 | [37324](https://github.com/airbytehq/airbyte/pull/37324) | Refactor source operations. | | 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. | | 3.3.16 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | From 3f9ba12b6c307ab5df1b24a1b8a4b2915ed26240 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Mon, 15 Apr 2024 14:33:33 -0700 Subject: [PATCH 4/4] Unit test for bad values --- .../source/mysql/CdcMysqlSourceTest.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index e5a1a75b802b2..99104ed17cc19 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -50,6 +51,10 @@ import io.airbyte.protocol.models.v0.AirbyteGlobalState; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.v0.AirbyteStream; @@ -59,6 +64,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.StreamDescriptor; import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -80,6 +86,11 @@ public class CdcMysqlSourceTest extends CdcSourceTest DATE_TIME_RECORDS = ImmutableList.of( + Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_DATE_TIME, "'2023-00-00 20:37:47'"))); + @Override protected MySQLTestDatabase createTestDatabase() { return MySQLTestDatabase.in(BaseImage.MYSQL_8, ContainerModifier.INVALID_TIMEZONE_CEST).withCdcPermissions(); @@ -734,6 +745,70 @@ public void testCompressedSchemaHistory() throws Exception { assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); } + private void writeDateRecords( + final JsonNode recordJson, + final String dbName, + final String streamName, + final String idCol, + final String dateCol) { + testdb.with("INSERT INTO `%s` .`%s` (%s, %s) VALUES (%s, %s);", dbName, streamName, + idCol, dateCol, + recordJson.get(idCol).asInt(), recordJson.get(dateCol).asText()); + } + + @Test + public void testInvalidDatetime_metaChangesPopulated() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(getConfiguredCatalog()); + + // Add a datetime stream to the catalog + testdb + .withoutStrictMode() + .with(createTableSqlFmt(), getDatabaseName(), TEST_DATE_STREAM_NAME, + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_DATE_TIME, "DATETIME"), Optional.of(COL_ID))); + + for (final JsonNode recordJson : DATE_TIME_RECORDS) { + writeDateRecords(recordJson, getDatabaseName(), TEST_DATE_STREAM_NAME, COL_ID, COL_DATE_TIME); + } + + final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream() + .withStream(CatalogHelpers.createAirbyteStream( + TEST_DATE_STREAM_NAME, + getDatabaseName(), + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_DATE_TIME, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))); + airbyteStream.setSyncMode(SyncMode.INCREMENTAL); + + final List streams = new ArrayList<>(); + streams.add(airbyteStream); + configuredCatalog.withStreams(streams); + + final AutoCloseableIterator read1 = source() + .read(config(), configuredCatalog, null); + final List actualRecords = AutoCloseableIterators.toListAndClose(read1); + + // Sync is expected to succeed with one record. However, the meta changes column should be populated + // for this record + // as it is an invalid date. As a result, this field will be omitted as Airbyte is unable to + // serialize the source value. + final Set recordMessages = extractRecordMessages(actualRecords); + assertEquals(recordMessages.size(), 1); + final AirbyteRecordMessage invalidDateRecord = recordMessages.stream().findFirst().get(); + + final AirbyteRecordMessageMetaChange expectedChange = + new AirbyteRecordMessageMetaChange().withReason(Reason.SOURCE_SERIALIZATION_ERROR).withChange( + Change.NULLED).withField(COL_DATE_TIME); + final AirbyteRecordMessageMeta expectedMessageMeta = new AirbyteRecordMessageMeta().withChanges(List.of(expectedChange)); + assertEquals(expectedMessageMeta, invalidDateRecord.getMeta()); + + ObjectMapper mapper = new ObjectMapper(); + final JsonNode expectedDataWithoutCdcFields = mapper.readTree("{\"id\":120}"); + removeCDCColumns((ObjectNode) invalidDateRecord.getData()); + assertEquals(expectedDataWithoutCdcFields, invalidDateRecord.getData()); + } + private void createTablesToIncreaseSchemaHistorySize() { for (int i = 0; i <= 200; i++) { final String tableName = generateRandomStringOf32Characters();