Skip to content

Commit 40b3819

Browse files
sashaNeshcheretsherifnada
authored andcommitted
🐛 Fix data type tests in CdcPostgresSourceDatatypeTest (airbytehq#7339)
* Fix data type tests in CdcPostgresSourceComprehensiveTest * update style format * bump version for postgres source * bump version for postgres source in json definition * remove unnecessary comments from test and bump version for postgres strict encrypt source * resolved potential conflicts with debezium utils in mssql converter implementation * resolved potential conflicts with debezium utils in mssql converter implementation * Update notes for money type in postgres.md Co-authored-by: Sherif A. Nada <[email protected]> * Update docs/integrations/sources/postgres.md Co-authored-by: Sherif A. Nada <[email protected]> * added test cases for converting data values for postgres cdc, remove time zone utc from test container * remove redundant void message from test Co-authored-by: Sherif A. Nada <[email protected]> * update style format * fix time zone in DebeziumConverterUtilsTest * set utc time zone in DataTypeUtils * set utc time zone for date format * revert changes regarding timezone in date format, disable tests with number and duration Co-authored-by: Sherif A. Nada <[email protected]>
1 parent dfdfe1f commit 40b3819

File tree

12 files changed

+463
-220
lines changed

12 files changed

+463
-220
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
33
"name": "Postgres",
44
"dockerRepository": "airbyte/source-postgres",
5-
"dockerImageTag": "0.3.9",
5+
"dockerImageTag": "0.3.13",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
77
"icon": "postgresql.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@
405405
- name: Postgres
406406
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
407407
dockerRepository: airbyte/source-postgres
408-
dockerImageTag: 0.3.11
408+
dockerImageTag: 0.3.13
409409
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
410410
icon: postgresql.svg
411411
sourceType: database
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import io.airbyte.db.DataTypeUtils;
8+
import io.debezium.spi.converter.RelationalColumn;
9+
import java.sql.Timestamp;
10+
import java.time.Duration;
11+
import java.time.LocalDate;
12+
import java.time.LocalDateTime;
13+
import java.time.format.DateTimeParseException;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
public final class DebeziumConverterUtils {
18+
19+
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumConverterUtils.class);
20+
21+
private DebeziumConverterUtils() {
22+
throw new UnsupportedOperationException();
23+
}
24+
25+
public static String convertDate(final Object input) {
26+
/**
27+
* While building this custom converter we were not sure what type debezium could return cause there
28+
* is no mention of it in the documentation. Secondly if you take a look at
29+
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(io.debezium.spi.converter.RelationalColumn, io.debezium.spi.converter.CustomConverter.ConverterRegistration)}
30+
* method, even it is handling multiple data types but its not clear under what circumstances which
31+
* data type would be returned. I just went ahead and handled the data types that made sense.
32+
* Secondly, we use LocalDateTime to handle this cause it represents DATETIME datatype in JAVA
33+
*/
34+
if (input instanceof LocalDateTime) {
35+
return DataTypeUtils.toISO8601String((LocalDateTime) input);
36+
} else if (input instanceof LocalDate) {
37+
return DataTypeUtils.toISO8601String((LocalDate) input);
38+
} else if (input instanceof Duration) {
39+
return DataTypeUtils.toISO8601String((Duration) input);
40+
} else if (input instanceof Timestamp) {
41+
return DataTypeUtils.toISO8601String(((Timestamp) input).toLocalDateTime());
42+
} else if (input instanceof Number) {
43+
return DataTypeUtils.toISO8601String(
44+
new Timestamp(((Number) input).longValue()).toLocalDateTime());
45+
} else if (input instanceof String) {
46+
try {
47+
return LocalDateTime.parse((String) input).toString();
48+
} catch (final DateTimeParseException e) {
49+
LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", input);
50+
return input.toString();
51+
}
52+
}
53+
LOGGER.warn("Uncovered date class type '{}'. Use default converter", input.getClass().getName());
54+
return input.toString();
55+
}
56+
57+
public static Object convertDefaultValue(RelationalColumn field) {
58+
if (field.isOptional()) {
59+
return null;
60+
} else if (field.hasDefaultValue()) {
61+
return field.defaultValue();
62+
}
63+
return null;
64+
}
65+
66+
}

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,8 @@
44

55
package io.airbyte.integrations.debezium.internals;
66

7-
import io.airbyte.db.DataTypeUtils;
87
import io.debezium.spi.converter.CustomConverter;
98
import io.debezium.spi.converter.RelationalColumn;
10-
import java.sql.Timestamp;
11-
import java.time.Duration;
12-
import java.time.LocalDate;
13-
import java.time.LocalDateTime;
14-
import java.time.format.DateTimeParseException;
159
import java.util.Arrays;
1610
import java.util.Properties;
1711
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -61,50 +55,15 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
6155

6256
if (x instanceof byte[]) {
6357
return new String((byte[]) x);
64-
} else
58+
} else {
6559
return x.toString();
60+
}
6661
});
6762
}
6863

6964
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
70-
registration.register(SchemaBuilder.string(), x -> {
71-
if (x == null) {
72-
if (field.isOptional()) {
73-
return null;
74-
} else if (field.hasDefaultValue()) {
75-
return field.defaultValue();
76-
}
77-
return null;
78-
}
79-
/**
80-
* While building this custom converter we were not sure what type debezium could return cause there
81-
* is no mention of it in the documentation. Secondly if you take a look at
82-
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(RelationalColumn, ConverterRegistration)}
83-
* method, even it is handling multiple data types but its not clear under what circumstances which
84-
* data type would be returned. I just went ahead and handled the data types that made sense.
85-
* Secondly, we use LocalDateTime to handle this cause it represents DATETIME datatype in JAVA
86-
*/
87-
if (x instanceof LocalDateTime) {
88-
return DataTypeUtils.toISO8601String((LocalDateTime) x);
89-
} else if (x instanceof LocalDate) {
90-
return DataTypeUtils.toISO8601String((LocalDate) x);
91-
} else if (x instanceof Duration) {
92-
return DataTypeUtils.toISO8601String((Duration) x);
93-
} else if (x instanceof Timestamp) {
94-
return DataTypeUtils.toISO8601String(((Timestamp) x).toLocalDateTime());
95-
} else if (x instanceof Number) {
96-
return DataTypeUtils.toISO8601String(new Timestamp(((Number) x).longValue()).toLocalDateTime());
97-
} else if (x instanceof String) {
98-
try {
99-
return LocalDateTime.parse((String) x).toString();
100-
} catch (final DateTimeParseException e) {
101-
LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", x);
102-
return x.toString();
103-
}
104-
}
105-
LOGGER.warn("Uncovered date class type '{}'. Use default converter", x.getClass().getName());
106-
return x.toString();
107-
});
65+
registration.register(SchemaBuilder.string(),
66+
x -> x == null ? DebeziumConverterUtils.convertDefaultValue(field) : DebeziumConverterUtils.convertDate(x));
10867
}
10968

11069
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import io.debezium.spi.converter.CustomConverter;
8+
import io.debezium.spi.converter.RelationalColumn;
9+
import java.math.BigDecimal;
10+
import java.util.Arrays;
11+
import java.util.Properties;
12+
import org.apache.kafka.connect.data.SchemaBuilder;
13+
import org.postgresql.util.PGInterval;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
18+
19+
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);
20+
21+
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
22+
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
23+
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
24+
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
25+
private final String[] TEXT_TYPES = {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR"};
26+
27+
@Override
28+
public void configure(Properties props) {}
29+
30+
@Override
31+
public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
32+
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
33+
registerDate(field, registration);
34+
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))
35+
|| Arrays.stream(GEOMETRICS_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))
36+
|| Arrays.stream(BIT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
37+
registerText(field, registration);
38+
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
39+
registerMoney(field, registration);
40+
}
41+
}
42+
43+
private void registerText(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
44+
registration.register(SchemaBuilder.string(), x -> {
45+
if (x == null) {
46+
return DebeziumConverterUtils.convertDefaultValue(field);
47+
}
48+
49+
if (x instanceof byte[]) {
50+
return new String((byte[]) x);
51+
} else {
52+
return x.toString();
53+
}
54+
});
55+
}
56+
57+
private void registerDate(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
58+
registration.register(SchemaBuilder.string(), x -> {
59+
if (x == null) {
60+
return DebeziumConverterUtils.convertDefaultValue(field);
61+
} else if (x instanceof PGInterval) {
62+
return convertInterval((PGInterval) x);
63+
} else {
64+
return DebeziumConverterUtils.convertDate(x);
65+
}
66+
});
67+
}
68+
69+
private String convertInterval(PGInterval pgInterval) {
70+
StringBuilder resultInterval = new StringBuilder();
71+
formatDateUnit(resultInterval, pgInterval.getYears(), " year ");
72+
formatDateUnit(resultInterval, pgInterval.getMonths(), " mons ");
73+
formatDateUnit(resultInterval, pgInterval.getDays(), " days ");
74+
75+
formatTimeValues(resultInterval, pgInterval);
76+
return resultInterval.toString();
77+
}
78+
79+
private void registerMoney(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
80+
registration.register(SchemaBuilder.string(), x -> {
81+
if (x == null) {
82+
return DebeziumConverterUtils.convertDefaultValue(field);
83+
} else if (x instanceof Double) {
84+
BigDecimal result = BigDecimal.valueOf((Double) x);
85+
if (result.compareTo(new BigDecimal("999999999999999")) == 1
86+
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
87+
return null;
88+
}
89+
return result.toString();
90+
} else {
91+
return x.toString();
92+
}
93+
});
94+
}
95+
96+
private void formatDateUnit(StringBuilder resultInterval, int dateUnit, String s) {
97+
if (dateUnit != 0) {
98+
resultInterval
99+
.append(dateUnit)
100+
.append(s);
101+
}
102+
}
103+
104+
private void formatTimeValues(StringBuilder resultInterval, PGInterval pgInterval) {
105+
if (isNegativeTime(pgInterval)) {
106+
resultInterval.append("-");
107+
}
108+
// TODO check if value more or less than Integer.MIN_VALUE Integer.MAX_VALUE,
109+
int hours = Math.abs(pgInterval.getHours());
110+
int minutes = Math.abs(pgInterval.getMinutes());
111+
int seconds = Math.abs(pgInterval.getWholeSeconds());
112+
resultInterval.append(addFirstDigit(hours));
113+
resultInterval.append(hours);
114+
resultInterval.append(":");
115+
resultInterval.append(addFirstDigit(minutes));
116+
resultInterval.append(minutes);
117+
resultInterval.append(":");
118+
resultInterval.append(addFirstDigit(seconds));
119+
resultInterval.append(seconds);
120+
}
121+
122+
private String addFirstDigit(int hours) {
123+
return hours <= 9 ? "0" : "";
124+
}
125+
126+
private boolean isNegativeTime(PGInterval pgInterval) {
127+
return pgInterval.getHours() < 0
128+
|| pgInterval.getMinutes() < 0
129+
|| pgInterval.getWholeSeconds() < 0;
130+
}
131+
132+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.when;
9+
10+
import io.debezium.spi.converter.RelationalColumn;
11+
import java.sql.Timestamp;
12+
import java.time.Duration;
13+
import java.time.LocalDate;
14+
import java.time.LocalDateTime;
15+
import java.time.LocalTime;
16+
import org.junit.jupiter.api.Assertions;
17+
import org.junit.jupiter.api.Disabled;
18+
import org.junit.jupiter.api.Test;
19+
20+
class DebeziumConverterUtilsTest {
21+
22+
@Test
23+
public void convertDefaultValueTest() {
24+
25+
RelationalColumn relationalColumn = mock(RelationalColumn.class);
26+
27+
when(relationalColumn.isOptional()).thenReturn(true);
28+
Object actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
29+
Assertions.assertNull(actualColumnDefaultValue, "Default value for optional relational column should be null");
30+
31+
when(relationalColumn.isOptional()).thenReturn(false);
32+
when(relationalColumn.hasDefaultValue()).thenReturn(false);
33+
actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
34+
Assertions.assertNull(actualColumnDefaultValue);
35+
36+
when(relationalColumn.isOptional()).thenReturn(false);
37+
when(relationalColumn.hasDefaultValue()).thenReturn(true);
38+
String expectedColumnDefaultValue = "default value";
39+
when(relationalColumn.defaultValue()).thenReturn(expectedColumnDefaultValue);
40+
actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
41+
Assertions.assertEquals(actualColumnDefaultValue, expectedColumnDefaultValue);
42+
}
43+
44+
@Test
45+
public void convertLocalDate() {
46+
LocalDate localDate = LocalDate.of(2021, 1, 1);
47+
48+
String actual = DebeziumConverterUtils.convertDate(localDate);
49+
Assertions.assertEquals("2021-01-01T00:00:00Z", actual);
50+
}
51+
52+
@Test
53+
public void convertTLocalTime() {
54+
LocalTime localTime = LocalTime.of(8, 1, 1);
55+
String actual = DebeziumConverterUtils.convertDate(localTime);
56+
Assertions.assertEquals("08:01:01", actual);
57+
}
58+
59+
@Test
60+
public void convertLocalDateTime() {
61+
LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1);
62+
63+
String actual = DebeziumConverterUtils.convertDate(localDateTime);
64+
Assertions.assertEquals("2021-01-01T08:01:01Z", actual);
65+
}
66+
67+
@Test
68+
@Disabled
69+
public void convertDuration() {
70+
Duration duration = Duration.ofHours(100_000);
71+
72+
String actual = DebeziumConverterUtils.convertDate(duration);
73+
Assertions.assertEquals("1981-05-29T20:00:00Z", actual);
74+
}
75+
76+
@Test
77+
public void convertTimestamp() {
78+
LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1);
79+
Timestamp timestamp = Timestamp.valueOf(localDateTime);
80+
81+
String actual = DebeziumConverterUtils.convertDate(timestamp);
82+
Assertions.assertEquals("2021-01-01T08:01:01Z", actual);
83+
}
84+
85+
@Test
86+
@Disabled
87+
public void convertNumber() {
88+
Number number = 100_000;
89+
90+
String actual = DebeziumConverterUtils.convertDate(number);
91+
Assertions.assertEquals("1970-01-01T03:01:40Z", actual);
92+
}
93+
94+
@Test
95+
public void convertStringDateFormat() {
96+
String stringValue = "2021-01-01T00:00:00Z";
97+
98+
String actual = DebeziumConverterUtils.convertDate(stringValue);
99+
Assertions.assertEquals("2021-01-01T00:00:00Z", actual);
100+
}
101+
102+
}

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.0
11+
LABEL io.airbyte.version=0.1.1
1212
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

0 commit comments

Comments
 (0)