Skip to content

Patch for mssql source cdc update #13168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0828f8e
MSSQL CDC feature to capture only changes option added
sivankumar86 May 11, 2022
874fff2
MSSQL CDC feature to capture only changes option added
sivankumar86 May 11, 2022
86ae3f1
added option to disable snapshot
sivankumar86 May 11, 2022
03d04c1
recommitting
sivankumar86 May 12, 2022
6237ab8
docker version change
sivankumar86 May 13, 2022
c0393a5
resolve conflict
sivankumar86 May 14, 2022
aadd7f1
Merge branch 'master' of https://github.com/sivankumar86/airbyte
sivankumar86 May 14, 2022
5d23be3
resolve conflict
sivankumar86 May 14, 2022
50c7a1a
review 1 added
sivankumar86 May 22, 2022
8e56a91
review 1 added
sivankumar86 May 22, 2022
6818b1e
removed extra space
sivankumar86 May 22, 2022
93ce289
Update spec.json
tuliren May 25, 2022
07c4ba8
Format java code
tuliren May 25, 2022
9b25bcf
Update doc and version
tuliren May 25, 2022
8f7e46e
Update is-cdc method and add test
tuliren May 25, 2022
8632467
Update spec
tuliren May 25, 2022
027d941
Update doc
tuliren May 25, 2022
29127c3
Update snapshot isolation check
tuliren May 25, 2022
fd44567
Add helper method to check snapshot level
tuliren May 25, 2022
1de1fd7
Introduce enums
tuliren May 25, 2022
3717d6b
Format code
tuliren May 25, 2022
4ab7a86
Remove empty constant class
tuliren May 25, 2022
fc60cf6
Update expected spec
tuliren May 25, 2022
1732f47
Update spec
tuliren May 25, 2022
f092b8d
Prevent npe
tuliren May 25, 2022
899dcd0
Update changelog
tuliren May 25, 2022
c68ff5e
Use a different field name
tuliren May 25, 2022
5be411f
Merge branch 'master' into liren/sivankumar-mssql-patch
tuliren May 25, 2022
31a95e4
Fix missing return statement
tuliren May 25, 2022
4284ce6
Merge branch 'master' into liren/sivankumar-mssql-patch
tuliren May 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.22
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,61 @@
}
]
},
"replication_method": {
"type": "string",
"replication": {
"type": "object",
"title": "Replication Method",
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"default": "STANDARD",
"enum": ["STANDARD", "CDC"],
"order": 8
"additionalProperties": true,
"order": 8,
"oneOf": [
{
"title": "Standard",
"additionalProperties": false,
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["replication_type"],
"properties": {
"replication_type": {
"type": "string",
"const": "Standard",
"enum": ["Standard"],
"default": "Standard",
"order": 0
}
}
},
{
"title": "Logical Replication (CDC)",
"additionalProperties": false,
"description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["replication_type"],
"properties": {
"replication_type": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
"default": "CDC",
"order":0
},
"data_to_sync": {
"title": "Data to Sync",
"type": "string",
"default": "Existing and New",
"enum": ["Existing and New", "New Changes Only"],
"description": "What data should be synced under the CDC. \"Existing and New\" will read existing data as a snapshot, and sync new changes through CDC. \"New Changes Only\" will skip the initial snapshot, and only sync new changes through CDC.",
"order": 1
},
"snapshot_isolation": {
"title": "Initial Snapshot Isolation Level",
"type": "string",
"default": "Snapshot",
"enum": ["Snapshot", "Read Committed"],
"description": "Existing data in the database are synced through an initial snapshot. This parameter controls the isolation level that will be used during the initial snapshotting. If you choose the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\">snapshot isolation mode</a> on the database.",
"order": 2
}
}
}
]
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.23
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mssql;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.annotation.VisibleForTesting;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that interpret the replication section of the MSSQL source config and translate
 * it into Debezium connector properties.
 *
 * <p>
 * Two config layouts are understood: the legacy {@code replication_method} string field and the
 * {@code replication} object that replaced it in version 0.4.0. When both are present the legacy
 * field takes precedence.
 */
public class MssqlCdcHelper {

  // legacy replication method config before version 0.4.0
  // it is an enum with possible values: STANDARD and CDC
  private static final String LEGACY_REPLICATION_FIELD = "replication_method";
  // new replication method config since version 0.4.0
  // it is a oneOf object
  private static final String REPLICATION_FIELD = "replication";
  private static final String REPLICATION_TYPE_FIELD = "replication_type";
  private static final String CDC_SNAPSHOT_ISOLATION_FIELD = "snapshot_isolation";
  private static final String CDC_DATA_TO_SYNC_FIELD = "data_to_sync";

  public enum ReplicationMethod {

    STANDARD,
    CDC;

    /**
     * Resolves a config value to a replication method, ignoring case. The legacy config uses
     * "STANDARD"/"CDC" while the new config uses "Standard"/"CDC", so a case-sensitive
     * {@link #valueOf(String)} would reject the new "Standard" value.
     *
     * @param jsonValue the raw value read from the config
     * @return the matching replication method
     * @throws IllegalArgumentException if the value matches no replication method
     */
    public static ReplicationMethod from(final String jsonValue) {
      for (final ReplicationMethod value : values()) {
        if (value.name().equalsIgnoreCase(jsonValue)) {
          return value;
        }
      }
      throw new IllegalArgumentException("Unexpected replication method: " + jsonValue);
    }

  }

  /**
   * The default "SNAPSHOT" mode can prevent other (non-Airbyte) transactions from updating table rows
   * while we snapshot. References:
   * https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
   * https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
   *
   * <p>
   * NOTE(review): the locking claim above is carried over unchanged; confirm it against the linked
   * Debezium documentation.
   */
  public enum SnapshotIsolation {

    SNAPSHOT("Snapshot", "snapshot"),
    READ_COMMITTED("Read Committed", "read_committed");

    // value as it appears in the connector config
    private final String snapshotIsolationLevel;
    // value passed to Debezium's "snapshot.isolation.mode" property
    private final String debeziumIsolationMode;

    SnapshotIsolation(final String snapshotIsolationLevel, final String debeziumIsolationMode) {
      this.snapshotIsolationLevel = snapshotIsolationLevel;
      this.debeziumIsolationMode = debeziumIsolationMode;
    }

    public String getDebeziumIsolationMode() {
      return debeziumIsolationMode;
    }

    /**
     * @param jsonValue the config value, matched case-insensitively
     * @return the matching isolation level
     * @throws IllegalArgumentException if the value matches no isolation level
     */
    public static SnapshotIsolation from(final String jsonValue) {
      for (final SnapshotIsolation value : values()) {
        if (value.snapshotIsolationLevel.equalsIgnoreCase(jsonValue)) {
          return value;
        }
      }
      throw new IllegalArgumentException("Unexpected snapshot isolation level: " + jsonValue);
    }

  }

  // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode
  public enum DataToSync {

    EXISTING_AND_NEW("Existing and New", "initial"),
    NEW_CHANGES_ONLY("New Changes Only", "schema_only");

    // value as it appears in the connector config
    private final String dataToSyncConfig;
    // value passed to Debezium's "snapshot.mode" property
    private final String debeziumSnapshotMode;

    DataToSync(final String value, final String debeziumSnapshotMode) {
      this.dataToSyncConfig = value;
      this.debeziumSnapshotMode = debeziumSnapshotMode;
    }

    public String getDebeziumSnapshotMode() {
      return debeziumSnapshotMode;
    }

    /**
     * @param value the config value, matched case-insensitively
     * @return the matching setting
     * @throws IllegalArgumentException if the value matches no setting
     */
    public static DataToSync from(final String value) {
      for (final DataToSync s : values()) {
        if (s.dataToSyncConfig.equalsIgnoreCase(value)) {
          return s;
        }
      }
      throw new IllegalArgumentException("Unexpected data to sync setting: " + value);
    }

  }

  private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcHelper.class);

  /**
   * Tells whether the config selects CDC replication.
   *
   * @param config the connector config
   * @return {@code true} only when the legacy or the new replication setting resolves to CDC;
   *         {@code false} when neither setting is present or the new setting has no replication type
   * @throws IllegalArgumentException if the configured replication method is not recognized
   */
  @VisibleForTesting
  static boolean isCdc(final JsonNode config) {
    // legacy replication method config before version 0.4.0
    if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
      return ReplicationMethod.from(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
    }
    // new replication method config since version 0.4.0
    if (config.hasNonNull(REPLICATION_FIELD)) {
      final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
      // guard against a missing type instead of dereferencing a null node
      if (!replicationConfig.hasNonNull(REPLICATION_TYPE_FIELD)) {
        return false;
      }
      return ReplicationMethod.from(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
    }
    return false;
  }

  /**
   * Reads the snapshot isolation level from the config.
   *
   * @param config the connector config
   * @return the configured level, or {@link SnapshotIsolation#SNAPSHOT} for legacy configs and when
   *         the level is not set
   * @throws IllegalArgumentException if the configured level is not recognized
   */
  @VisibleForTesting
  static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
    // legacy replication method config before version 0.4.0
    if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
      return SnapshotIsolation.SNAPSHOT;
    }
    // new replication method config since version 0.4.0
    if (config.hasNonNull(REPLICATION_FIELD)) {
      final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
      // the field is optional (and absent for standard replication), so only parse it when present
      if (replicationConfig.hasNonNull(CDC_SNAPSHOT_ISOLATION_FIELD)) {
        return SnapshotIsolation.from(replicationConfig.get(CDC_SNAPSHOT_ISOLATION_FIELD).asText());
      }
    }
    return SnapshotIsolation.SNAPSHOT;
  }

  /**
   * Reads the "data to sync" setting from the config.
   *
   * @param config the connector config
   * @return the configured setting, or {@link DataToSync#EXISTING_AND_NEW} for legacy configs and
   *         when the setting is not set
   * @throws IllegalArgumentException if the configured setting is not recognized
   */
  @VisibleForTesting
  static DataToSync getDataToSyncConfig(final JsonNode config) {
    // legacy replication method config before version 0.4.0
    if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
      return DataToSync.EXISTING_AND_NEW;
    }
    // new replication method config since version 0.4.0
    if (config.hasNonNull(REPLICATION_FIELD)) {
      final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
      // the field is optional (and absent for standard replication), so only parse it when present
      if (replicationConfig.hasNonNull(CDC_DATA_TO_SYNC_FIELD)) {
        return DataToSync.from(replicationConfig.get(CDC_DATA_TO_SYNC_FIELD).asText());
      }
    }
    return DataToSync.EXISTING_AND_NEW;
  }

  /**
   * Builds the Debezium properties for the SQL Server connector from the connector config.
   *
   * @param config the connector config
   * @return a new {@link Properties} holding the connector class, converter registration, and the
   *         snapshot mode and snapshot isolation mode derived from the config
   * @throws IllegalArgumentException if a configured snapshot setting is not recognized
   */
  @VisibleForTesting
  static Properties getDebeziumProperties(final JsonNode config) {
    final Properties props = new Properties();
    props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");

    // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes
    props.setProperty("include.schema.changes", "false");
    // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
    props.setProperty("provide.transaction.metadata", "false");

    props.setProperty("converters", "mssql_converter");
    props.setProperty("mssql_converter.type", "io.airbyte.integrations.debezium.internals.MSSQLConverter");

    props.setProperty("snapshot.mode", getDataToSyncConfig(config).getDebeziumSnapshotMode());
    props.setProperty("snapshot.isolation.mode", getSnapshotIsolationConfig(config).getDebeziumIsolationMode());

    return props;
  }

}

This file was deleted.

Loading