Skip to content

Commit 458e96e

Browse files
authored
[source-mssql] update datetimeoffset format (#45142)
1 parent 4264c08 commit 458e96e

File tree

6 files changed

+9
-10
lines changed

6 files changed

+9
-10
lines changed

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.1.10
12+
dockerImageTag: 4.1.11
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlQueryUtils.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import com.google.common.collect.ImmutableList;
1515
import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData;
1616
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
17-
import io.airbyte.cdk.db.jdbc.JdbcUtils;
1817
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo;
1918
import io.airbyte.cdk.integrations.source.relationaldb.models.CursorBasedStatus;
2019
import io.airbyte.cdk.integrations.source.relationaldb.models.InternalModels.StateType;
@@ -79,7 +78,7 @@ public static void getIndexInfoForStreams(final JdbcDatabase database, final Con
7978
final String query = INDEX_QUERY.formatted(fullTableName);
8079
LOGGER.debug("Index lookup query: {}", query);
8180
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(query).executeQuery(),
82-
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
81+
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
8382
if (jsonNodes != null) {
8483
jsonNodes.stream().map(node -> Jsons.convertValue(node, Index.class))
8584
.forEach(i -> LOGGER.info("Index {}", i));
@@ -106,7 +105,7 @@ public static String getMaxOcValueForStream(final JdbcDatabase database,
106105
LOGGER.info("Querying for max oc value: {}", maxOcQuery);
107106
try {
108107
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(maxOcQuery).executeQuery(),
109-
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
108+
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
110109
Preconditions.checkState(jsonNodes.size() == 1);
111110
if (jsonNodes.get(0).get(MAX_OC_COL) == null) {
112111
LOGGER.info("Max PK is null for table {} - this could indicate an empty table", fullTableName);
@@ -213,7 +212,7 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,
213212
final List<JsonNode> jsonNodes;
214213
try {
215214
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
216-
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
215+
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
217216
} catch (SQLException e) {
218217
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e);
219218
}
@@ -241,7 +240,7 @@ private static List<JsonNode> getTableEstimate(final JdbcDatabase database, fina
241240
String.format(TABLE_ESTIMATE_QUERY, namespace, name);
242241
LOGGER.info("Querying for table estimate size: {}", tableEstimateQuery);
243242
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
244-
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
243+
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
245244
Preconditions.checkState(jsonNodes.size() == 1);
246245
LOGGER.debug("Estimate: {}", jsonNodes);
247246
return jsonNodes;

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.airbyte.integrations.source.mssql;
66

7-
import static io.airbyte.cdk.db.DataTypeUtils.OFFSETDATETIME_FORMATTER;
7+
import static io.airbyte.cdk.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
88
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
99
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
1010
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
@@ -183,7 +183,7 @@ public JsonSchemaType getAirbyteType(final JDBCType jdbcType) {
183183
protected void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
184184
throws SQLException {
185185
try {
186-
final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value, OFFSETDATETIME_FORMATTER);
186+
final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value, TIMESTAMPTZ_FORMATTER);
187187
final Timestamp timestamp = Timestamp.valueOf(offsetDateTime.atZoneSameInstant(offsetDateTime.getOffset()).toLocalDateTime());
188188
// Final step of conversion from
189189
// OffsetDateTime (a Java construct) object -> Timestamp (a Java construct) ->

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialReadUtil.java

-1
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,6 @@ static Optional<OrderedColumnInfo> getOrderedColumnInfo(final JdbcDatabase datab
436436
final JDBCType ocFieldType = table.getFields().stream()
437437
.filter(field -> field.getName().equals(ocFieldName))
438438
.findFirst().get().getType();
439-
440439
final String ocMaxValue = MssqlQueryUtils.getMaxOcValueForStream(database, stream, ocFieldName, quoteString);
441440
return Optional.of(new OrderedColumnInfo(ocFieldName, ocFieldType, ocMaxValue));
442441
}

airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceOperationsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void setDateTimeOffsetColumnAsCursor() throws SQLException {
6161
executeQuery(insertQuery);
6262
expectedRecords.add(jsonNode);
6363
}
64-
final String cursorAnchorValue = "2023-01-01 00:00:00.0000000 +00:00";
64+
final String cursorAnchorValue = "2023-01-01T00:00:00.000000+00:00";
6565
final List<JsonNode> actualRecords = new ArrayList<>();
6666
try (final Connection connection = testdb.getContainer().createConnection("")) {
6767
final PreparedStatement preparedStatement = connection.prepareStatement(

docs/integrations/sources/mssql.md

+1
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
422422

423423
| Version | Date | Pull Request | Subject |
424424
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
425+
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
425426
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
426427
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
427428
| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |

0 commit comments

Comments
 (0)