Skip to content

[source-postgres] : Remove legacy bad values handling code #37445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ object DataTypeUtils {
}
}

@JvmStatic
fun <T> throwExceptionIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
return throwExceptionIfInvalid(valueProducer, Function { _: T? -> true })
}

@JvmStatic
fun <T> throwExceptionIfInvalid(
valueProducer: DataTypeSupplier<T>,
isValidFn: Function<T?, Boolean>
): T? {
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
// throw an
// exception when parsed. We want to parse those
// values as null.
// This method reduces error handling boilerplate.
try {
val value = valueProducer.apply()
return if (isValidFn.apply(value)) value
else throw SQLException("Given value is not valid.")
} catch (e: SQLException) {
return null
}
}

@JvmStatic
fun toISO8601StringWithMicroseconds(instant: Instant): String {
val dateWithMilliseconds = dateFormatMillisPattern.format(Date.from(instant))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
jsonNode.putNull(columnName)
LOGGER.info(
"Failed to serialize column: {}, of type {}, with error {}",
columnName,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.6
version=0.30.7
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ java {
airbyteJavaConnector {
cdkVersionRequired = '0.29.13'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.26
dockerImageTag: 3.3.27
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void putBigDecimalArray(final ObjectNode node, final String columnName,
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getBigDecimal(2));
final BigDecimal bigDecimal = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getBigDecimal(2));
if (bigDecimal != null) {
arrayNode.add(bigDecimal);
} else {
Expand All @@ -361,7 +361,7 @@ private void putBigIntArray(final ObjectNode node, final String columnName, fina
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final long value = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getLong(2));
final long value = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getLong(2));
arrayNode.add(value);
}
node.set(columnName, arrayNode);
Expand All @@ -371,7 +371,7 @@ private void putDoubleArray(final ObjectNode node, final String columnName, fina
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
arrayNode.add(DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
}
node.set(columnName, arrayNode);
}
Expand All @@ -381,7 +381,8 @@ private void putMoneyArray(final ObjectNode node, final String columnName, final
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final String moneyValue = parseMoneyValue(arrayResultSet.getString(2));
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
arrayNode.add(
DataTypeUtils.throwExceptionIfInvalid(() -> DataTypeUtils.throwExceptionIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
}
node.set(columnName, arrayNode);
}
Expand Down Expand Up @@ -612,7 +613,7 @@ protected <T extends PGobject> void putObject(final ObjectNode node,

@Override
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
final BigDecimal bigDecimal = DataTypeUtils.throwExceptionIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
node.put(columnName, bigDecimal);
} else {
Expand All @@ -633,7 +634,7 @@ protected void putDouble(final ObjectNode node, final String columnName, final R

private void putMoney(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final String moneyValue = parseMoneyValue(resultSet.getString(index));
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
node.put(columnName, DataTypeUtils.throwExceptionIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
}

private void putHstoreAsJson(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configured
@Override
protected void postSetup() throws Exception {
final Database database = setupDatabase();
for (final TestDataHolder test : getTestDataHolders()) {
for (final TestDataHolder test : testDataHolders) {
database.query(ctx -> {
ctx.fetch(test.getCreateSqlQuery());
return null;
Expand All @@ -56,7 +56,7 @@ protected void postSetup() throws Exception {
if (stateAfterFirstSync == null) {
throw new RuntimeException("stateAfterFirstSync should not be null");
}
for (final TestDataHolder test : getTestDataHolders()) {
for (final TestDataHolder test : testDataHolders) {
database.query(ctx -> {
test.getInsertSqlQueries().forEach(ctx::fetch);
return null;
Expand Down
Loading
Loading