Skip to content

Commit 5432fab

Browse files
Resumable Full Refresh sync for mssql (#37451)
Co-authored-by: Xiaohan Song <[email protected]>
1 parent 395effc commit 5432fab

File tree

24 files changed

+598
-381
lines changed

24 files changed

+598
-381
lines changed

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt

+10-7
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ internal class DefaultJdbcSourceAcceptanceTest :
4040
JdbcSourceAcceptanceTest<
4141
DefaultJdbcSourceAcceptanceTest.PostgresTestSource, BareBonesTestDatabase>() {
4242
override fun config(): JsonNode {
43-
return testdb.testConfigBuilder().build()
43+
return testdb?.testConfigBuilder()?.build()!!
4444
}
4545

4646
override fun source(): PostgresTestSource {
@@ -181,12 +181,15 @@ internal class DefaultJdbcSourceAcceptanceTest :
181181
fun testCustomParametersOverwriteDefaultParametersExpectException() {
182182
val connectionPropertiesUrl = "ssl=false"
183183
val config =
184-
getConfigWithConnectionProperties(
185-
PSQL_CONTAINER,
186-
testdb.databaseName,
187-
connectionPropertiesUrl
188-
)
189-
val customParameters = parseJdbcParameters(config, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
184+
testdb?.let {
185+
getConfigWithConnectionProperties(
186+
PSQL_CONTAINER,
187+
it.databaseName,
188+
connectionPropertiesUrl
189+
)
190+
}
191+
val customParameters =
192+
parseJdbcParameters(config!!, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
190193
val defaultParameters = mapOf("ssl" to "true", "sslmode" to "require")
191194
Assertions.assertThrows(IllegalArgumentException::class.java) {
192195
JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters(

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt

+15-15
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.mockito.Mockito
4040
"The static variables are updated in subclasses for convenience, and cannot be final."
4141
)
4242
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
43-
@JvmField protected var testdb: T = createTestDatabase()
43+
@JvmField protected var testdb: T? = null
4444

4545
protected fun streamName(): String {
4646
return TABLE_NAME
@@ -120,60 +120,60 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
120120
testdb!!.with("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
121121
}
122122
testdb
123-
.with(
123+
?.with(
124124
createTableQuery(
125125
getFullyQualifiedTableName(TABLE_NAME),
126126
COLUMN_CLAUSE_WITH_PK,
127127
primaryKeyClause(listOf("id"))
128128
)
129129
)
130-
.with(
130+
?.with(
131131
"INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')",
132132
getFullyQualifiedTableName(TABLE_NAME)
133133
)
134-
.with(
134+
?.with(
135135
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
136136
getFullyQualifiedTableName(TABLE_NAME)
137137
)
138-
.with(
138+
?.with(
139139
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
140140
getFullyQualifiedTableName(TABLE_NAME)
141141
)
142-
.with(
142+
?.with(
143143
createTableQuery(
144144
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK),
145145
COLUMN_CLAUSE_WITHOUT_PK,
146146
""
147147
)
148148
)
149-
.with(
149+
?.with(
150150
"INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')",
151151
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
152152
)
153-
.with(
153+
?.with(
154154
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
155155
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
156156
)
157-
.with(
157+
?.with(
158158
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
159159
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
160160
)
161-
.with(
161+
?.with(
162162
createTableQuery(
163163
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK),
164164
COLUMN_CLAUSE_WITH_COMPOSITE_PK,
165165
primaryKeyClause(listOf("first_name", "last_name"))
166166
)
167167
)
168-
.with(
168+
?.with(
169169
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first', 'picard', '2004-10-19')",
170170
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
171171
)
172-
.with(
172+
?.with(
173173
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('second', 'crusher', '2005-10-19')",
174174
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
175175
)
176-
.with(
176+
?.with(
177177
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('third', 'vash', '2006-10-19')",
178178
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
179179
)
@@ -774,11 +774,11 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
774774

775775
protected open fun executeStatementReadIncrementallyTwice() {
776776
testdb
777-
.with(
777+
?.with(
778778
"INSERT INTO %s (id, name, updated_at) VALUES (4, 'riker', '2006-10-19')",
779779
getFullyQualifiedTableName(TABLE_NAME)
780780
)
781-
.with(
781+
?.with(
782782
"INSERT INTO %s (id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
783783
getFullyQualifiedTableName(TABLE_NAME)
784784
)

airbyte-integrations/connectors/source-mssql/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ plugins {
55
airbyteJavaConnector {
66
cdkVersionRequired = '0.31.5'
77
features = ['db-sources']
8-
useLocalCdk = false
8+
useLocalCdk = true
99
}
1010

1111
java {
12-
// TODO: rewrite code to avoid javac wornings in the first place
12+
// TODO: rewrite code to avoid javac warnings in the first place
1313
compileJava {
1414
options.compilerArgs += "-Xlint:-try,-rawtypes"
1515
}

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.0.17
12+
dockerImageTag: 4.0.18
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,20 @@ public enum ReplicationMethod {
4040

4141
@VisibleForTesting
4242
static boolean isCdc(final JsonNode config) {
43-
// new replication method config since version 0.4.0
44-
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
45-
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
46-
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
47-
}
48-
// legacy replication method config before version 0.4.0
49-
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
50-
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
51-
}
52-
if (config.hasNonNull(REPLICATION_FIELD)) {
53-
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
54-
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
43+
if (config != null) {
44+
// new replication method config since version 0.4.0
45+
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
46+
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
47+
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
48+
}
49+
// legacy replication method config before version 0.4.0
50+
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
51+
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
52+
}
53+
if (config.hasNonNull(REPLICATION_FIELD)) {
54+
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
55+
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
56+
}
5557
}
5658

5759
return false;

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public AirbyteMessage saveState(final Map<String, String> offset, final SchemaHi
4545
state.put(IS_COMPRESSED, dbHistory.isCompressed());
4646

4747
final JsonNode asJson = Jsons.jsonNode(state);
48-
4948
LOGGER.info("debezium state offset: {}", Jsons.jsonNode(offset));
5049

5150
final CdcState cdcState = new CdcState().withState(asJson);

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlQueryUtils.java

+27-27
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.airbyte.cdk.integrations.source.relationaldb.models.InternalModels.StateType;
2121
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
2222
import io.airbyte.commons.json.Jsons;
23-
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
23+
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
2424
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
2525
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
2626
import java.math.BigDecimal;
@@ -185,46 +185,46 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,
185185

186186
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, CursorBasedStatus> cursorBasedStatusMap = new HashMap<>();
187187
streams.forEach(stream -> {
188-
try {
189-
final String name = stream.getStream().getName();
190-
final String namespace = stream.getStream().getNamespace();
191-
final String fullTableName =
192-
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);
193-
194-
final Optional<CursorInfo> cursorInfoOptional =
195-
stateManager.getCursorInfo(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace));
196-
if (cursorInfoOptional.isEmpty()) {
197-
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
198-
}
188+
final String name = stream.getStream().getName();
189+
final String namespace = stream.getStream().getNamespace();
190+
final String fullTableName =
191+
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);
199192

200-
LOGGER.info("Querying max cursor value for {}.{}", namespace, name);
201-
final String cursorField = cursorInfoOptional.get().getCursorField();
193+
final Optional<CursorInfo> cursorInfoOptional =
194+
stateManager.getCursorInfo(new AirbyteStreamNameNamespacePair(name, namespace));
195+
if (cursorInfoOptional.isEmpty()) {
196+
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
197+
}
198+
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
199+
final Optional<String> maybeCursorField = Optional.ofNullable(cursorInfoOptional.get().getCursorField());
200+
maybeCursorField.ifPresent(cursorField -> {
201+
LOGGER.info("Cursor {}. Querying max cursor value for {}.{}", cursorField, namespace, name);
202202
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString);
203203
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY,
204204
quotedCursorField,
205205
fullTableName,
206206
quotedCursorField,
207207
quotedCursorField,
208208
fullTableName);
209-
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
210-
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
211-
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
212-
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
213-
cursorBasedStatus.setVersion(2L);
214-
cursorBasedStatus.setStreamName(name);
215-
cursorBasedStatus.setStreamNamespace(namespace);
209+
final List<JsonNode> jsonNodes;
210+
try {
211+
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
212+
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
213+
} catch (SQLException e) {
214+
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e);
215+
}
216216
cursorBasedStatus.setCursorField(ImmutableList.of(cursorField));
217-
218217
if (!jsonNodes.isEmpty()) {
219218
final JsonNode result = jsonNodes.get(0);
220219
cursorBasedStatus.setCursor(result.get(cursorField).asText());
221220
cursorBasedStatus.setCursorRecordCount((long) jsonNodes.size());
222221
}
223-
224-
cursorBasedStatusMap.put(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
225-
} catch (final SQLException e) {
226-
throw new RuntimeException(e);
227-
}
222+
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
223+
cursorBasedStatus.setVersion(2L);
224+
cursorBasedStatus.setStreamName(name);
225+
cursorBasedStatus.setStreamNamespace(namespace);
226+
cursorBasedStatusMap.put(new AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
227+
});
228228
});
229229

230230
return cursorBasedStatusMap;

0 commit comments

Comments
 (0)