Skip to content

Commit fe3f459

Browse files
mssql-source:upgrade debezium version to 1.9.6 (#18732)
* mssql-source:upgrade debezium version to 1.9.6 * more improvements * upgrade version * auto-bump connector version * fix test Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 7bb3241 commit fe3f459

File tree

23 files changed

+323
-109
lines changed

23 files changed

+323
-109
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@
765765
- name: Microsoft SQL Server (MSSQL)
766766
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
767767
dockerRepository: airbyte/source-mssql
768-
dockerImageTag: 0.4.24
768+
dockerImageTag: 0.4.25
769769
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
770770
icon: mssql.svg
771771
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6897,7 +6897,7 @@
68976897
supportsNormalization: false
68986898
supportsDBT: false
68996899
supported_destination_sync_modes: []
6900-
- dockerImage: "airbyte/source-mssql:0.4.24"
6900+
- dockerImage: "airbyte/source-mssql:0.4.25"
69016901
spec:
69026902
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
69036903
connectionSpecification:
@@ -7071,6 +7071,18 @@
70717071
\ the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\"\
70727072
>snapshot isolation mode</a> on the database."
70737073
order: 2
7074+
initial_waiting_seconds:
7075+
type: "integer"
7076+
title: "Initial Waiting Time in Seconds (Advanced)"
7077+
description: "The amount of time the connector will wait when it launches\
7078+
\ to determine if there is new data to sync or not. Defaults to\
7079+
\ 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about\
7080+
\ <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\"\
7081+
>initial waiting time</a>."
7082+
default: 300
7083+
min: 120
7084+
max: 1200
7085+
order: 3
70747086
tunnel_method:
70757087
type: "object"
70767088
title: "SSH Tunnel Method"

airbyte-integrations/bases/debezium-v1-9-6/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dependencies {
1111

1212
implementation 'io.debezium:debezium-api:1.9.6.Final'
1313
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
14-
// implementation 'io.debezium:debezium-connector-sqlserver:1.9.2.Final'
14+
implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final'
1515
implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final'
1616
implementation 'io.debezium:debezium-connector-postgres:1.9.6.Final'
1717
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import java.time.Duration;
9+
import java.util.Optional;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
public class FirstRecordWaitTimeUtil {
14+
15+
private static final Logger LOGGER = LoggerFactory.getLogger(FirstRecordWaitTimeUtil.class);
16+
17+
public static final Duration MIN_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(2);
18+
public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20);
19+
public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5);
20+
21+
public static void checkFirstRecordWaitTime(final JsonNode config) {
22+
// we need to skip the check because in tests, we set initial_waiting_seconds
23+
// to 5 seconds for performance reasons, which is shorter than the minimum
24+
// value allowed in production
25+
if (config.has("is_test") && config.get("is_test").asBoolean()) {
26+
return;
27+
}
28+
29+
final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
30+
if (firstRecordWaitSeconds.isPresent()) {
31+
final int seconds = firstRecordWaitSeconds.get();
32+
if (seconds < MIN_FIRST_RECORD_WAIT_TIME.getSeconds() || seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
33+
throw new IllegalArgumentException(
34+
String.format("initial_waiting_seconds must be between %d and %d seconds",
35+
MIN_FIRST_RECORD_WAIT_TIME.getSeconds(), MAX_FIRST_RECORD_WAIT_TIME.getSeconds()));
36+
}
37+
}
38+
}
39+
40+
public static Duration getFirstRecordWaitTime(final JsonNode config) {
41+
final boolean isTest = config.has("is_test") && config.get("is_test").asBoolean();
42+
Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME;
43+
44+
final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
45+
if (firstRecordWaitSeconds.isPresent()) {
46+
firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get());
47+
if (!isTest && firstRecordWaitTime.compareTo(MIN_FIRST_RECORD_WAIT_TIME) < 0) {
48+
LOGGER.warn("First record waiting time is overridden to {} minutes, which is the min time allowed for safety.",
49+
MIN_FIRST_RECORD_WAIT_TIME.toMinutes());
50+
firstRecordWaitTime = MIN_FIRST_RECORD_WAIT_TIME;
51+
} else if (!isTest && firstRecordWaitTime.compareTo(MAX_FIRST_RECORD_WAIT_TIME) > 0) {
52+
LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.",
53+
MAX_FIRST_RECORD_WAIT_TIME.toMinutes());
54+
firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME;
55+
}
56+
}
57+
58+
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
59+
return firstRecordWaitTime;
60+
}
61+
62+
public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config) {
63+
final JsonNode replicationMethod = config.get("replication_method");
64+
if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) {
65+
final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt();
66+
return Optional.of(seconds);
67+
}
68+
return Optional.empty();
69+
}
70+
71+
}

airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,21 @@
44

55
package io.airbyte.integrations.debezium.internals;
66

7+
import com.microsoft.sqlserver.jdbc.Geography;
8+
import com.microsoft.sqlserver.jdbc.Geometry;
9+
import com.microsoft.sqlserver.jdbc.SQLServerException;
710
import io.airbyte.db.DataTypeUtils;
811
import io.debezium.spi.converter.CustomConverter;
912
import io.debezium.spi.converter.RelationalColumn;
1013
import java.math.BigDecimal;
14+
import java.nio.charset.Charset;
1115
import java.sql.Timestamp;
16+
import java.time.OffsetDateTime;
17+
import java.time.format.DateTimeFormatter;
1218
import java.util.Objects;
1319
import java.util.Properties;
1420
import java.util.Set;
21+
import microsoft.sql.DateTimeOffset;
1522
import org.apache.kafka.connect.data.SchemaBuilder;
1623
import org.slf4j.Logger;
1724
import org.slf4j.LoggerFactory;
@@ -20,9 +27,14 @@ public class MSSQLConverter implements CustomConverter<SchemaBuilder, Relational
2027

2128
private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class);
2229

23-
private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME");
24-
private final String TIME_TYPE = "TIME";
25-
private final String SMALLMONEY_TYPE = "SMALLMONEY";
30+
private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "SMALLDATETIME");
31+
private final Set<String> BINARY = Set.of("VARBINARY", "BINARY");
32+
private static final String DATETIMEOFFSET = "DATETIMEOFFSET";
33+
private static final String TIME_TYPE = "TIME";
34+
private static final String SMALLMONEY_TYPE = "SMALLMONEY";
35+
private static final String GEOMETRY = "GEOMETRY";
36+
private static final String GEOGRAPHY = "GEOGRAPHY";
37+
private static final String DEBEZIUM_DATETIMEOFFSET_FORMAT = "yyyy-MM-dd HH:mm:ss XXX";
2638

2739
@Override
2840
public void configure(Properties props) {}
@@ -34,11 +46,61 @@ public void converterFor(final RelationalColumn field,
3446
registerDate(field, registration);
3547
} else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) {
3648
registerMoney(field, registration);
49+
} else if (BINARY.contains(field.typeName().toUpperCase())) {
50+
registerBinary(field, registration);
51+
} else if (GEOMETRY.equalsIgnoreCase(field.typeName())) {
52+
registerGeometry(field, registration);
53+
} else if (GEOGRAPHY.equalsIgnoreCase(field.typeName())) {
54+
registerGeography(field, registration);
3755
} else if (TIME_TYPE.equalsIgnoreCase(field.typeName())) {
3856
registerTime(field, registration);
57+
} else if (DATETIMEOFFSET.equalsIgnoreCase(field.typeName())) {
58+
registerDateTimeOffSet(field, registration);
3959
}
4060
}
4161

62+
private void registerGeometry(final RelationalColumn field,
63+
final ConverterRegistration<SchemaBuilder> registration) {
64+
registration.register(SchemaBuilder.string(), input -> {
65+
if (Objects.isNull(input)) {
66+
return DebeziumConverterUtils.convertDefaultValue(field);
67+
}
68+
69+
if (input instanceof byte[]) {
70+
try {
71+
return Geometry.deserialize((byte[]) input).toString();
72+
} catch (SQLServerException e) {
73+
LOGGER.error(e.getMessage());
74+
}
75+
}
76+
77+
LOGGER.warn("Uncovered Geometry class type '{}'. Use default converter",
78+
input.getClass().getName());
79+
return input.toString();
80+
});
81+
}
82+
83+
private void registerGeography(final RelationalColumn field,
84+
final ConverterRegistration<SchemaBuilder> registration) {
85+
registration.register(SchemaBuilder.string(), input -> {
86+
if (Objects.isNull(input)) {
87+
return DebeziumConverterUtils.convertDefaultValue(field);
88+
}
89+
90+
if (input instanceof byte[]) {
91+
try {
92+
return Geography.deserialize((byte[]) input).toString();
93+
} catch (SQLServerException e) {
94+
LOGGER.error(e.getMessage());
95+
}
96+
}
97+
98+
LOGGER.warn("Uncovered Geography class type '{}'. Use default converter",
99+
input.getClass().getName());
100+
return input.toString();
101+
});
102+
}
103+
42104
private void registerDate(final RelationalColumn field,
43105
final ConverterRegistration<SchemaBuilder> registration) {
44106
registration.register(SchemaBuilder.string(), input -> {
@@ -50,6 +112,25 @@ private void registerDate(final RelationalColumn field,
50112
});
51113
}
52114

115+
private void registerDateTimeOffSet(final RelationalColumn field,
116+
final ConverterRegistration<SchemaBuilder> registration) {
117+
registration.register(SchemaBuilder.string(), input -> {
118+
if (Objects.isNull(input)) {
119+
return DebeziumConverterUtils.convertDefaultValue(field);
120+
}
121+
122+
if (input instanceof DateTimeOffset) {
123+
return DataTypeUtils.toISO8601String(
124+
OffsetDateTime.parse(input.toString(),
125+
DateTimeFormatter.ofPattern(DEBEZIUM_DATETIMEOFFSET_FORMAT)));
126+
}
127+
128+
LOGGER.warn("Uncovered DateTimeOffSet class type '{}'. Use default converter",
129+
input.getClass().getName());
130+
return input.toString();
131+
});
132+
}
133+
53134
private void registerTime(final RelationalColumn field,
54135
final ConverterRegistration<SchemaBuilder> registration) {
55136
registration.register(SchemaBuilder.string(), input -> {
@@ -84,4 +165,21 @@ private void registerMoney(final RelationalColumn field,
84165
});
85166
}
86167

168+
private void registerBinary(final RelationalColumn field,
169+
final ConverterRegistration<SchemaBuilder> registration) {
170+
registration.register(SchemaBuilder.string(), input -> {
171+
if (Objects.isNull(input)) {
172+
return DebeziumConverterUtils.convertDefaultValue(field);
173+
}
174+
175+
if (input instanceof byte[]) {
176+
return new String((byte[]) input, Charset.defaultCharset());
177+
}
178+
179+
LOGGER.warn("Uncovered binary class type '{}'. Use default converter",
180+
input.getClass().getName());
181+
return input.toString();
182+
});
183+
}
184+
87185
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import static io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil.MAX_FIRST_RECORD_WAIT_TIME;
8+
import static io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil.MIN_FIRST_RECORD_WAIT_TIME;
9+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
10+
import static org.junit.jupiter.api.Assertions.assertEquals;
11+
import static org.junit.jupiter.api.Assertions.assertThrows;
12+
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
import io.airbyte.commons.json.Jsons;
15+
import java.time.Duration;
16+
import java.util.Collections;
17+
import java.util.Map;
18+
import java.util.Optional;
19+
import org.junit.jupiter.api.Test;
20+
21+
public class FirstRecordWaitTimeUtilTest {
22+
23+
@Test
24+
void testGetFirstRecordWaitTime() {
25+
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
26+
assertDoesNotThrow(() -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(emptyConfig));
27+
assertEquals(Optional.empty(), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(emptyConfig));
28+
assertEquals(FirstRecordWaitTimeUtil.DEFAULT_FIRST_RECORD_WAIT_TIME, FirstRecordWaitTimeUtil.getFirstRecordWaitTime(emptyConfig));
29+
30+
final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method",
31+
Map.of("method", "CDC", "initial_waiting_seconds", 500)));
32+
assertDoesNotThrow(() -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(normalConfig));
33+
assertEquals(Optional.of(500), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(normalConfig));
34+
assertEquals(Duration.ofSeconds(500), FirstRecordWaitTimeUtil.getFirstRecordWaitTime(normalConfig));
35+
36+
final int tooShortTimeout = (int) MIN_FIRST_RECORD_WAIT_TIME.getSeconds() - 1;
37+
final JsonNode tooShortConfig = Jsons.jsonNode(Map.of("replication_method",
38+
Map.of("method", "CDC", "initial_waiting_seconds", tooShortTimeout)));
39+
assertThrows(IllegalArgumentException.class, () -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(tooShortConfig));
40+
assertEquals(Optional.of(tooShortTimeout), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(tooShortConfig));
41+
assertEquals(MIN_FIRST_RECORD_WAIT_TIME, FirstRecordWaitTimeUtil.getFirstRecordWaitTime(tooShortConfig));
42+
43+
final int tooLongTimeout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 1;
44+
final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method",
45+
Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimeout)));
46+
assertThrows(IllegalArgumentException.class, () -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(tooLongConfig));
47+
assertEquals(Optional.of(tooLongTimeout), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(tooLongConfig));
48+
assertEquals(MAX_FIRST_RECORD_WAIT_TIME, FirstRecordWaitTimeUtil.getFirstRecordWaitTime(tooLongConfig));
49+
}
50+
51+
}

airbyte-integrations/bases/debezium-v1-9-6/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public abstract class CdcSourceTest {
6464
protected static final String COL_ID = "id";
6565
protected static final String COL_MAKE_ID = "make_id";
6666
protected static final String COL_MODEL = "model";
67+
protected static final int INITIAL_WAITING_SECONDS = 5;
6768

6869
protected final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
6970
Jsons

airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.24
19+
LABEL io.airbyte.version=0.4.25
2020
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt

airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,15 @@
147147
"enum": ["Snapshot", "Read Committed"],
148148
"description": "Existing data in the database are synced through an initial snapshot. This parameter controls the isolation level that will be used during the initial snapshotting. If you choose the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\">snapshot isolation mode</a> on the database.",
149149
"order": 2
150+
},
151+
"initial_waiting_seconds": {
152+
"type": "integer",
153+
"title": "Initial Waiting Time in Seconds (Advanced)",
154+
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\">initial waiting time</a>.",
155+
"default": 300,
156+
"min": 120,
157+
"max": 1200,
158+
"order": 3
150159
}
151160
}
152161
}

airbyte-integrations/connectors/source-mssql/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mssql
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.24
19+
LABEL io.airbyte.version=0.4.25
2020
LABEL io.airbyte.name=airbyte/source-mssql

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ dependencies {
1515

1616
implementation project(':airbyte-db:db-lib')
1717
implementation project(':airbyte-integrations:bases:base-java')
18-
implementation project(':airbyte-integrations:bases:debezium-v1-4-2')
18+
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
1919
implementation project(':airbyte-protocol:protocol-models')
2020
implementation project(':airbyte-integrations:connectors:source-jdbc')
2121
implementation project(':airbyte-integrations:connectors:source-relational-db')
2222

23-
implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
23+
implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final'
2424
implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'
25+
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
2526

26-
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-4-2'))
27+
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
2728
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
2829

2930
testImplementation 'org.apache.commons:commons-lang3:3.11'

0 commit comments

Comments
 (0)