Skip to content

[source-mssql] update datetimeoffset format #45142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.10
dockerImageTag: 4.1.11
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static void getIndexInfoForStreams(final JdbcDatabase database, final Con
final String query = INDEX_QUERY.formatted(fullTableName);
LOGGER.debug("Index lookup query: {}", query);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(query).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
if (jsonNodes != null) {
jsonNodes.stream().map(node -> Jsons.convertValue(node, Index.class))
.forEach(i -> LOGGER.info("Index {}", i));
Expand All @@ -106,7 +106,7 @@ public static String getMaxOcValueForStream(final JdbcDatabase database,
LOGGER.info("Querying for max oc value: {}", maxOcQuery);
try {
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(maxOcQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
if (jsonNodes.get(0).get(MAX_OC_COL) == null) {
LOGGER.info("Max PK is null for table {} - this could indicate an empty table", fullTableName);
Expand Down Expand Up @@ -213,7 +213,7 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,
final List<JsonNode> jsonNodes;
try {
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
} catch (SQLException e) {
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e);
}
Expand Down Expand Up @@ -241,7 +241,7 @@ private static List<JsonNode> getTableEstimate(final JdbcDatabase database, fina
String.format(TABLE_ESTIMATE_QUERY, namespace, name);
LOGGER.info("Querying for table estimate size: {}", tableEstimateQuery);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
LOGGER.debug("Estimate: {}", jsonNodes);
return jsonNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.integrations.source.mssql;

import static io.airbyte.cdk.db.DataTypeUtils.OFFSETDATETIME_FORMATTER;
import static io.airbyte.cdk.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
Expand Down Expand Up @@ -183,7 +183,7 @@ public JsonSchemaType getAirbyteType(final JDBCType jdbcType) {
protected void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
throws SQLException {
try {
final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value, OFFSETDATETIME_FORMATTER);
final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value, TIMESTAMPTZ_FORMATTER);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be consistent with other places (DBZ) we use to parse MSSQL datetime offset

final Timestamp timestamp = Timestamp.valueOf(offsetDateTime.atZoneSameInstant(offsetDateTime.getOffset()).toLocalDateTime());
// Final step of conversion from
// OffsetDateTime (a Java construct) object -> Timestamp (a Java construct) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ static Optional<OrderedColumnInfo> getOrderedColumnInfo(final JdbcDatabase datab
final JDBCType ocFieldType = table.getFields().stream()
.filter(field -> field.getName().equals(ocFieldName))
.findFirst().get().getType();

final String ocMaxValue = MssqlQueryUtils.getMaxOcValueForStream(database, stream, ocFieldName, quoteString);
return Optional.of(new OrderedColumnInfo(ocFieldName, ocFieldType, ocMaxValue));
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
Expand Down
Loading