Skip to content

Commit d04a01a

Browse files
VitaliiMaltsevetsybaevoctavia-squidington-iii
authored
Source Postgres: Handle Arrays data types (#16990)
* Source Postgres: Handle Arrays data types * bump version * updated items mapping * updated mapping of common types * add datatype tests * removed redundant variables * Fixed getSpec schema test * removed redundant variables * refactoring * updated json schema mappings * updated debezium converters * removed unused logging * refactoring * refactoring and added comments * fixed checkstyle * refactoring * bump version * auto-bump connector version Co-authored-by: ievgeniit <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 563804b commit d04a01a

File tree

14 files changed

+870
-94
lines changed

14 files changed

+870
-94
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1221,7 +1221,7 @@
12211221
- name: Postgres
12221222
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12231223
dockerRepository: airbyte/source-postgres
1224-
dockerImageTag: 1.0.26
1224+
dockerImageTag: 1.0.27
12251225
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
12261226
icon: postgresql.svg
12271227
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11150,7 +11150,7 @@
1115011150
supportsNormalization: false
1115111151
supportsDBT: false
1115211152
supported_destination_sync_modes: []
11153-
- dockerImage: "airbyte/source-postgres:1.0.26"
11153+
- dockerImage: "airbyte/source-postgres:1.0.27"
1115411154
spec:
1115511155
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
1115611156
connectionSpecification:
@@ -11387,7 +11387,7 @@
1138711387
method:
1138811388
type: "string"
1138911389
const: "CDC"
11390-
order: 0
11390+
order: 1
1139111391
plugin:
1139211392
type: "string"
1139311393
title: "Plugin"
@@ -11400,21 +11400,21 @@
1140011400
- "pgoutput"
1140111401
- "wal2json"
1140211402
const: "pgoutput"
11403-
order: 1
11403+
order: 2
1140411404
replication_slot:
1140511405
type: "string"
1140611406
title: "Replication Slot"
1140711407
description: "A plugin logical replication slot. Read about <a href=\"\
1140811408
https://docs.airbyte.com/integrations/sources/postgres#step-3-create-replication-slot\"\
1140911409
>replication slots</a>."
11410-
order: 2
11410+
order: 3
1141111411
publication:
1141211412
type: "string"
1141311413
title: "Publication"
1141411414
description: "A Postgres publication used for consuming changes. Read\
1141511415
\ about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-4-create-publications-and-replication-identities-for-tables\"\
1141611416
>publications and replication identities</a>."
11417-
order: 3
11417+
order: 4
1141811418
initial_waiting_seconds:
1141911419
type: "integer"
1142011420
title: "Initial Waiting Time in Seconds (Advanced)"
@@ -11424,7 +11424,7 @@
1142411424
\ <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-5-optional-set-up-initial-waiting-time\"\
1142511425
>initial waiting time</a>."
1142611426
default: 300
11427-
order: 4
11427+
order: 5
1142811428
min: 120
1142911429
max: 1200
1143011430
tunnel_method:

airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java

Lines changed: 118 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,38 @@
44

55
package io.airbyte.integrations.debezium.internals;
66

7+
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
8+
import static io.airbyte.db.jdbc.DateTimeConverter.convertToDate;
9+
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTime;
10+
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTimestamp;
11+
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTimestampWithTimezone;
12+
import static org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA;
13+
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
14+
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
15+
716
import io.airbyte.db.jdbc.DateTimeConverter;
817
import io.debezium.spi.converter.CustomConverter;
918
import io.debezium.spi.converter.RelationalColumn;
1019
import io.debezium.time.Conversions;
1120
import java.math.BigDecimal;
1221
import java.nio.charset.StandardCharsets;
22+
import java.sql.SQLException;
1323
import java.time.LocalDate;
1424
import java.time.LocalTime;
25+
import java.time.OffsetTime;
26+
import java.time.format.DateTimeFormatter;
27+
import java.util.ArrayList;
1528
import java.util.Arrays;
29+
import java.util.Base64;
30+
import java.util.List;
1631
import java.util.Locale;
32+
import java.util.Objects;
1733
import java.util.Properties;
1834
import java.util.concurrent.TimeUnit;
35+
import java.util.stream.Collectors;
1936
import org.apache.commons.codec.binary.Hex;
2037
import org.apache.kafka.connect.data.SchemaBuilder;
38+
import org.postgresql.jdbc.PgArray;
2139
import org.postgresql.util.PGInterval;
2240
import org.slf4j.Logger;
2341
import org.slf4j.LoggerFactory;
@@ -33,6 +51,7 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
3351
private final String[] TEXT_TYPES =
3452
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
3553
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
54+
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
3655
private final String BYTEA_TYPE = "BYTEA";
3756

3857
@Override
@@ -52,9 +71,22 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
5271
registerBytea(field, registration);
5372
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
5473
registerNumber(field, registration);
74+
} else if (Arrays.stream(ARRAY_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
75+
registerArray(field, registration);
5576
}
5677
}
5778

79+
private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
80+
final String fieldType = field.typeName().toUpperCase();
81+
final SchemaBuilder arraySchema = switch (fieldType) {
82+
case "_NUMERIC", "_MONEY" -> SchemaBuilder.array(OPTIONAL_FLOAT64_SCHEMA);
83+
case "_NAME", "_DATE", "_TIME", "_TIMESTAMP", "_TIMESTAMPTZ", "_TIMETZ", "_BYTEA" -> SchemaBuilder.array(OPTIONAL_STRING_SCHEMA);
84+
case "_BIT" -> SchemaBuilder.array(OPTIONAL_BOOLEAN_SCHEMA);
85+
default -> SchemaBuilder.array(OPTIONAL_STRING_SCHEMA);
86+
};
87+
registration.register(arraySchema, x -> convertArray(x, field));
88+
}
89+
5890
private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
5991
registration.register(SchemaBuilder.string().optional(), x -> {
6092
if (x == null) {
@@ -106,6 +138,72 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
106138
});
107139
}
108140

141+
private Object convertArray(Object x, RelationalColumn field) {
142+
final String fieldType = field.typeName().toUpperCase();
143+
Object[] values = new Object[0];
144+
try {
145+
values = (Object[]) ((PgArray) x).getArray();
146+
} catch (SQLException e) {
147+
LOGGER.error("Failed to convert PgArray:" + e);
148+
}
149+
switch (fieldType) {
150+
// debezium currently cannot handle MONEY[] datatype and it's not implemented
151+
case "_MONEY":
152+
// PgArray.getArray() trying to convert to Double instead of PgMoney
153+
// due to incorrect type mapping in the postgres driver
154+
// https://github.com/pgjdbc/pgjdbc/blob/d5ed52ef391670e83ae5265af2f7301c615ce4ca/pgjdbc/src/main/java/org/postgresql/jdbc/TypeInfoCache.java#L88
155+
// and throws an exception, so a custom implementation of converting to String is used to get the
156+
// value as is
157+
final String nativeMoneyValue = ((PgArray) x).toString();
158+
final String substringM = Objects.requireNonNull(nativeMoneyValue).substring(1, nativeMoneyValue.length() - 1);
159+
final char currency = substringM.charAt(0);
160+
final String regex = "\\" + currency;
161+
final List<String> myListM = new ArrayList<>(Arrays.asList(substringM.split(regex)));
162+
return myListM.stream()
163+
// since the separator is the currency sign, all extra characters must be removed except for numbers
164+
// and dots
165+
.map(val -> val.replaceAll("[^\\d.]", ""))
166+
.filter(money -> !money.isEmpty())
167+
.map(Double::valueOf)
168+
.collect(Collectors.toList());
169+
case "_NUMERIC":
170+
return Arrays.stream(values).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList());
171+
case "_TIME":
172+
return Arrays.stream(values).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList());
173+
case "_DATE":
174+
return Arrays.stream(values).map(value -> value == null ? null : convertToDate(value)).collect(Collectors.toList());
175+
case "_TIMESTAMP":
176+
return Arrays.stream(values).map(value -> value == null ? null : convertToTimestamp(value)).collect(Collectors.toList());
177+
case "_TIMESTAMPTZ":
178+
return Arrays.stream(values).map(value -> value == null ? null : convertToTimestampWithTimezone(value)).collect(Collectors.toList());
179+
case "_TIMETZ":
180+
181+
final List<String> timetzArr = new ArrayList<>();
182+
final String nativeValue = ((PgArray) x).toString();
183+
final String substring = Objects.requireNonNull(nativeValue).substring(1, nativeValue.length() - 1);
184+
final List<String> times = new ArrayList<>(Arrays.asList(substring.split(",")));
185+
final DateTimeFormatter format = DateTimeFormatter.ofPattern("HH:mm:ss[.SSSSSS]X");
186+
187+
times.forEach(s -> {
188+
if (s.equalsIgnoreCase("NULL")) {
189+
timetzArr.add(null);
190+
} else {
191+
final OffsetTime parsed = OffsetTime.parse(s, format);
192+
timetzArr.add(parsed.format(TIMETZ_FORMATTER));
193+
}
194+
});
195+
return timetzArr;
196+
case "_BYTEA":
197+
return Arrays.stream(values).map(value -> Base64.getEncoder().encodeToString((byte[]) value)).collect(Collectors.toList());
198+
case "_BIT":
199+
return Arrays.stream(values).map(value -> (Boolean) value).collect(Collectors.toList());
200+
case "_NAME":
201+
return Arrays.stream(values).map(value -> (String) value).collect(Collectors.toList());
202+
default:
203+
return new ArrayList<>();
204+
}
205+
}
206+
109207
private int getTimePrecision(final RelationalColumn field) {
110208
return field.scale().orElse(-1);
111209
}
@@ -127,30 +225,20 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
127225
case "TIMESTAMP":
128226
if (x instanceof final Long l) {
129227
if (getTimePrecision(field) <= 3) {
130-
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMillis(l));
228+
return convertToTimestamp(Conversions.toInstantFromMillis(l));
131229
}
132230
if (getTimePrecision(field) <= 6) {
133-
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMicros(l));
231+
return convertToTimestamp(Conversions.toInstantFromMicros(l));
134232
}
135233
}
136-
return DateTimeConverter.convertToTimestamp(x);
234+
return convertToTimestamp(x);
137235
case "DATE":
138236
if (x instanceof Integer) {
139-
return DateTimeConverter.convertToDate(LocalDate.ofEpochDay((Integer) x));
237+
return convertToDate(LocalDate.ofEpochDay((Integer) x));
140238
}
141-
return DateTimeConverter.convertToDate(x);
239+
return convertToDate(x);
142240
case "TIME":
143-
if (x instanceof Long) {
144-
if (getTimePrecision(field) <= 3) {
145-
long l = Math.multiplyExact((Long) x, TimeUnit.MILLISECONDS.toNanos(1));
146-
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
147-
}
148-
if (getTimePrecision(field) <= 6) {
149-
long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1));
150-
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
151-
}
152-
}
153-
return DateTimeConverter.convertToTime(x);
241+
return resolveTime(field, x);
154242
case "INTERVAL":
155243
return convertInterval((PGInterval) x);
156244
default:
@@ -159,6 +247,20 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
159247
});
160248
}
161249

250+
private String resolveTime(RelationalColumn field, Object x) {
251+
if (x instanceof Long) {
252+
if (getTimePrecision(field) <= 3) {
253+
long l = Math.multiplyExact((Long) x, TimeUnit.MILLISECONDS.toNanos(1));
254+
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
255+
}
256+
if (getTimePrecision(field) <= 6) {
257+
long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1));
258+
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
259+
}
260+
}
261+
return DateTimeConverter.convertToTime(x);
262+
}
263+
162264
private String convertInterval(final PGInterval pgInterval) {
163265
final StringBuilder resultInterval = new StringBuilder();
164266
formatDateUnit(resultInterval, pgInterval.getYears(), " year ");

airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void testDataTypes() throws Exception {
117117
testDataHolders.forEach(testDataHolder -> {
118118
if (testCatalog()) {
119119
final AirbyteStream airbyteStream = streams.get(testDataHolder.getNameWithTestPrefix());
120-
final Map<String, String> jsonSchemaTypeMap = (Map<String, String>) Jsons.deserialize(
120+
final Map<String, Object> jsonSchemaTypeMap = (Map<String, Object>) Jsons.deserialize(
121121
airbyteStream.getJsonSchema().get("properties").get(getTestColumnName()).toString(), Map.class);
122122
assertEquals(testDataHolder.getAirbyteType().getJsonSchemaTypeMap(), jsonSchemaTypeMap,
123123
"Expected column type for " + testDataHolder.getNameWithTestPrefix());

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.26
19+
LABEL io.airbyte.version=1.0.27
2020
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.26
19+
LABEL io.airbyte.version=1.0.27
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import java.net.URISyntaxException;
5959
import java.nio.file.Path;
6060
import java.sql.Connection;
61-
import java.sql.JDBCType;
6261
import java.sql.PreparedStatement;
6362
import java.sql.SQLException;
6463
import java.time.Duration;
@@ -74,7 +73,7 @@
7473
import org.slf4j.Logger;
7574
import org.slf4j.LoggerFactory;
7675

77-
public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Source {
76+
public class PostgresSource extends AbstractJdbcSource<PostgresType> implements Source {
7877

7978
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);
8079
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
@@ -207,8 +206,8 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
207206
}
208207

209208
@Override
210-
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
211-
final List<TableInfo<CommonField<JDBCType>>> rawTables = discoverRawTables(database);
209+
public List<TableInfo<CommonField<PostgresType>>> discoverInternal(final JdbcDatabase database) throws Exception {
210+
final List<TableInfo<CommonField<PostgresType>>> rawTables = discoverRawTables(database);
212211
final Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc = PostgresCdcCatalogHelper.getPublicizedTables(database);
213212

214213
if (publicizedTablesInCdc.isEmpty()) {
@@ -220,15 +219,15 @@ public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabas
220219
.collect(toList());
221220
}
222221

223-
public List<TableInfo<CommonField<JDBCType>>> discoverRawTables(final JdbcDatabase database) throws Exception {
222+
public List<TableInfo<CommonField<PostgresType>>> discoverRawTables(final JdbcDatabase database) throws Exception {
224223
if (schemas != null && !schemas.isEmpty()) {
225224
// process explicitly selected (from UI) schemas
226-
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
225+
final List<TableInfo<CommonField<PostgresType>>> internals = new ArrayList<>();
227226
for (final String schema : schemas) {
228227
LOGGER.info("Checking schema: {}", schema);
229-
final List<TableInfo<CommonField<JDBCType>>> tables = super.discoverInternal(database, schema);
228+
final List<TableInfo<CommonField<PostgresType>>> tables = super.discoverInternal(database, schema);
230229
internals.addAll(tables);
231-
for (final TableInfo<CommonField<JDBCType>> table : tables) {
230+
for (final TableInfo<CommonField<PostgresType>> table : tables) {
232231
LOGGER.info("Found table: {}.{}", table.getNameSpace(), table.getName());
233232
}
234233
}
@@ -318,7 +317,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
318317
@Override
319318
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
320319
final ConfiguredAirbyteCatalog catalog,
321-
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
320+
final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
322321
final StateManager stateManager,
323322
final Instant emittedAt) {
324323
final JsonNode sourceConfig = database.getSourceConfig();

0 commit comments

Comments
 (0)