Skip to content

Commit ca394d2

Browse files
authored
[Source-mssql] : Add meta error handling in initial load path (#37325)
1 parent 63d4d5e commit ca394d2

File tree

5 files changed

+25
-15
lines changed

5 files changed

+25
-15
lines changed

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.0.10
12+
dockerImageTag: 4.0.11
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.microsoft.sqlserver.jdbc.Geography;
1717
import com.microsoft.sqlserver.jdbc.Geometry;
1818
import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData;
19+
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
1920
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
2021
import io.airbyte.integrations.source.mssql.initialsync.CdcMetadataInjector;
2122
import io.airbyte.protocol.models.JsonSchemaType;
@@ -51,13 +52,14 @@ public MssqlSourceOperations(final Optional<CdcMetadataInjector> metadataInjecto
5152
}
5253

5354
@Override
54-
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
55-
final ObjectNode jsonNode = (ObjectNode) super.rowToJson(queryContext);
55+
public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException {
56+
final AirbyteRecordData recordData = super.convertDatabaseRowToAirbyteRecordData(queryContext);
57+
final ObjectNode jsonNode = (ObjectNode) recordData.rawRowData();
5658
if (!metadataInjector.isPresent()) {
57-
return jsonNode;
59+
return recordData;
5860
}
5961
metadataInjector.get().inject(jsonNode);
60-
return jsonNode;
62+
return new AirbyteRecordData(jsonNode, recordData.meta());
6163
}
6264

6365
/**

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadHandler.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.fasterxml.jackson.databind.JsonNode;
1616
import com.google.common.annotations.VisibleForTesting;
1717
import io.airbyte.cdk.db.SqlDatabase;
18+
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
1819
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1920
import io.airbyte.cdk.db.jdbc.JdbcUtils;
2021
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
@@ -32,6 +33,7 @@
3233
import io.airbyte.protocol.models.v0.AirbyteMessage;
3334
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
3435
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
36+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
3537
import io.airbyte.protocol.models.v0.AirbyteStream;
3638
import io.airbyte.protocol.models.v0.CatalogHelpers;
3739
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -166,7 +168,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
166168
}
167169
});
168170

169-
final AutoCloseableIterator<JsonNode> queryStream =
171+
final AutoCloseableIterator<AirbyteRecordData> queryStream =
170172
new MssqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
171173
calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));
172174
final AutoCloseableIterator<AirbyteMessage> recordIterator =
@@ -180,7 +182,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
180182

181183
// Transforms the given iterator to create an {@link AirbyteRecordMessage}
182184
private AutoCloseableIterator<AirbyteMessage> getRecordIterator(
183-
final AutoCloseableIterator<JsonNode> recordIterator,
185+
final AutoCloseableIterator<AirbyteRecordData> recordIterator,
184186
final String streamName,
185187
final String namespace,
186188
final long emittedAt) {
@@ -190,7 +192,12 @@ private AutoCloseableIterator<AirbyteMessage> getRecordIterator(
190192
.withStream(streamName)
191193
.withNamespace(namespace)
192194
.withEmittedAt(emittedAt)
193-
.withData(r)));
195+
.withData(r.rawRowData())
196+
.withMeta(isMetaChangesEmptyOrNull(r.meta()) ? null : r.meta())));
197+
}
198+
199+
private boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) {
200+
return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty();
194201
}
195202

196203
// Augments the given iterator with record count logs.

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadRecordIterator.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
88
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
99

10-
import com.fasterxml.jackson.databind.JsonNode;
1110
import com.google.common.collect.AbstractIterator;
1211
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
12+
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
1313
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1414
import io.airbyte.cdk.integrations.source.relationaldb.models.OrderedColumnLoadStatus;
1515
import io.airbyte.commons.util.AutoCloseableIterator;
@@ -28,12 +28,12 @@
2828
import org.slf4j.LoggerFactory;
2929

3030
@SuppressWarnings("try")
31-
public class MssqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
32-
implements AutoCloseableIterator<JsonNode> {
31+
public class MssqlInitialLoadRecordIterator extends AbstractIterator<AirbyteRecordData>
32+
implements AutoCloseableIterator<AirbyteRecordData> {
3333

3434
private static final Logger LOGGER = LoggerFactory.getLogger(MssqlInitialLoadRecordIterator.class);
3535

36-
private AutoCloseableIterator<JsonNode> currentIterator;
36+
private AutoCloseableIterator<AirbyteRecordData> currentIterator;
3737
private final JdbcDatabase database;
3838
private int numSubqueries = 0;
3939
private final String quoteString;
@@ -67,7 +67,7 @@ public class MssqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
6767

6868
@CheckForNull
6969
@Override
70-
protected JsonNode computeNext() {
70+
protected AirbyteRecordData computeNext() {
7171
if (shouldBuildNextSubquery()) {
7272
try {
7373
// We will only issue one query for a composite key load. If we have already processed all the data
@@ -82,8 +82,8 @@ protected JsonNode computeNext() {
8282
}
8383

8484
LOGGER.info("Subquery number : {}", numSubqueries);
85-
final Stream<JsonNode> stream = database.unsafeQuery(
86-
this::getOcPreparedStatement, sourceOperations::rowToJson);
85+
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
86+
this::getOcPreparedStatement, sourceOperations::convertDatabaseRowToAirbyteRecordData);
8787
currentIterator = AutoCloseableIterators.fromStream(stream, pair);
8888
numSubqueries++;
8989
// If the current subquery has no records associated with it, the entire stream has been read.

docs/integrations/sources/mssql.md

+1
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
327327

328328
| Version | Date | Pull Request | Subject |
329329
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
330+
| 4.0.11 | 2024-04-15 | [37325](https://github.com/airbytehq/airbyte/pull/37325) | Populate airbyte_meta.changes + error handling. |
330331
| 4.0.10 | 2024-04-15 | [37110](https://github.com/airbytehq/airbyte/pull/37110) | Internal cleanup. |
331332
| 4.0.9 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. |
332333
| 4.0.8 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. |

0 commit comments

Comments
 (0)