Skip to content

Commit 90b757b

Browse files
authored
🎉 Source MSSQL: add option to disable initial snapshot and change snapshot isolation level (#12759)
* MSSQL CDC feature to capture only changes option added * MSSQL CDC feature to capture only changes option added * added option to disable snapshot snapshot isolation setting is hardcode and I modified as user option * recommitting * docker version change docker version change * resolve conflict * review 1 added * review 1 added * removed extra space removed extra space
1 parent 5a4cb12 commit 90b757b

File tree

6 files changed

+135
-46
lines changed

6 files changed

+135
-46
lines changed

‎airbyte-integrations/connectors/source-mssql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mssql
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.3.22
19+
LABEL io.airbyte.version=0.3.23
2020
LABEL io.airbyte.name=airbyte/source-mssql

‎airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcProperties.java

+28-10
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,16 @@
44

55
package io.airbyte.integrations.source.mssql;
66

7+
import com.fasterxml.jackson.databind.JsonNode;
8+
79
import java.util.Properties;
810

911
public class MssqlCdcProperties {
1012

11-
static Properties getDebeziumProperties() {
13+
static Properties getDebeziumProperties(final JsonNode config) {
1214
final Properties props = new Properties();
1315
props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
1416

15-
// snapshot config
16-
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode
17-
props.setProperty("snapshot.mode", "initial");
18-
// https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
19-
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
20-
// we set this to avoid preventing other (non-Airbyte) transactions from updating table rows while
21-
// we snapshot
22-
props.setProperty("snapshot.isolation.mode", "snapshot");
23-
2417
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes
2518
props.setProperty("include.schema.changes", "false");
2619
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
@@ -29,6 +22,31 @@ static Properties getDebeziumProperties() {
2922
props.setProperty("converters", "mssql_converter");
3023
props.setProperty("mssql_converter.type", "io.airbyte.integrations.debezium.internals.MSSQLConverter");
3124

25+
final JsonNode replication_config = config.get("replication_method");
26+
if(replication_config.hasNonNull("replication_method")) {
27+
final JsonNode cdcMethod = config.get("replication_method");
28+
if(cdcMethod.hasNonNull("is_snapshot_disabled") &&
29+
cdcMethod.get("is_snapshot_disabled").asBoolean()) {
30+
props.setProperty("snapshot.isolation.mode", "read_committed");
31+
} else {
32+
// https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
33+
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
34+
// we set this to avoid preventing other (non-Airbyte) transactions from updating table rows while
35+
// we snapshot
36+
props.setProperty("snapshot.isolation.mode", "snapshot");
37+
}
38+
if(cdcMethod.hasNonNull("is_cdc_only") &&
39+
cdcMethod.get("is_cdc_only").asBoolean()) {
40+
props.setProperty("snapshot.mode", "schema_only");
41+
} else {
42+
// snapshot config
43+
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode
44+
props.setProperty("snapshot.mode", "initial");
45+
}
46+
} else {
47+
props.setProperty("snapshot.isolation.mode", "snapshot");
48+
props.setProperty("snapshot.mode", "initial");
49+
}
3250
return props;
3351
}
3452

‎airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java

+39-28
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,7 @@
3838
import java.sql.PreparedStatement;
3939
import java.sql.SQLException;
4040
import java.time.Instant;
41-
import java.util.ArrayList;
42-
import java.util.List;
43-
import java.util.Map;
44-
import java.util.Optional;
45-
import java.util.Set;
41+
import java.util.*;
4642
import java.util.stream.Stream;
4743
import org.slf4j.Logger;
4844
import org.slf4j.LoggerFactory;
@@ -317,26 +313,30 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
317313

318314
protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcDatabase database)
319315
throws SQLException {
320-
final List<JsonNode> queryResponse = database.queryJsons(connection -> {
321-
final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?";
322-
final PreparedStatement ps = connection.prepareStatement(sql);
323-
ps.setString(1, config.get("database").asText());
324-
LOGGER.info(String.format(
325-
"Checking that snapshot isolation is enabled on database '%s' using the query: '%s'",
326-
config.get("database").asText(), sql));
327-
return ps;
328-
}, sourceOperations::rowToJson);
316+
final JsonNode replication_config =config.get("replication_method");
317+
if(!replication_config.hasNonNull("is_snapshot_disabled") ||
318+
!replication_config.get("is_snapshot_disabled").asBoolean()) {
319+
final List<JsonNode> queryResponse = database.queryJsons(connection -> {
320+
final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?";
321+
final PreparedStatement ps = connection.prepareStatement(sql);
322+
ps.setString(1, config.get("database").asText());
323+
LOGGER.info(String.format(
324+
"Checking that snapshot isolation is enabled on database '%s' using the query: '%s'",
325+
config.get("database").asText(), sql));
326+
return ps;
327+
}, sourceOperations::rowToJson);
329328

330-
if (queryResponse.size() < 1) {
331-
throw new RuntimeException(String.format(
332-
"Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
333-
config.get("database").asText()));
334-
}
335-
if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) {
336-
throw new RuntimeException(String.format(
337-
"Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. "
338-
+ "Please check the documentation on how to enable snapshot isolation on MS SQL Server.",
339-
config.get("database").asText()));
329+
if (queryResponse.size() < 1) {
330+
throw new RuntimeException(String.format(
331+
"Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
332+
config.get("database").asText()));
333+
}
334+
if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) {
335+
throw new RuntimeException(String.format(
336+
"Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. "
337+
+ "Please check the documentation on how to enable snapshot isolation on MS SQL Server.",
338+
config.get("database").asText()));
339+
}
340340
}
341341
}
342342

@@ -350,9 +350,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
350350
final JsonNode sourceConfig = database.getSourceConfig();
351351
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
352352
LOGGER.info("using CDC: {}", true);
353+
Properties props=MssqlCdcProperties.getDebeziumProperties(sourceConfig);
353354
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
354355
MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get("database").asText()),
355-
MssqlCdcProperties.getDebeziumProperties(), catalog, true);
356+
props, catalog, true);
356357
return handler.getIncrementalIterators(
357358
new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
358359
new MssqlCdcStateHandler(stateManager), new MssqlCdcConnectorMetadataInjector(),
@@ -364,9 +365,18 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
364365
}
365366

366367
private static boolean isCdc(final JsonNode config) {
367-
return config.hasNonNull("replication_method")
368-
&& ReplicationMethod.valueOf(config.get("replication_method").asText())
369-
.equals(ReplicationMethod.CDC);
368+
if(config.hasNonNull("replication_method")){
369+
final JsonNode replication_config = config.get("replication_method");
370+
if(replication_config.hasNonNull("replication_method")){
371+
return ReplicationMethod.valueOf(replication_config.get("replication_method").asText())
372+
.equals(ReplicationMethod.CDC);
373+
}else{
374+
return ReplicationMethod.valueOf(replication_config.asText())
375+
.equals(ReplicationMethod.CDC);
376+
}
377+
} else {
378+
return false;
379+
}
370380
}
371381

372382
private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
@@ -408,6 +418,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
408418
return stream;
409419
}
410420

421+
411422
private void readSsl(final JsonNode sslMethod, final List<String> additionalParameters) {
412423
final JsonNode config = sslMethod.get("ssl_method");
413424
switch (config.get("ssl_method").asText()) {

‎airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json

+47-5
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,54 @@
105105
]
106106
},
107107
"replication_method": {
108-
"type": "string",
108+
"type": "object",
109109
"title": "Replication Method",
110-
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
111-
"default": "STANDARD",
112-
"enum": ["STANDARD", "CDC"],
113-
"order": 8
110+
"description": "The replication method used for extracting data from the database. ",
111+
"order": 8,
112+
"oneOf": [
113+
{
114+
"title": "STANDARD",
115+
"additionalProperties": false,
116+
"description": "STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
117+
"properties": {
118+
"replication_method": {
119+
"type": "string",
120+
"const": "STANDARD",
121+
"enum": ["STANDARD"],
122+
"default": "STANDARD",
123+
"order": 0
124+
}
125+
}
126+
},
127+
{
128+
"title": "CDC",
129+
"additionalProperties": false,
130+
"description": "CDC uses SQL Server's change data capture feature to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
131+
"properties": {
132+
"replication_method": {
133+
"type": "string",
134+
"const": "CDC",
135+
"enum": ["CDC"],
136+
"default": "CDC",
137+
"order":0
138+
},
139+
"is_snapshot_disabled": {
140+
"title": "DisableSnapshot",
141+
"type": "boolean",
142+
"description": "validate the snapshot enable in the database. true would set snapshot.isolation.mode to snapshot otherwise, it would be ",
143+
"default": false,
144+
"order": 1
145+
},
146+
"is_cdc_only": {
147+
"title": "cdcOnly",
148+
"type": "boolean",
149+
"default": false,
150+
"description": "When true, snapshot.mode is set to schema_only; otherwise it is set to initial. See https://debezium.io/documentation/reference/stable/connectors/sqlserver.html",
151+
"order": 2
152+
}
153+
}
154+
}
155+
]
114156
}
115157
}
116158
}

‎airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.fasterxml.jackson.databind.JsonNode;
2222
import com.fasterxml.jackson.databind.node.ObjectNode;
2323
import com.google.common.collect.ImmutableMap;
24+
import com.google.common.collect.Lists;
2425
import io.airbyte.commons.json.Jsons;
2526
import io.airbyte.commons.string.Strings;
2627
import io.airbyte.db.Database;
@@ -38,6 +39,7 @@
3839
import io.airbyte.protocol.models.AirbyteStream;
3940
import io.debezium.connector.sqlserver.Lsn;
4041
import java.sql.SQLException;
42+
import java.util.Arrays;
4143
import java.util.List;
4244
import java.util.Map;
4345
import java.util.Optional;
@@ -81,7 +83,6 @@ private void init() {
8183

8284
dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
8385
source = new MssqlSource();
84-
8586
config = Jsons.jsonNode(ImmutableMap.builder()
8687
.put("host", container.getHost())
8788
.put("port", container.getFirstMappedPort())
@@ -268,6 +269,23 @@ void testAssertSnapshotIsolationAllowed() {
268269
assertThrows(RuntimeException.class, () -> source.assertSnapshotIsolationAllowed(config, testJdbcDatabase));
269270
}
270271

272+
@Test
273+
void testAssertSnapshotIsolationDisabled() {
274+
//disabled the snapshot
275+
JsonNode replication_config = Jsons.jsonNode(ImmutableMap.builder()
276+
.put("replication_method", "CDC")
277+
.put("is_cdc_only", "false")
278+
.put("is_snapshot_disabled", "true")
279+
.build());
280+
Jsons.replaceNestedValue(config, Arrays.asList(new String[]{"replication_method"}),replication_config);
281+
// snapshot isolation enabled by setup so assert check passes
282+
assertDoesNotThrow(() -> source.assertSnapshotIsolationAllowed(config, testJdbcDatabase));
283+
// now disable snapshot isolation on the database itself
284+
switchSnapshotIsolation(false, dbName);
285+
// snapshot isolation disabled and snapshot validation is disabled so assert check passes
286+
assertDoesNotThrow(() -> source.assertSnapshotIsolationAllowed(config, testJdbcDatabase));
287+
}
288+
271289
// Ensure the CDC check operations are included when CDC is enabled
272290
// todo: make this better by checking the returned checkOperations from source.getCheckOperations
273291
@Test

‎docs/integrations/sources/mssql.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ Please read the [CDC docs](../../understanding-airbyte/cdc.md) for an overview o
6363
* There are some critical issues regarding certain datatypes. Please find detailed info in [this Github issue](https://github.com/airbytehq/airbyte/issues/4542).
6464
* CDC is only available for SQL Server 2016 Service Pack 1 \(SP1\) and later.
6565
* _db\_owner_ \(or higher\) permissions are required to perform the [necessary setup](mssql.md#setting-up-cdc-for-mssql) for CDC.
66-
* You must enable [snapshot isolation mode](https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server) on the database\(s\) you want to sync. This is used for retrieving an initial snapshot without locking tables.
66+
* You must enable [snapshot isolation mode](https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server) on the database\(s\) you want to sync. This is used for retrieving an initial snapshot without locking tables. If required, you can turn off the use of snapshot isolation and skip the initial snapshot in the connector settings.
6767
* On Linux, CDC is not supported on versions earlier than SQL Server 2017 CU18 \(SQL Server 2019 is supported\).
6868
* Change data capture cannot be enabled on tables with a clustered columnstore index. \(It can be enabled on tables with a _non-clustered_ columnstore index\).
6969
* The SQL Server CDC feature processes changes that occur in user-created tables only. You cannot enable CDC on the SQL Server master database.

0 commit comments

Comments
 (0)