Skip to content

Commit 62cf441

Browse files
authored
improve cdc check for connectors (#14005)
* improve should use cdc check * Revert "improve should use cdc check" This reverts commit 7d01727. * improve should use cdc check * add unit test
1 parent 1498ce9 commit 62cf441

File tree

7 files changed

+129
-19
lines changed

7 files changed

+129
-19
lines changed

airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory;
1818
import io.airbyte.protocol.models.AirbyteMessage;
1919
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
20+
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
21+
import io.airbyte.protocol.models.SyncMode;
2022
import io.debezium.engine.ChangeEvent;
2123
import java.time.Instant;
2224
import java.util.Collections;
@@ -120,4 +122,9 @@ private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSave
120122
return Optional.empty();
121123
}
122124

125+
public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
126+
return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
127+
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
128+
}
129+
123130
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium;
6+
7+
import com.google.common.collect.Lists;
8+
import io.airbyte.protocol.models.AirbyteCatalog;
9+
import io.airbyte.protocol.models.CatalogHelpers;
10+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
11+
import io.airbyte.protocol.models.Field;
12+
import io.airbyte.protocol.models.JsonSchemaType;
13+
import io.airbyte.protocol.models.SyncMode;
14+
import java.util.List;
15+
import org.junit.jupiter.api.Assertions;
16+
import org.junit.jupiter.api.Test;
17+
18+
public class AirbyteDebeziumHandlerTest {
19+
20+
@Test
21+
public void shouldUseCdcTestShouldReturnTrue() {
22+
final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
23+
CatalogHelpers.createAirbyteStream(
24+
"MODELS_STREAM_NAME",
25+
"MODELS_SCHEMA",
26+
Field.of("COL_ID", JsonSchemaType.NUMBER),
27+
Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER),
28+
Field.of("COL_MODEL", JsonSchemaType.STRING))
29+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
30+
.withSourceDefinedPrimaryKey(List.of(List.of("COL_ID")))));
31+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
32+
.toDefaultConfiguredCatalog(catalog);
33+
// set all streams to incremental.
34+
configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
35+
36+
Assertions.assertTrue(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog));
37+
}
38+
39+
@Test
40+
public void shouldUseCdcTestShouldReturnFalse() {
41+
final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
42+
CatalogHelpers.createAirbyteStream(
43+
"MODELS_STREAM_NAME",
44+
"MODELS_SCHEMA",
45+
Field.of("COL_ID", JsonSchemaType.NUMBER),
46+
Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER),
47+
Field.of("COL_MODEL", JsonSchemaType.STRING))
48+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
49+
.withSourceDefinedPrimaryKey(List.of(List.of("COL_ID")))));
50+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
51+
.toDefaultConfiguredCatalog(catalog);
52+
53+
Assertions.assertFalse(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog));
54+
}
55+
56+
}

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory;
1818
import io.airbyte.protocol.models.AirbyteMessage;
1919
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
20+
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
21+
import io.airbyte.protocol.models.SyncMode;
2022
import io.debezium.engine.ChangeEvent;
2123
import java.time.Instant;
2224
import java.util.Collections;
@@ -120,4 +122,9 @@ private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSave
120122
return Optional.empty();
121123
}
122124

125+
public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
126+
return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
127+
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
128+
}
129+
123130
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium;
6+
7+
import com.google.common.collect.Lists;
8+
import io.airbyte.protocol.models.AirbyteCatalog;
9+
import io.airbyte.protocol.models.CatalogHelpers;
10+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
11+
import io.airbyte.protocol.models.Field;
12+
import io.airbyte.protocol.models.JsonSchemaType;
13+
import io.airbyte.protocol.models.SyncMode;
14+
import java.util.List;
15+
import org.junit.jupiter.api.Assertions;
16+
import org.junit.jupiter.api.Test;
17+
18+
public class AirbyteDebeziumHandlerTest {
19+
20+
@Test
21+
public void shouldUseCdcTestShouldReturnTrue() {
22+
final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
23+
CatalogHelpers.createAirbyteStream(
24+
"MODELS_STREAM_NAME",
25+
"MODELS_SCHEMA",
26+
Field.of("COL_ID", JsonSchemaType.NUMBER),
27+
Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER),
28+
Field.of("COL_MODEL", JsonSchemaType.STRING))
29+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
30+
.withSourceDefinedPrimaryKey(List.of(List.of("COL_ID")))));
31+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
32+
.toDefaultConfiguredCatalog(catalog);
33+
// set all streams to incremental.
34+
configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
35+
36+
Assertions.assertTrue(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog));
37+
}
38+
39+
@Test
40+
public void shouldUseCdcTestShouldReturnFalse() {
41+
final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
42+
CatalogHelpers.createAirbyteStream(
43+
"MODELS_STREAM_NAME",
44+
"MODELS_SCHEMA",
45+
Field.of("COL_ID", JsonSchemaType.NUMBER),
46+
Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER),
47+
Field.of("COL_MODEL", JsonSchemaType.STRING))
48+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
49+
.withSourceDefinedPrimaryKey(List.of(List.of("COL_ID")))));
50+
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
51+
.toDefaultConfiguredCatalog(catalog);
52+
53+
Assertions.assertFalse(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog));
54+
}
55+
56+
}

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.source.mssql;
66

7+
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
78
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
89
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
910
import static java.util.stream.Collectors.toList;
@@ -370,13 +371,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
370371
}
371372
}
372373

373-
private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
374-
final Optional<SyncMode> any = catalog.getStreams().stream()
375-
.map(ConfiguredAirbyteStream::getSyncMode)
376-
.filter(syncMode -> syncMode == SyncMode.INCREMENTAL).findAny();
377-
return any.isPresent();
378-
}
379-
380374
// Note: in place mutation.
381375
private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stream) {
382376
if (stream.getSourceDefinedPrimaryKey().isEmpty()) {

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.source.mysql;
66

7+
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
78
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
89
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
910
import static io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper.checkBinlog;
@@ -168,12 +169,6 @@ private static boolean isCdc(final JsonNode config) {
168169
.equals(ReplicationMethod.CDC);
169170
}
170171

171-
private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
172-
final Optional<SyncMode> any = catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
173-
.filter(syncMode -> syncMode == SyncMode.INCREMENTAL).findAny();
174-
return any.isPresent();
175-
}
176-
177172
@Override
178173
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
179174
final ConfiguredAirbyteCatalog catalog,

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.source.postgres;
66

7+
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
78
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
89
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
910
import static java.util.stream.Collectors.toList;
@@ -249,12 +250,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
249250
}
250251
}
251252

252-
private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
253-
final Optional<SyncMode> any = catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
254-
.filter(syncMode -> syncMode == SyncMode.INCREMENTAL).findAny();
255-
return any.isPresent();
256-
}
257-
258253
@VisibleForTesting
259254
static boolean isCdc(final JsonNode config) {
260255
final boolean isCdc = config.hasNonNull("replication_method")

0 commit comments

Comments
 (0)