Skip to content

Commit 90884d0

Browse files
Postgres Source : Support JSONB datatype (#21695)
* Postgres Source Jsonb updated schema with oneOf definition * updated json schema definition * updated json schema definition * updated tests * refactoring * fixed checkstyle * fixed checkstyle * updated values mapping * updated test cases and refactoring * updated test cases * refactoring * added jsonb[] support * refactoring * updated json schema * reverted to schema with oneOf * updated airbyte-protocol version * deleted protocol files * bump version * auto-bump connector version * manual bump of postgres-source version * Automated Change --------- Co-authored-by: Octavia Squidington III <[email protected]> Co-authored-by: VitaliiMaltsev <[email protected]>
1 parent b7808a3 commit 90884d0

File tree

17 files changed

+149
-19
lines changed

17 files changed

+149
-19
lines changed

airbyte-config/config-models/src/test/java/io/airbyte/config/DataTypeEnumTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class DataTypeEnumTest {
1818
@Test
1919
void testConversionFromJsonSchemaPrimitiveToDataType() {
2020
assertEquals(5, DataType.class.getEnumConstants().length);
21-
assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length);
21+
assertEquals(17, JsonSchemaPrimitive.class.getEnumConstants().length);
2222

2323
assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
2424
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
- name: AlloyDB for PostgreSQL
4646
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
4747
dockerRepository: airbyte/source-alloydb
48-
dockerImageTag: 1.0.48
48+
dockerImageTag: 1.0.49
4949
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
5050
icon: alloydb.svg
5151
sourceType: database
@@ -1428,7 +1428,7 @@
14281428
- name: Postgres
14291429
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
14301430
dockerRepository: airbyte/source-postgres
1431-
dockerImageTag: 1.0.49
1431+
dockerImageTag: 1.0.50
14321432
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
14331433
icon: postgresql.svg
14341434
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@
370370
supportsNormalization: false
371371
supportsDBT: false
372372
supported_destination_sync_modes: []
373-
- dockerImage: "airbyte/source-alloydb:1.0.48"
373+
- dockerImage: "airbyte/source-alloydb:1.0.49"
374374
spec:
375375
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
376376
connectionSpecification:
@@ -11623,7 +11623,7 @@
1162311623
supportsNormalization: false
1162411624
supportsDBT: false
1162511625
supported_destination_sync_modes: []
11626-
- dockerImage: "airbyte/source-postgres:1.0.49"
11626+
- dockerImage: "airbyte/source-postgres:1.0.50"
1162711627
spec:
1162811628
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
1162911629
connectionSpecification:

airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java

+19
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
1414
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
1515

16+
import com.fasterxml.jackson.core.JsonProcessingException;
17+
import com.fasterxml.jackson.databind.ObjectMapper;
1618
import io.airbyte.db.jdbc.DateTimeConverter;
1719
import io.debezium.spi.converter.CustomConverter;
1820
import io.debezium.spi.converter.RelationalColumn;
@@ -53,6 +55,8 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
5355
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
5456
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
5557
private final String BYTEA_TYPE = "BYTEA";
58+
private final String JSONB_TYPE = "JSONB";
59+
private final ObjectMapper objectMapper = new ObjectMapper();
5660

5761
@Override
5862
public void configure(final Properties props) {}
@@ -69,13 +73,28 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
6973
registerMoney(field, registration);
7074
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
7175
registerBytea(field, registration);
76+
} else if (JSONB_TYPE.equalsIgnoreCase(field.typeName())) {
77+
registerJsonb(field, registration);
7278
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
7379
registerNumber(field, registration);
7480
} else if (Arrays.stream(ARRAY_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
7581
registerArray(field, registration);
7682
}
7783
}
7884

85+
private void registerJsonb(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
86+
registration.register(SchemaBuilder.string().optional(), x -> {
87+
if (x == null) {
88+
return DebeziumConverterUtils.convertDefaultValue(field);
89+
}
90+
try {
91+
return objectMapper.readTree(x.toString()).toString();
92+
} catch (JsonProcessingException e) {
93+
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
94+
}
95+
});
96+
}
97+
7998
private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
8099
final String fieldType = field.typeName().toUpperCase();
81100
final SchemaBuilder arraySchema = switch (fieldType) {

airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.48
19+
LABEL io.airbyte.version=1.0.49
2020
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt

airbyte-integrations/connectors/source-alloydb/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.48
19+
LABEL io.airbyte.version=1.0.49
2020
LABEL io.airbyte.name=airbyte/source-alloydb

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.49
19+
LABEL io.airbyte.version=1.0.50
2020
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.49
19+
LABEL io.airbyte.version=1.0.50
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
173173
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
174174
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
175175
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
176+
case "jsonb" -> putJsonb(json, columnName, resultSet, colIndex);
176177
case "_varchar", "_char", "_bpchar", "_text", "_name" -> putArray(json, columnName, resultSet, colIndex);
177178
case "_int2", "_int4", "_int8", "_oid" -> putLongArray(json, columnName, resultSet, colIndex);
178179
case "_numeric", "_decimal" -> putBigDecimalArray(json, columnName, resultSet, colIndex);
@@ -186,6 +187,7 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
186187
case "_timestamp" -> putTimestampArray(json, columnName, resultSet, colIndex);
187188
case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex);
188189
case "_time" -> putTimeArray(json, columnName, resultSet, colIndex);
190+
case "_jsonb" -> putJsonbArray(json, columnName, resultSet, colIndex);
189191
default -> {
190192
switch (columnInfo.columnType) {
191193
case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t"));
@@ -209,6 +211,33 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
209211
}
210212
}
211213

214+
private void putJsonbArray(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
215+
final ArrayNode arrayNode = Jsons.arrayNode();
216+
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
217+
218+
while (arrayResultSet.next()) {
219+
final PGobject object = getObject(arrayResultSet, colIndex, PGobject.class);
220+
final JsonNode value;
221+
try {
222+
value = new ObjectMapper().readTree(object.getValue());
223+
} catch (JsonProcessingException e) {
224+
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
225+
}
226+
arrayNode.add(value);
227+
}
228+
node.set(columnName, arrayNode);
229+
}
230+
231+
private void putJsonb(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
232+
final PGobject object = getObject(resultSet, colIndex, PGobject.class);
233+
234+
try {
235+
node.put(columnName, new ObjectMapper().readTree(object.getValue()));
236+
} catch (JsonProcessingException e) {
237+
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
238+
}
239+
}
240+
212241
private void putTimeArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
213242
final ArrayNode arrayNode = Jsons.arrayNode();
214243
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
@@ -393,11 +422,13 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
393422
case "_time" -> PostgresType.TIME_ARRAY;
394423
case "_date" -> PostgresType.DATE_ARRAY;
395424
case "_bytea" -> PostgresType.BYTEA_ARRAY;
425+
case "_jsonb" -> PostgresType.JSONB_ARRAY;
396426
case "bool", "boolean" -> PostgresType.BOOLEAN;
397427
// BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
398428
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
399429
// https://www.postgresql.org/docs/14/datatype-binary.html
400430
case "bytea" -> PostgresType.VARCHAR;
431+
case "jsonb" -> PostgresType.JSONB;
401432
case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
402433
case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
403434
default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
@@ -496,7 +527,10 @@ public JsonSchemaType getAirbyteType(final PostgresType jdbcType) {
496527
case DATE_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
497528
.withItems(JsonSchemaType.STRING_DATE)
498529
.build();
499-
530+
case JSONB_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
531+
.withItems(JsonSchemaType.JSONB)
532+
.build();
533+
case JSONB -> JsonSchemaType.JSONB;
500534
case DATE -> JsonSchemaType.STRING_DATE;
501535
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
502536
case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE;
@@ -591,6 +625,7 @@ private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData m
591625
}
592626

593627
private static class ColumnInfo {
628+
594629
public String columnTypeName;
595630
public PostgresType columnType;
596631

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ public enum PostgresType implements SQLType {
7070
OID_ARRAY(Types.ARRAY),
7171
FLOAT4_ARRAY(Types.ARRAY),
7272
FLOAT8_ARRAY(Types.ARRAY),
73-
BYTEA_ARRAY(Types.ARRAY);
73+
BYTEA_ARRAY(Types.ARRAY),
74+
JSONB_ARRAY(Types.ARRAY),
75+
JSONB(Types.JAVA_OBJECT);
7476

7577
/**
7678
* The Integer value for the JDBCType. It maps to a value in {@code Types.java}
@@ -122,15 +124,15 @@ public Integer getVendorTypeNumber() {
122124
* {@code Types} value
123125
* @see Types
124126
*/
125-
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
127+
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
126128
if (postgresTypeMap.containsKey(type)) {
127129
return postgresTypeMap.get(type);
128130
}
129131
throw new IllegalArgumentException("Type:" + type + " is not a valid "
130132
+ "Types.java value.");
131133
}
132134

133-
public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
135+
public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
134136
try {
135137
return PostgresType.valueOf(columnTypeInt, postgresTypeMap);
136138
} catch (final Exception e) {

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java

+35-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ public boolean testCatalog() {
4646
return true;
4747
}
4848

49+
protected String getValueFromJsonNode(final JsonNode jsonNode) {
50+
if (jsonNode != null) {
51+
if (jsonNode.isArray() || jsonNode.isObject()) {
52+
return jsonNode.toString();
53+
}
54+
55+
String value = jsonNode.asText();
56+
return (value != null && value.equals("null") ? null : value);
57+
}
58+
return null;
59+
}
60+
4961
// Test cases are sorted alphabetically based on the source type
5062
// See https://www.postgresql.org/docs/14/datatype.html
5163
@Override
@@ -253,9 +265,12 @@ protected void initTests() {
253265
addDataTypeTestData(
254266
TestDataHolder.builder()
255267
.sourceType("jsonb")
256-
.airbyteType(JsonSchemaType.STRING)
257-
.addInsertValues("null", "'[1, 2, 3]'::jsonb")
258-
.addExpectedValues(null, "[1, 2, 3]")
268+
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.JSONB)
269+
.withLegacyAirbyteTypeProperty("json")
270+
.build())
271+
.addInsertValues("null", "'10000'::jsonb", "'true'::jsonb", "'[1,2,3]'::jsonb",
272+
"'{\"Janet\": 1, \"Melissa\": {\"loves\": \"trees\", \"married\": true}}'::jsonb")
273+
.addExpectedValues(null, "10000", "true", "[1,2,3]", "{\"Janet\":1,\"Melissa\":{\"loves\":\"trees\",\"married\":true}}")
259274
.build());
260275

261276
addDataTypeTestData(
@@ -578,6 +593,23 @@ protected void initTests() {
578593

579594
addTimeWithTimeZoneTest();
580595
addArraysTestData();
596+
addJsonbArrayTest();
597+
}
598+
599+
protected void addJsonbArrayTest() {
600+
601+
addDataTypeTestData(
602+
TestDataHolder.builder()
603+
.sourceType("jsonb_array")
604+
.fullSourceDataType("JSONB[]")
605+
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
606+
.withItems(JsonSchemaType.JSONB)
607+
.build())
608+
.addInsertValues(
609+
"ARRAY['[1,2,1]', 'false']::jsonb[]",
610+
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
611+
.addExpectedValues("[[1,2,1],false]", "[{\"digit\":30,\"letter\":\"A\"},{\"digit\":31,\"letter\":\"B\"}]")
612+
.build());
581613
}
582614

583615
protected void addTimeWithTimeZoneTest() {

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotPostgresSourceDatatypeTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111
import io.airbyte.db.factory.DSLContextFactory;
1212
import io.airbyte.db.factory.DatabaseDriver;
1313
import io.airbyte.db.jdbc.JdbcUtils;
14+
import io.airbyte.integrations.standardtest.source.TestDataHolder;
1415
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
1516
import io.airbyte.integrations.util.HostPortResolver;
17+
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
18+
import io.airbyte.protocol.models.JsonSchemaType;
1619
import java.util.List;
1720
import org.jooq.SQLDialect;
1821
import org.testcontainers.containers.PostgreSQLContainer;
@@ -95,6 +98,24 @@ protected void tearDown(final TestDestinationEnv testEnv) {
9598
container.close();
9699
}
97100

101+
@Override
102+
protected void addJsonbArrayTest() {
103+
104+
addDataTypeTestData(
105+
TestDataHolder.builder()
106+
.sourceType("jsonb_array")
107+
.fullSourceDataType("JSONB[]")
108+
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
109+
.withItems(JsonSchemaType.JSONB)
110+
.build())
111+
.addInsertValues(
112+
"ARRAY['[1,2,1]', 'false']::jsonb[]",
113+
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
114+
.addExpectedValues("[\"[1, 2, 1]\",\"false\"]",
115+
"[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]")
116+
.build());
117+
}
118+
98119
public boolean testCatalog() {
99120
return true;
100121
}

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.airbyte.integrations.standardtest.source.TestDataHolder;
1515
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
1616
import io.airbyte.integrations.util.HostPortResolver;
17+
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
1718
import io.airbyte.protocol.models.JsonSchemaType;
1819
import io.airbyte.protocol.models.v0.AirbyteMessage;
1920
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
@@ -186,4 +187,22 @@ protected void addTimestampWithInfinityValuesTest() {
186187
}
187188
}
188189

190+
@Override
191+
protected void addJsonbArrayTest() {
192+
193+
addDataTypeTestData(
194+
TestDataHolder.builder()
195+
.sourceType("jsonb_array")
196+
.fullSourceDataType("JSONB[]")
197+
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
198+
.withItems(JsonSchemaType.JSONB)
199+
.build())
200+
.addInsertValues(
201+
"ARRAY['[1,2,1]', 'false']::jsonb[]",
202+
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
203+
.addExpectedValues("[\"[1, 2, 1]\",\"false\"]",
204+
"[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]")
205+
.build());
206+
}
207+
189208
}

0 commit comments

Comments
 (0)