-
Notifications
You must be signed in to change notification settings - Fork 4.6k
🐛 Fix data type tests in CdcPostgresSourceDatatypeTest #7339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
sashaNeshcheret
merged 22 commits into
master
from
oneshcheret/5382-fix-data-type-cdc-postgres
Nov 2, 2021
Merged
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
ff3a9f6
Fix data type tests in CdcPostgresSourceComprehensiveTest
sashaNeshcheret 8d3373f
update style format
sashaNeshcheret 6b13bc4
bump version for postgres source
sashaNeshcheret 3939003
bump version for postgres source in json definition
sashaNeshcheret 9136f21
remove unnecessary comments from test and bump version for postgres s…
sashaNeshcheret 6627c7a
Merge branch 'master' into oneshcheret/5382-fix-data-type-cdc-postgres
sashaNeshcheret e7e9a63
resolved potential conflicts with debezium utils in mssql converter i…
sashaNeshcheret 8a6eba4
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret aeef608
Merge remote-tracking branch 'origin/oneshcheret/5382-fix-data-type-c…
sashaNeshcheret 83f8bdc
resolved potential conflicts with debezium utils in mssql converter i…
sashaNeshcheret 40ab420
Update notes for money type in postgres.md
sashaNeshcheret 296bc6f
Update docs/integrations/sources/postgres.md
sashaNeshcheret 34e51e5
added test cases for converting data values for postgres cdc, remove …
sashaNeshcheret f25fd54
remove redundant void message from test
sashaNeshcheret b5107f6
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret b1c592e
update style format
sashaNeshcheret 2857a1a
fix time zone in DebeziumConverterUtilsTest
sashaNeshcheret 4c092ff
set utc time zone in DataTypeUtils
sashaNeshcheret 6f059a3
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret 6a1906e
set utc time zone for date format
sashaNeshcheret 019a724
revert changes regarding timezone in date format, disable tests with …
sashaNeshcheret 941102c
Merge remote-tracking branch 'origin/master' into oneshcheret/5382-fi…
sashaNeshcheret File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
...zium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.debezium.internals; | ||
|
||
import io.airbyte.db.DataTypeUtils; | ||
import io.debezium.spi.converter.RelationalColumn; | ||
import java.sql.Timestamp; | ||
import java.time.Duration; | ||
import java.time.LocalDate; | ||
import java.time.LocalDateTime; | ||
import java.time.format.DateTimeParseException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class DebeziumConverterUtils { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumConverterUtils.class); | ||
|
||
public static String convertDate(Object x) { | ||
/** | ||
* While building this custom converter we were not sure what type debezium could return cause there | ||
* is no mention of it in the documentation. Secondly if you take a look at | ||
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(io.debezium.spi.converter.RelationalColumn, io.debezium.spi.converter.CustomConverter.ConverterRegistration)} | ||
* method, even it is handling multiple data types but its not clear under what circumstances which | ||
* data type would be returned. I just went ahead and handled the data types that made sense. | ||
* Secondly, we use LocalDateTime to handle this cause it represents DATETIME datatype in JAVA | ||
*/ | ||
if (x instanceof LocalDateTime) { | ||
return DataTypeUtils.toISO8601String((LocalDateTime) x); | ||
} else if (x instanceof LocalDate) { | ||
return DataTypeUtils.toISO8601String((LocalDate) x); | ||
} else if (x instanceof Duration) { | ||
return DataTypeUtils.toISO8601String((Duration) x); | ||
} else if (x instanceof Timestamp) { | ||
return DataTypeUtils.toISO8601String(((Timestamp) x).toLocalDateTime()); | ||
} else if (x instanceof Number) { | ||
return DataTypeUtils.toISO8601String(new Timestamp(((Number) x).longValue()).toLocalDateTime()); | ||
} else if (x instanceof String) { | ||
try { | ||
return LocalDateTime.parse((String) x).toString(); | ||
} catch (final DateTimeParseException e) { | ||
LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", x); | ||
return x.toString(); | ||
} | ||
} | ||
LOGGER.warn("Uncovered date class type '{}'. Use default converter", x.getClass().getName()); | ||
return x.toString(); | ||
} | ||
|
||
public static Object getDefaultValue(RelationalColumn field) { | ||
if (field.isOptional()) { | ||
return null; | ||
} else if (field.hasDefaultValue()) { | ||
return field.defaultValue(); | ||
} | ||
return null; | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
.../debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.debezium.internals; | ||
|
||
import io.debezium.spi.converter.CustomConverter; | ||
import io.debezium.spi.converter.RelationalColumn; | ||
import java.math.BigDecimal; | ||
import java.util.Arrays; | ||
import java.util.Properties; | ||
import org.apache.kafka.connect.data.SchemaBuilder; | ||
import org.postgresql.util.PGInterval; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class); | ||
|
||
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"}; | ||
private final String[] BIT_TYPES = {"BIT", "VARBIT"}; | ||
private final String[] MONEY_ITEM_TYPE = {"MONEY"}; | ||
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"}; | ||
private final String[] TEXT_TYPES = {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR"}; | ||
|
||
@Override | ||
public void configure(Properties props) {} | ||
|
||
@Override | ||
public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) { | ||
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { | ||
registerDate(field, registration); | ||
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName())) | ||
|| Arrays.stream(GEOMETRICS_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName())) | ||
|| Arrays.stream(BIT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { | ||
registerText(field, registration); | ||
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { | ||
registerMoney(field, registration); | ||
} | ||
} | ||
|
||
private void registerText(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) { | ||
registration.register(SchemaBuilder.string(), x -> { | ||
if (x == null) { | ||
return DebeziumConverterUtils.getDefaultValue(field); | ||
} | ||
|
||
if (x instanceof byte[]) { | ||
return new String((byte[]) x); | ||
} else { | ||
return x.toString(); | ||
} | ||
}); | ||
} | ||
|
||
private void registerDate(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) { | ||
registration.register(SchemaBuilder.string(), x -> { | ||
if (x == null) { | ||
return DebeziumConverterUtils.getDefaultValue(field); | ||
} else if (x instanceof PGInterval) { | ||
return convertInterval((PGInterval) x); | ||
} else { | ||
return DebeziumConverterUtils.convertDate(x); | ||
} | ||
}); | ||
} | ||
|
||
private String convertInterval(PGInterval pgInterval) { | ||
StringBuilder resultInterval = new StringBuilder(); | ||
formatDateUnit(resultInterval, pgInterval.getYears(), " year "); | ||
formatDateUnit(resultInterval, pgInterval.getMonths(), " mons "); | ||
formatDateUnit(resultInterval, pgInterval.getDays(), " days "); | ||
|
||
formatTimeValues(resultInterval, pgInterval); | ||
return resultInterval.toString(); | ||
} | ||
|
||
private void registerMoney(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) { | ||
registration.register(SchemaBuilder.string(), x -> { | ||
if (x == null) { | ||
return DebeziumConverterUtils.getDefaultValue(field); | ||
} else if (x instanceof Double) { | ||
BigDecimal result = BigDecimal.valueOf((Double) x); | ||
if (result.compareTo(new BigDecimal("999999999999999")) == 1 | ||
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) { | ||
return null; | ||
} | ||
return result.toString(); | ||
} else { | ||
return x.toString(); | ||
} | ||
}); | ||
} | ||
|
||
private void formatDateUnit(StringBuilder resultInterval, int dateUnit, String s) { | ||
if (dateUnit != 0) { | ||
resultInterval | ||
.append(dateUnit) | ||
.append(s); | ||
} | ||
} | ||
|
||
private void formatTimeValues(StringBuilder resultInterval, PGInterval pgInterval) { | ||
if (isNegativeTime(pgInterval)) { | ||
resultInterval.append("-"); | ||
} | ||
// TODO check if value more or less than Integer.MIN_VALUE Integer.MAX_VALUE, | ||
int hours = Math.abs(pgInterval.getHours()); | ||
int minutes = Math.abs(pgInterval.getMinutes()); | ||
int seconds = Math.abs(pgInterval.getWholeSeconds()); | ||
resultInterval.append(addFirstDigit(hours)); | ||
resultInterval.append(hours); | ||
resultInterval.append(":"); | ||
resultInterval.append(addFirstDigit(minutes)); | ||
resultInterval.append(minutes); | ||
resultInterval.append(":"); | ||
resultInterval.append(addFirstDigit(seconds)); | ||
resultInterval.append(seconds); | ||
} | ||
|
||
private String addFirstDigit(int hours) { | ||
return hours <= 9 ? "0" : ""; | ||
} | ||
|
||
private boolean isNegativeTime(PGInterval pgInterval) { | ||
return pgInterval.getHours() < 0 | ||
|| pgInterval.getMinutes() < 0 | ||
|| pgInterval.getWholeSeconds() < 0; | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.