Skip to content

Commit 9b9ec1c

Browse files
authored
[source-postgres] : Remove legacy bad values handling code (#37445)
1 parent 01381ae commit 9b9ec1c

File tree

8 files changed

+267
-240
lines changed

8 files changed

+267
-240
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DataTypeUtils.kt

+24
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,30 @@ object DataTypeUtils {
7070
}
7171
}
7272

73+
@JvmStatic
74+
fun <T> throwExceptionIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
75+
return throwExceptionIfInvalid(valueProducer, Function { _: T? -> true })
76+
}
77+
78+
@JvmStatic
79+
fun <T> throwExceptionIfInvalid(
80+
valueProducer: DataTypeSupplier<T>,
81+
isValidFn: Function<T?, Boolean>
82+
): T? {
83+
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
84+
// throw an
85+
// exception when parsed. We want to parse those
86+
// values as null.
87+
// This method reduces error handling boilerplate.
88+
try {
89+
val value = valueProducer.apply()
90+
return if (isValidFn.apply(value)) value
91+
else throw SQLException("Given value is not valid.")
92+
} catch (e: SQLException) {
93+
return null
94+
}
95+
}
96+
7397
@JvmStatic
7498
fun toISO8601StringWithMicroseconds(instant: Instant): String {
7599
val dateWithMilliseconds = dateFormatMillisPattern.format(Date.from(instant))

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
4646
// convert to java types that will convert into reasonable json.
4747
copyToJsonField(queryContext, i, jsonNode)
4848
} catch (e: java.lang.Exception) {
49+
jsonNode.putNull(columnName)
4950
LOGGER.info(
5051
"Failed to serialize column: {}, of type {}, with error {}",
5152
columnName,
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.30.6
1+
version=0.30.7

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.29.13'
15+
cdkVersionRequired = '0.30.7'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.3.26
12+
dockerImageTag: 3.3.27
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ private void putBigDecimalArray(final ObjectNode node, final String columnName,
347347
final ArrayNode arrayNode = Jsons.arrayNode();
348348
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
349349
while (arrayResultSet.next()) {
350-
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getBigDecimal(2));
350+
final BigDecimal bigDecimal = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getBigDecimal(2));
351351
if (bigDecimal != null) {
352352
arrayNode.add(bigDecimal);
353353
} else {
@@ -361,7 +361,7 @@ private void putBigIntArray(final ObjectNode node, final String columnName, fina
361361
final ArrayNode arrayNode = Jsons.arrayNode();
362362
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
363363
while (arrayResultSet.next()) {
364-
final long value = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getLong(2));
364+
final long value = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getLong(2));
365365
arrayNode.add(value);
366366
}
367367
node.set(columnName, arrayNode);
@@ -371,7 +371,7 @@ private void putDoubleArray(final ObjectNode node, final String columnName, fina
371371
final ArrayNode arrayNode = Jsons.arrayNode();
372372
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
373373
while (arrayResultSet.next()) {
374-
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
374+
arrayNode.add(DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
375375
}
376376
node.set(columnName, arrayNode);
377377
}
@@ -381,7 +381,8 @@ private void putMoneyArray(final ObjectNode node, final String columnName, final
381381
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
382382
while (arrayResultSet.next()) {
383383
final String moneyValue = parseMoneyValue(arrayResultSet.getString(2));
384-
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
384+
arrayNode.add(
385+
DataTypeUtils.throwExceptionIfInvalid(() -> DataTypeUtils.throwExceptionIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
385386
}
386387
node.set(columnName, arrayNode);
387388
}
@@ -612,7 +613,7 @@ protected <T extends PGobject> void putObject(final ObjectNode node,
612613

613614
@Override
614615
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
615-
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
616+
final BigDecimal bigDecimal = DataTypeUtils.throwExceptionIfInvalid(() -> resultSet.getBigDecimal(index));
616617
if (bigDecimal != null) {
617618
node.put(columnName, bigDecimal);
618619
} else {
@@ -633,7 +634,7 @@ protected void putDouble(final ObjectNode node, final String columnName, final R
633634

634635
private void putMoney(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
635636
final String moneyValue = parseMoneyValue(resultSet.getString(index));
636-
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
637+
node.put(columnName, DataTypeUtils.throwExceptionIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
637638
}
638639

639640
private void putHstoreAsJson(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configured
3636
@Override
3737
protected void postSetup() throws Exception {
3838
final Database database = setupDatabase();
39-
for (final TestDataHolder test : getTestDataHolders()) {
39+
for (final TestDataHolder test : testDataHolders) {
4040
database.query(ctx -> {
4141
ctx.fetch(test.getCreateSqlQuery());
4242
return null;
@@ -56,7 +56,7 @@ protected void postSetup() throws Exception {
5656
if (stateAfterFirstSync == null) {
5757
throw new RuntimeException("stateAfterFirstSync should not be null");
5858
}
59-
for (final TestDataHolder test : getTestDataHolders()) {
59+
for (final TestDataHolder test : testDataHolders) {
6060
database.query(ctx -> {
6161
test.getInsertSqlQueries().forEach(ctx::fetch);
6262
return null;

0 commit comments

Comments
 (0)