Skip to content

Commit cc7b376

Browse files
subodh1810Phlairsherifnada
authored
🎉 Source MSSQL: implementation for CDC (#4689)
* first few classes for mssql cdc * wip * mssql cdc working against unit tests * increment version * add cdc acceptance test * tweaks * add file * working on comprehensive tests * change isolation from snapshot to read_committed_snapshot * finalised type tests * Revert "change isolation from snapshot to read_committed_snapshot" This reverts commit 20c6768. * small docstring fix * remove unused imports * stress test fixes * minor formatting improvements * mssql cdc docs * finish off cdc docs * format fix * update connector version * add to changelog * fix for sql server agent offline failing cdc enable on tables * final structure * few more updates * undo unwanted changes * add abstract test + more refinement * remove CDC metadata to debezium * use new cdc abstraction for mysql * undo wanted change * use cdc abstraction for postgres * add files * pull in latest changes * ready * rename class + add missing property * use renamed class + move constants to MySqlSource * use renamed class + move constants to PostgresSource * move debezium to bases + upgrade debezium version + review comments * downgrade version + minor fixes * bring in latest changes from cdc abstraction * reset to minutes * bring in the latest changes * format * fix build * address review comments * bring in latest changes * bring in latest changes * use common abstraction for CDC via debezium for sql server * remove debezium from build * finalise PR * should return Optional * pull in latest changes * pull in latest changes * address review comments * use common abstraction for CDC via debezium for mysql (#4604) * use new cdc abstraction for mysql * undo wanted change * pull in latest changes * use renamed class + move constants to MySqlSource * bring in latest changes from cdc abstraction * format * bring in latest changes * pull in latest changes * use common abstraction for CDC via debezium for postgres (#4607) * use cdc abstraction for postgres * add files * ready * use renamed class + move 
constants to PostgresSource * bring in the latest changes * bring in latest changes * pull in latest changes * lower version for tests to run on CI * format * Update docs/integrations/sources/mssql.md Co-authored-by: Sherif A. Nada <[email protected]> * addressing review comments * fix for testGetTargetPosition * format changes Co-authored-by: George Claireaux <[email protected]> Co-authored-by: Sherif A. Nada <[email protected]>
1 parent b18cedd commit cc7b376

File tree

19 files changed

+1933
-43
lines changed

19 files changed

+1933
-43
lines changed

airbyte-integrations/bases/debezium/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dependencies {
1212
implementation 'io.debezium:debezium-embedded:1.4.2.Final'
1313
implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final'
1414
implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final'
15+
implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
1516

1617
testFixturesImplementation project(':airbyte-db')
1718
testFixturesImplementation project(':airbyte-integrations:bases:base-java')

airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import java.util.Comparator;
6060
import java.util.HashSet;
6161
import java.util.List;
62+
import java.util.Map;
63+
import java.util.Optional;
6264
import java.util.Set;
6365
import java.util.stream.Collectors;
6466
import java.util.stream.Stream;
@@ -78,9 +80,8 @@ public abstract class CdcSourceTest {
7880
protected static final String COL_ID = "id";
7981
protected static final String COL_MAKE_ID = "make_id";
8082
protected static final String COL_MODEL = "model";
81-
protected static final String DB_NAME = MODELS_SCHEMA;
8283

83-
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
84+
protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
8485
CatalogHelpers.createAirbyteStream(
8586
MODELS_STREAM_NAME,
8687
MODELS_SCHEMA,
@@ -124,6 +125,24 @@ protected void executeQuery(String query) {
124125
}
125126
}
126127

128+
public String columnClause(Map<String, String> columnsWithDataType, Optional<String> primaryKey) {
129+
StringBuilder columnClause = new StringBuilder();
130+
int i = 0;
131+
for (Map.Entry<String, String> column : columnsWithDataType.entrySet()) {
132+
columnClause.append(column.getKey());
133+
columnClause.append(" ");
134+
columnClause.append(column.getValue());
135+
if (i < (columnsWithDataType.size() - 1)) {
136+
columnClause.append(",");
137+
columnClause.append(" ");
138+
}
139+
i++;
140+
}
141+
primaryKey.ifPresent(s -> columnClause.append(", PRIMARY KEY (").append(s).append(")"));
142+
143+
return columnClause.toString();
144+
}
145+
127146
public void createTable(String schemaName, String tableName, String columnClause) {
128147
executeQuery(createTableQuery(schemaName, tableName, columnClause));
129148
}
@@ -143,7 +162,7 @@ public String createSchemaQuery(String schemaName) {
143162
private void createAndPopulateActualTable() {
144163
createSchema(MODELS_SCHEMA);
145164
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME,
146-
String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
165+
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
147166
for (JsonNode recordJson : MODEL_RECORDS) {
148167
writeModelRecord(recordJson);
149168
}
@@ -156,9 +175,8 @@ private void createAndPopulateActualTable() {
156175
private void createAndPopulateRandomTable() {
157176
createSchema(MODELS_SCHEMA + "_random");
158177
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
159-
String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID + "_random",
160-
COL_MAKE_ID + "_random",
161-
COL_MODEL + "_random", COL_ID + "_random"));
178+
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
179+
Optional.of(COL_ID + "_random")));
162180
final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
163181
Jsons
164182
.jsonNode(ImmutableMap
@@ -448,7 +466,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
448466
Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));
449467

450468
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
451-
String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
469+
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
452470

453471
for (JsonNode recordJson : MODEL_RECORDS_2) {
454472
writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
@@ -571,7 +589,8 @@ void testDiscover() throws Exception {
571589
protected AirbyteCatalog expectedCatalogForDiscover() {
572590
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);
573591

574-
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200)", COL_ID, COL_MAKE_ID, COL_MODEL));
592+
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
593+
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty()));
575594

576595
List<AirbyteStream> streams = expectedCatalog.getStreams();
577596
// stream with PK
@@ -588,7 +607,19 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
588607
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
589608
addCdcMetadataColumns(streamWithoutPK);
590609

610+
AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
611+
MODELS_STREAM_NAME + "_random",
612+
MODELS_SCHEMA + "_random",
613+
Field.of(COL_ID + "_random", JsonSchemaPrimitive.NUMBER),
614+
Field.of(COL_MAKE_ID + "_random", JsonSchemaPrimitive.NUMBER),
615+
Field.of(COL_MODEL + "_random", JsonSchemaPrimitive.STRING))
616+
.withSourceDefinedCursor(true)
617+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
618+
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")));
619+
addCdcMetadataColumns(randomStream);
620+
591621
streams.add(streamWithoutPK);
622+
streams.add(randomStream);
592623
expectedCatalog.withStreams(streams);
593624
return expectedCatalog;
594625
}

airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,9 @@ public TestDataHolderBuilder airbyteType(JsonSchemaPrimitive airbyteType) {
117117

118118
/**
119119
* Set a custom create table script pattern. Use it if your source uses atypical table creation
120-
* sql. Default patter described {@link #DEFAULT_CREATE_TABLE_SQL} Note! The patter should contains
121-
* two String place holders for the table name and data type.
120+
* sql. The default pattern is described in {@link #DEFAULT_CREATE_TABLE_SQL}. Note! The pattern should contain
121+
* four String place holders for the: - namespace.table name (as one placeholder together) - id
122+
* column name - test column name - test column data type
122123
*
123124
* @param createTablePatternSql creation table sql pattern
124125
* @return builder

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,22 @@ dependencies {
1313

1414
implementation project(':airbyte-db')
1515
implementation project(':airbyte-integrations:bases:base-java')
16+
implementation project(':airbyte-integrations:bases:debezium')
1617
implementation project(':airbyte-protocol:models')
1718
implementation project(':airbyte-integrations:connectors:source-jdbc')
1819
implementation project(':airbyte-integrations:connectors:source-relational-db')
1920

21+
implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
2022
implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'
2123

24+
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
2225
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
2326

2427
testImplementation 'org.apache.commons:commons-lang3:3.11'
2528
testImplementation "org.testcontainers:mssqlserver:1.15.1"
2629

2730
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
31+
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mssql')
2832

2933
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
3034
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Airbyte
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.airbyte.integrations.source.mssql;
26+
27+
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;
28+
29+
import com.fasterxml.jackson.databind.JsonNode;
30+
import com.fasterxml.jackson.databind.node.ObjectNode;
31+
import io.airbyte.integrations.debezium.CdcMetadataInjector;
32+
33+
public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector {
34+
35+
@Override
36+
public void addMetaData(ObjectNode event, JsonNode source) {
37+
String commitLsn = source.get("commit_lsn").asText();
38+
event.put(CDC_LSN, commitLsn);
39+
}
40+
41+
@Override
42+
public String namespace(JsonNode source) {
43+
return source.get("schema").asText();
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Airbyte
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.airbyte.integrations.source.mssql;
26+
27+
import java.util.Properties;
28+
29+
/**
 * Holder of the Debezium configuration that the MSSQL source passes to the SQL Server connector.
 */
public class MssqlCdcProperties {

  /**
   * Creates a fresh {@link Properties} object on every call.
   *
   * @return the connector class, snapshot and event-content settings for Debezium
   */
  static Properties getDebeziumProperties() {
    final Properties debeziumProperties = new Properties();
    debeziumProperties.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
    applySnapshotSettings(debeziumProperties);
    applyEventContentSettings(debeziumProperties);
    return debeziumProperties;
  }

  /** Adds the settings that control how the snapshot is taken. */
  private static void applySnapshotSettings(final Properties target) {
    // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode
    target.setProperty("snapshot.mode", "initial");

    // Chosen so that other (non-Airbyte) transactions are not prevented from updating table rows
    // while the snapshot is running.
    // https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
    // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
    target.setProperty("snapshot.isolation.mode", "snapshot");
  }

  /** Adds the settings that switch off schema-change events and transaction metadata. */
  private static void applyEventContentSettings(final Properties target) {
    // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes
    target.setProperty("include.schema.changes", "false");
    // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
    target.setProperty("provide.transaction.metadata", "false");
  }

}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Airbyte
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.airbyte.integrations.source.mssql;
26+
27+
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
28+
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;
29+
30+
import com.fasterxml.jackson.databind.JsonNode;
31+
import io.airbyte.integrations.debezium.CdcSavedInfoFetcher;
32+
import io.airbyte.integrations.source.relationaldb.models.CdcState;
33+
import java.util.Optional;
34+
35+
public class MssqlCdcSavedInfoFetcher implements CdcSavedInfoFetcher {
36+
37+
private final JsonNode savedOffset;
38+
private final JsonNode savedSchemaHistory;
39+
40+
protected MssqlCdcSavedInfoFetcher(CdcState savedState) {
41+
final boolean savedStatePresent = savedState != null && savedState.getState() != null;
42+
this.savedOffset = savedStatePresent ? savedState.getState().get(MSSQL_CDC_OFFSET) : null;
43+
this.savedSchemaHistory = savedStatePresent ? savedState.getState().get(MSSQL_DB_HISTORY) : null;
44+
}
45+
46+
@Override
47+
public JsonNode getSavedOffset() {
48+
return savedOffset;
49+
}
50+
51+
@Override
52+
public Optional<JsonNode> getSavedSchemaHistory() {
53+
return Optional.ofNullable(savedSchemaHistory);
54+
}
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Airbyte
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.airbyte.integrations.source.mssql;
26+
27+
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
28+
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;
29+
30+
import com.fasterxml.jackson.databind.JsonNode;
31+
import io.airbyte.commons.json.Jsons;
32+
import io.airbyte.integrations.debezium.CdcStateHandler;
33+
import io.airbyte.integrations.source.relationaldb.StateManager;
34+
import io.airbyte.integrations.source.relationaldb.models.CdcState;
35+
import io.airbyte.protocol.models.AirbyteMessage;
36+
import io.airbyte.protocol.models.AirbyteMessage.Type;
37+
import io.airbyte.protocol.models.AirbyteStateMessage;
38+
import java.util.HashMap;
39+
import java.util.Map;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
public class MssqlCdcStateHandler implements CdcStateHandler {
44+
45+
private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcStateHandler.class);
46+
private final StateManager stateManager;
47+
48+
public MssqlCdcStateHandler(StateManager stateManager) {
49+
this.stateManager = stateManager;
50+
}
51+
52+
@Override
53+
public AirbyteMessage saveState(Map<String, String> offset, String dbHistory) {
54+
Map<String, Object> state = new HashMap<>();
55+
state.put(MSSQL_CDC_OFFSET, offset);
56+
state.put(MSSQL_DB_HISTORY, dbHistory);
57+
58+
final JsonNode asJson = Jsons.jsonNode(state);
59+
60+
LOGGER.info("debezium state: {}", asJson);
61+
62+
final CdcState cdcState = new CdcState().withState(asJson);
63+
stateManager.getCdcStateManager().setCdcState(cdcState);
64+
final AirbyteStateMessage stateMessage = stateManager.emit();
65+
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
66+
}
67+
68+
}

0 commit comments

Comments
 (0)