Skip to content

Commit 868ed76

Browse files
Patch for mssql source cdc update (#13168)
* MSSQL CDC feature to capture only changes option added * MSSQL CDC feature to capture only changes option added * added option to disable snapshot snapshot isolation setting is hardcode and I modified as user option * recommitting * docker version change docker version change * resolve conflict * review 1 added * review 1 added * removed extra space removed extra space * Update spec.json * Format java code * Update doc and version * Update is-cdc method and add test * Update spec * Update doc * Update snapshot isolation check * Add helper method to check snapshot level * Introduce enums * Format code * Remove empty constant class * Update expected spec * Update spec * Prevent npe * Update changelog * Use a different field name * Fix missing return statement Co-authored-by: Sivakumar Ramaswamy <[email protected]> Co-authored-by: sivankumar86 <[email protected]> Co-authored-by: sivankumar86 <[email protected]>
1 parent 90b757b commit 868ed76

File tree

13 files changed

+397
-148
lines changed

13 files changed

+397
-148
lines changed

airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.3.22
19+
LABEL io.airbyte.version=0.4.0
2020
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt

airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json

+52-4
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,61 @@
9090
}
9191
]
9292
},
93-
"replication_method": {
94-
"type": "string",
93+
"replication": {
94+
"type": "object",
9595
"title": "Replication Method",
9696
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
9797
"default": "STANDARD",
98-
"enum": ["STANDARD", "CDC"],
99-
"order": 8
98+
"additionalProperties": true,
99+
"order": 8,
100+
"oneOf": [
101+
{
102+
"title": "Standard",
103+
"additionalProperties": false,
104+
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
105+
"required": ["replication_type"],
106+
"properties": {
107+
"replication_type": {
108+
"type": "string",
109+
"const": "Standard",
110+
"enum": ["Standard"],
111+
"default": "Standard",
112+
"order": 0
113+
}
114+
}
115+
},
116+
{
117+
"title": "Logical Replication (CDC)",
118+
"additionalProperties": false,
119+
"description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
120+
"required": ["replication_type"],
121+
"properties": {
122+
"replication_type": {
123+
"type": "string",
124+
"const": "CDC",
125+
"enum": ["CDC"],
126+
"default": "CDC",
127+
"order":0
128+
},
129+
"data_to_sync": {
130+
"title": "Data to Sync",
131+
"type": "string",
132+
"default": "Existing and New",
133+
"enum": ["Existing and New", "New Changes Only"],
134+
"description": "What data should be synced under the CDC. \"Existing and New\" will read existing data as a snapshot, and sync new changes through CDC. \"New Changes Only\" will skip the initial snapshot, and only sync new changes through CDC.",
135+
"order": 1
136+
},
137+
"snapshot_isolation": {
138+
"title": "Initial Snapshot Isolation Level",
139+
"type": "string",
140+
"default": "Snapshot",
141+
"enum": ["Snapshot", "Read Committed"],
142+
"description": "Existing data in the database are synced through an initial snapshot. This parameter controls the isolation level that will be used during the initial snapshotting. If you choose the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\">snapshot isolation mode</a> on the database.",
143+
"order": 2
144+
}
145+
}
146+
}
147+
]
100148
}
101149
}
102150
}

airbyte-integrations/connectors/source-mssql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mssql
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.3.23
19+
LABEL io.airbyte.version=0.4.0
2020
LABEL io.airbyte.name=airbyte/source-mssql
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mssql;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.debezium.annotation.VisibleForTesting;
9+
import java.util.Properties;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
public class MssqlCdcHelper {
14+
15+
// legacy replication method config before version 0.4.0
16+
// it is an enum with possible values: STANDARD and CDC
17+
private static final String LEGACY_REPLICATION_FIELD = "replication_method";
18+
// new replication method config since version 0.4.0
19+
// it is an oneOf object
20+
private static final String REPLICATION_FIELD = "replication";
21+
private static final String REPLICATION_TYPE_FIELD = "replication_type";
22+
private static final String CDC_SNAPSHOT_ISOLATION_FIELD = "snapshot_isolation";
23+
private static final String CDC_DATA_TO_SYNC_FIELD = "data_to_sync";
24+
25+
public enum ReplicationMethod {
26+
STANDARD,
27+
CDC
28+
}
29+
30+
/**
31+
* The default "SNAPSHOT" mode can prevent other (non-Airbyte) transactions from updating table rows
32+
* while we snapshot. References:
33+
* https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
34+
* https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
35+
*/
36+
public enum SnapshotIsolation {
37+
38+
SNAPSHOT("Snapshot", "snapshot"),
39+
READ_COMMITTED("Read Committed", "read_committed");
40+
41+
private final String snapshotIsolationLevel;
42+
private final String debeziumIsolationMode;
43+
44+
SnapshotIsolation(final String snapshotIsolationLevel, final String debeziumIsolationMode) {
45+
this.snapshotIsolationLevel = snapshotIsolationLevel;
46+
this.debeziumIsolationMode = debeziumIsolationMode;
47+
}
48+
49+
public String getDebeziumIsolationMode() {
50+
return debeziumIsolationMode;
51+
}
52+
53+
public static SnapshotIsolation from(final String jsonValue) {
54+
for (final SnapshotIsolation value : values()) {
55+
if (value.snapshotIsolationLevel.equalsIgnoreCase(jsonValue)) {
56+
return value;
57+
}
58+
}
59+
throw new IllegalArgumentException("Unexpected snapshot isolation level: " + jsonValue);
60+
}
61+
62+
}
63+
64+
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode
65+
public enum DataToSync {
66+
67+
EXISTING_AND_NEW("Existing and New", "initial"),
68+
NEW_CHANGES_ONLY("New Changes Only", "schema_only");
69+
70+
private final String dataToSyncConfig;
71+
private final String debeziumSnapshotMode;
72+
73+
DataToSync(final String value, final String debeziumSnapshotMode) {
74+
this.dataToSyncConfig = value;
75+
this.debeziumSnapshotMode = debeziumSnapshotMode;
76+
}
77+
78+
public String getDebeziumSnapshotMode() {
79+
return debeziumSnapshotMode;
80+
}
81+
82+
public static DataToSync from(final String value) {
83+
for (final DataToSync s : values()) {
84+
if (s.dataToSyncConfig.equalsIgnoreCase(value)) {
85+
return s;
86+
}
87+
}
88+
throw new IllegalArgumentException("Unexpected data to sync setting: " + value);
89+
}
90+
91+
}
92+
93+
private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcHelper.class);
94+
95+
@VisibleForTesting
96+
static boolean isCdc(final JsonNode config) {
97+
// legacy replication method config before version 0.4.0
98+
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
99+
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
100+
}
101+
// new replication method config since version 0.4.0
102+
if (config.hasNonNull(REPLICATION_FIELD)) {
103+
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
104+
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
105+
}
106+
return false;
107+
}
108+
109+
@VisibleForTesting
110+
static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
111+
// legacy replication method config before version 0.4.0
112+
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
113+
return SnapshotIsolation.SNAPSHOT;
114+
}
115+
// new replication method config since version 0.4.0
116+
if (config.hasNonNull(REPLICATION_FIELD)) {
117+
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
118+
final JsonNode snapshotIsolation = replicationConfig.get(CDC_SNAPSHOT_ISOLATION_FIELD);
119+
return SnapshotIsolation.from(snapshotIsolation.asText());
120+
}
121+
return SnapshotIsolation.SNAPSHOT;
122+
}
123+
124+
@VisibleForTesting
125+
static DataToSync getDataToSyncConfig(final JsonNode config) {
126+
// legacy replication method config before version 0.4.0
127+
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
128+
return DataToSync.EXISTING_AND_NEW;
129+
}
130+
// new replication method config since version 0.4.0
131+
if (config.hasNonNull(REPLICATION_FIELD)) {
132+
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
133+
final JsonNode dataToSync = replicationConfig.get(CDC_DATA_TO_SYNC_FIELD);
134+
return DataToSync.from(dataToSync.asText());
135+
}
136+
return DataToSync.EXISTING_AND_NEW;
137+
}
138+
139+
@VisibleForTesting
140+
static Properties getDebeziumProperties(final JsonNode config) {
141+
final Properties props = new Properties();
142+
props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
143+
144+
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes
145+
props.setProperty("include.schema.changes", "false");
146+
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
147+
props.setProperty("provide.transaction.metadata", "false");
148+
149+
props.setProperty("converters", "mssql_converter");
150+
props.setProperty("mssql_converter.type", "io.airbyte.integrations.debezium.internals.MSSQLConverter");
151+
152+
props.setProperty("snapshot.mode", getDataToSyncConfig(config).getDebeziumSnapshotMode());
153+
props.setProperty("snapshot.isolation.mode", getSnapshotIsolationConfig(config).getDebeziumIsolationMode());
154+
155+
return props;
156+
}
157+
158+
}

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcProperties.java

-53
This file was deleted.

0 commit comments

Comments
 (0)