Skip to content

Commit 15d8e46

Browse files
Publish new mssql source (#13176)
* Remove logger * Make the spec backward compatible * Match new replication config first * auto-bump connector version * Fix expected spec json Co-authored-by: Octavia Squidington III <[email protected]>
1 parent e65a97a commit 15d8e46

File tree

6 files changed

+114
-26
lines changed

6 files changed

+114
-26
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@
525525
- name: Microsoft SQL Server (MSSQL)
526526
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
527527
dockerRepository: airbyte/source-mssql
528-
dockerImageTag: 0.3.22
528+
dockerImageTag: 0.4.0
529529
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
530530
icon: mssql.svg
531531
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4736,7 +4736,7 @@
47364736
supportsNormalization: false
47374737
supportsDBT: false
47384738
supported_destination_sync_modes: []
4739-
- dockerImage: "airbyte/source-mssql:0.3.22"
4739+
- dockerImage: "airbyte/source-mssql:0.4.0"
47404740
spec:
47414741
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
47424742
connectionSpecification:
@@ -4748,7 +4748,7 @@
47484748
- "port"
47494749
- "database"
47504750
- "username"
4751-
additionalProperties: false
4751+
additionalProperties: true
47524752
properties:
47534753
host:
47544754
description: "The hostname of the database."
@@ -4841,19 +4841,71 @@
48414841
description: "Specifies the host name of the server. The value of\
48424842
\ this property must match the subject property of the certificate."
48434843
order: 7
4844-
replication_method:
4845-
type: "string"
4844+
replication:
4845+
type: "object"
48464846
title: "Replication Method"
48474847
description: "The replication method used for extracting data from the database.\
48484848
\ STANDARD replication requires no setup on the DB side but will not be\
48494849
\ able to represent deletions incrementally. CDC uses {TBC} to detect\
48504850
\ inserts, updates, and deletes. This needs to be configured on the source\
48514851
\ database itself."
48524852
default: "STANDARD"
4853-
enum:
4854-
- "STANDARD"
4855-
- "CDC"
4853+
additionalProperties: true
48564854
order: 8
4855+
oneOf:
4856+
- title: "Standard"
4857+
additionalProperties: false
4858+
description: "Standard replication requires no setup on the DB side but\
4859+
\ will not be able to represent deletions incrementally."
4860+
required:
4861+
- "replication_type"
4862+
properties:
4863+
replication_type:
4864+
type: "string"
4865+
const: "Standard"
4866+
enum:
4867+
- "Standard"
4868+
default: "Standard"
4869+
order: 0
4870+
- title: "Logical Replication (CDC)"
4871+
additionalProperties: false
4872+
description: "CDC uses {TBC} to detect inserts, updates, and deletes.\
4873+
\ This needs to be configured on the source database itself."
4874+
required:
4875+
- "replication_type"
4876+
properties:
4877+
replication_type:
4878+
type: "string"
4879+
const: "CDC"
4880+
enum:
4881+
- "CDC"
4882+
default: "CDC"
4883+
order: 0
4884+
data_to_sync:
4885+
title: "Data to Sync"
4886+
type: "string"
4887+
default: "Existing and New"
4888+
enum:
4889+
- "Existing and New"
4890+
- "New Changes Only"
4891+
description: "What data should be synced under the CDC. \"Existing\
4892+
\ and New\" will read existing data as a snapshot, and sync new\
4893+
\ changes through CDC. \"New Changes Only\" will skip the initial\
4894+
\ snapshot, and only sync new changes through CDC."
4895+
order: 1
4896+
snapshot_isolation:
4897+
title: "Initial Snapshot Isolation Level"
4898+
type: "string"
4899+
default: "Snapshot"
4900+
enum:
4901+
- "Snapshot"
4902+
- "Read Committed"
4903+
description: "Existing data in the database are synced through an\
4904+
\ initial snapshot. This parameter controls the isolation level\
4905+
\ that will be used during the initial snapshotting. If you choose\
4906+
\ the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\"\
4907+
>snapshot isolation mode</a> on the database."
4908+
order: 2
48574909
tunnel_method:
48584910
type: "object"
48594911
title: "SSH Tunnel Method"

airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"title": "MSSQL Source Spec",
66
"type": "object",
77
"required": ["host", "port", "database", "username"],
8-
"additionalProperties": false,
8+
"additionalProperties": true,
99
"properties": {
1010
"host": {
1111
"description": "The hostname of the database.",

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.debezium.annotation.VisibleForTesting;
99
import java.util.Properties;
10-
import org.slf4j.Logger;
11-
import org.slf4j.LoggerFactory;
1210

1311
public class MssqlCdcHelper {
1412

@@ -90,28 +88,22 @@ public static DataToSync from(final String value) {
9088

9189
}
9290

93-
private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcHelper.class);
94-
9591
@VisibleForTesting
9692
static boolean isCdc(final JsonNode config) {
97-
// legacy replication method config before version 0.4.0
98-
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
99-
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
100-
}
10193
// new replication method config since version 0.4.0
10294
if (config.hasNonNull(REPLICATION_FIELD)) {
10395
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
10496
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
10597
}
98+
// legacy replication method config before version 0.4.0
99+
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
100+
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
101+
}
106102
return false;
107103
}
108104

109105
@VisibleForTesting
110106
static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
111-
// legacy replication method config before version 0.4.0
112-
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
113-
return SnapshotIsolation.SNAPSHOT;
114-
}
115107
// new replication method config since version 0.4.0
116108
if (config.hasNonNull(REPLICATION_FIELD)) {
117109
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
@@ -123,10 +115,6 @@ static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
123115

124116
@VisibleForTesting
125117
static DataToSync getDataToSyncConfig(final JsonNode config) {
126-
// legacy replication method config before version 0.4.0
127-
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
128-
return DataToSync.EXISTING_AND_NEW;
129-
}
130118
// new replication method config since version 0.4.0
131119
if (config.hasNonNull(REPLICATION_FIELD)) {
132120
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);

airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"title": "MSSQL Source Spec",
66
"type": "object",
77
"required": ["host", "port", "database", "username"],
8-
"additionalProperties": false,
8+
"additionalProperties": true,
99
"properties": {
1010
"host": {
1111
"description": "The hostname of the database.",

airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlCdcHelperTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ public void testIsCdc() {
3737
"data_to_sync", "Existing and New",
3838
"snapshot_isolation", "Snapshot"))));
3939
assertTrue(MssqlCdcHelper.isCdc(newCdc));
40+
41+
// migration from legacy to new config
42+
final JsonNode mixNonCdc = Jsons.jsonNode(Map.of(
43+
"replication_method", "CDC",
44+
"replication", Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
45+
assertFalse(MssqlCdcHelper.isCdc(mixNonCdc));
46+
47+
final JsonNode mixCdc = Jsons.jsonNode(Map.of(
48+
"replication_method", "Standard",
49+
"replication", Jsons.jsonNode(Map.of(
50+
"replication_type", "CDC",
51+
"data_to_sync", "Existing and New",
52+
"snapshot_isolation", "Snapshot"))));
53+
assertTrue(MssqlCdcHelper.isCdc(mixCdc));
4054
}
4155

4256
@Test
@@ -58,6 +72,23 @@ public void testGetSnapshotIsolation() {
5872
"data_to_sync", "Existing and New",
5973
"snapshot_isolation", "Snapshot"))));
6074
assertEquals(SnapshotIsolation.SNAPSHOT, MssqlCdcHelper.getSnapshotIsolationConfig(newCdcSnapshot));
75+
76+
// migration from legacy to new config
77+
final JsonNode mixCdcNonSnapshot = Jsons.jsonNode(Map.of(
78+
"replication_method", "Standard",
79+
"replication", Jsons.jsonNode(Map.of(
80+
"replication_type", "CDC",
81+
"data_to_sync", "Existing and New",
82+
"snapshot_isolation", "Read Committed"))));
83+
assertEquals(SnapshotIsolation.READ_COMMITTED, MssqlCdcHelper.getSnapshotIsolationConfig(mixCdcNonSnapshot));
84+
85+
final JsonNode mixCdcSnapshot = Jsons.jsonNode(Map.of(
86+
"replication_method", "Standard",
87+
"replication", Jsons.jsonNode(Map.of(
88+
"replication_type", "CDC",
89+
"data_to_sync", "Existing and New",
90+
"snapshot_isolation", "Snapshot"))));
91+
assertEquals(SnapshotIsolation.SNAPSHOT, MssqlCdcHelper.getSnapshotIsolationConfig(mixCdcSnapshot));
6192
}
6293

6394
@Test
@@ -79,6 +110,23 @@ public void testGetDataToSyncConfig() {
79110
"data_to_sync", "New Changes Only",
80111
"snapshot_isolation", "Snapshot"))));
81112
assertEquals(DataToSync.NEW_CHANGES_ONLY, MssqlCdcHelper.getDataToSyncConfig(newCdcNewOnly));
113+
114+
final JsonNode mixCdcExistingAndNew = Jsons.jsonNode(Map.of(
115+
"replication_method", "Standard",
116+
"replication", Jsons.jsonNode(Map.of(
117+
"replication_type", "CDC",
118+
"data_to_sync", "Existing and New",
119+
"snapshot_isolation", "Read Committed"))));
120+
assertEquals(DataToSync.EXISTING_AND_NEW, MssqlCdcHelper.getDataToSyncConfig(mixCdcExistingAndNew));
121+
122+
final JsonNode mixCdcNewOnly = Jsons.jsonNode(Map.of(
123+
"replication_method", "Standard",
124+
"replication",
125+
Jsons.jsonNode(Map.of(
126+
"replication_type", "CDC",
127+
"data_to_sync", "New Changes Only",
128+
"snapshot_isolation", "Snapshot"))));
129+
assertEquals(DataToSync.NEW_CHANGES_ONLY, MssqlCdcHelper.getDataToSyncConfig(mixCdcNewOnly));
82130
}
83131

84132
}

0 commit comments

Comments
 (0)