Skip to content

Commit f535283

Browse files
yurii-bidiukDoNotPanicUA
authored andcommitted
🐛 Source MSSQL: fix data type (smalldatetime, smallmoney) conversion from mssql source (airbytehq#5609) (airbytehq#7386)
* Fix data type (smalldatetime, smallmoney) conversion from mssql source (airbytehq#5609) * Fixed code format * Bumb new version * Update documentation (mssql.md) * formating * fixed converter properties * aligned converter utils with airbytehq#7339 Co-authored-by: Andrii Leonets <[email protected]>
1 parent f34b63b commit f535283

File tree

6 files changed

+72
-3
lines changed

6 files changed

+72
-3
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
33
"name": "Microsoft SQL Server (MSSQL)",
44
"dockerRepository": "airbyte/source-mssql",
5-
"dockerImageTag": "0.3.6",
5+
"dockerImageTag": "0.3.8",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mssql",
77
"icon": "mssql.svg"
88
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import io.debezium.spi.converter.CustomConverter;
8+
import io.debezium.spi.converter.RelationalColumn;
9+
import java.math.BigDecimal;
10+
import java.util.Objects;
11+
import java.util.Properties;
12+
import org.apache.kafka.connect.data.SchemaBuilder;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
public class MSSQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
17+
18+
private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class);;
19+
20+
private final String SMALLDATETIME_TYPE = "SMALLDATETIME";
21+
private final String SMALLMONEY_TYPE = "SMALLMONEY";
22+
23+
@Override
24+
public void configure(Properties props) {}
25+
26+
@Override
27+
public void converterFor(final RelationalColumn field,
28+
final ConverterRegistration<SchemaBuilder> registration) {
29+
if (SMALLDATETIME_TYPE.equalsIgnoreCase(field.typeName())) {
30+
registerDate(field, registration);
31+
} else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) {
32+
registerMoney(field, registration);
33+
}
34+
35+
}
36+
37+
private void registerDate(final RelationalColumn field,
38+
final ConverterRegistration<SchemaBuilder> registration) {
39+
registration.register(SchemaBuilder.string(), input -> {
40+
if (Objects.isNull(input)) {
41+
return DebeziumConverterUtils.convertDefaultValue(field);
42+
}
43+
44+
return DebeziumConverterUtils.convertDate(input);
45+
});
46+
}
47+
48+
private void registerMoney(final RelationalColumn field,
49+
final ConverterRegistration<SchemaBuilder> registration) {
50+
registration.register(SchemaBuilder.float64(), input -> {
51+
if (Objects.isNull(input)) {
52+
return DebeziumConverterUtils.convertDefaultValue(field);
53+
}
54+
55+
if (input instanceof BigDecimal) {
56+
return ((BigDecimal) input).doubleValue();
57+
}
58+
59+
LOGGER.warn("Uncovered money class type '{}'. Use default converter",
60+
input.getClass().getName());
61+
return input.toString();
62+
});
63+
}
64+
65+
}

airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.0
11+
LABEL io.airbyte.version=0.1.1
1212
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt

airbyte-integrations/connectors/source-mssql/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.6
11+
LABEL io.airbyte.version=0.3.8
1212
LABEL io.airbyte.name=airbyte/source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ static Properties getDebeziumProperties() {
2626
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
2727
props.setProperty("provide.transaction.metadata", "false");
2828

29+
props.setProperty("converters", "mssql_converter");
30+
props.setProperty("mssql_converter.type", "io.airbyte.integrations.debezium.internals.MSSQLConverter");
31+
2932
return props;
3033
}
3134

docs/integrations/sources/mssql.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ If you do not see a type in this list, assume that it is coerced into a string.
294294

295295
| Version | Date | Pull Request | Subject | |
296296
| :--- | :--- | :--- | :--- | :--- |
297+
| 0.3.8 | 2021-10-26 | [7386](https://github.com/airbytehq/airbyte/pull/7386) | Fixed data type (smalldatetime, smallmoney) conversion from mssql source | |
297298
| 0.3.7 | 2021-09-30 | [6585](https://github.com/airbytehq/airbyte/pull/6585) | Improved SSH Tunnel key generation steps | |
298299
| 0.3.6 | 2021-09-17 | [6318](https://github.com/airbytehq/airbyte/pull/6318) | Added option to connect to DB via SSH | |
299300
| 0.3.4 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator | |

0 commit comments

Comments
 (0)