Skip to content

Commit ee06cd5

Browse files
committed
postgres/cdk-td-changes
1 parent eca8629 commit ee06cd5

File tree

6 files changed

+66
-61
lines changed

6 files changed

+66
-61
lines changed

airbyte-integrations/connectors/destination-postgres/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
airbyteJavaConnector {
66
cdkVersionRequired = '0.20.4'
77
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
8-
useLocalCdk = false
8+
useLocalCdk = true
99
}
1010

1111
application {

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
import com.google.common.collect.ImmutableMap;
1515
import io.airbyte.cdk.db.factory.DataSourceFactory;
1616
import io.airbyte.cdk.db.factory.DatabaseDriver;
17+
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1718
import io.airbyte.cdk.db.jdbc.JdbcUtils;
1819
import io.airbyte.cdk.integrations.base.Destination;
1920
import io.airbyte.cdk.integrations.base.IntegrationRunner;
2021
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
2122
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
23+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2224
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2325
import io.airbyte.commons.json.Jsons;
26+
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
2427
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
2528
import java.io.UnsupportedEncodingException;
2629
import java.net.URLEncoder;
@@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() {
127130
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
128131
}
129132

133+
@Override
134+
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
135+
return new PostgresDestinationHandler(databaseName, database);
136+
}
137+
130138
@Override
131139
public boolean isV2Destination() {
132140
return true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.airbyte.integrations.destination.postgres.typing_deduping;
2+
3+
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
4+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
5+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
6+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
7+
import io.airbyte.integrations.base.destination.typing_deduping.Array;
8+
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
9+
import io.airbyte.integrations.base.destination.typing_deduping.Union;
10+
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
11+
12+
public class PostgresDestinationHandler extends JdbcDestinationHandler {
13+
14+
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
15+
super(databaseName, jdbcDatabase);
16+
}
17+
18+
@Override
19+
protected String toJdbcTypeName(AirbyteType airbyteType) {
20+
// This is mostly identical to the postgres implementation, but swaps jsonb to super
21+
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
22+
return toJdbcTypeName(airbyteProtocolType);
23+
}
24+
return switch (airbyteType.getTypeName()) {
25+
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb";
26+
// No nested Unions supported so this will definitely not result in infinite recursion.
27+
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
28+
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
29+
};
30+
}
31+
32+
private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
33+
return switch (airbyteProtocolType) {
34+
case STRING -> "varchar";
35+
case NUMBER -> "numeric";
36+
case INTEGER -> "int8";
37+
case BOOLEAN -> "bool";
38+
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
39+
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
40+
case TIME_WITH_TIMEZONE -> "timetz";
41+
case TIME_WITHOUT_TIMEZONE -> "time";
42+
case DATE -> "date";
43+
case UNKNOWN -> "jsonb";
44+
};
45+
}
46+
}

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator {
5454

5555
public static final DataType<?> JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb");
5656

57-
private static final Map<String, String> POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
58-
"numeric", "decimal",
59-
"int8", "bigint",
60-
"bool", "boolean",
61-
"timestamptz", "timestamp with time zone",
62-
"timetz", "time with time zone");
63-
6457
public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) {
6558
super(namingTransformer);
6659
}
@@ -309,29 +302,6 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
309302
.orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME);
310303
}
311304

312-
@Override
313-
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
314-
// Check that the columns match, with special handling for the metadata columns.
315-
// This is mostly identical to the redshift implementation, but swaps super to jsonb
316-
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
317-
.collect(LinkedHashMap::new,
318-
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
319-
LinkedHashMap::putAll);
320-
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
321-
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
322-
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
323-
.collect(LinkedHashMap::new,
324-
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())),
325-
LinkedHashMap::putAll);
326-
327-
final boolean sameColumns = actualColumns.equals(intendedColumns)
328-
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
329-
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
330-
&& "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());
331-
332-
return sameColumns;
333-
}
334-
335305
/**
336306
* Extract a raw field, leaving it as jsonb
337307
*/
@@ -343,8 +313,4 @@ private Field<String> jsonTypeof(final Field<?> field) {
343313
return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field);
344314
}
345315

346-
private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) {
347-
return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
348-
}
349-
350316
}

airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE;
88
import static org.junit.jupiter.api.Assertions.assertAll;
99
import static org.junit.jupiter.api.Assertions.assertEquals;
10+
import static org.junit.jupiter.api.Assertions.assertFalse;
1011
import static org.junit.jupiter.api.Assertions.assertTrue;
1112

1213
import com.fasterxml.jackson.databind.JsonNode;
@@ -18,10 +19,12 @@
1819
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
1920
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
2021
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
22+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
2123
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
2224
import io.airbyte.integrations.destination.postgres.PostgresDestination;
2325
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
2426
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
27+
import java.util.List;
2528
import java.util.Optional;
2629
import javax.sql.DataSource;
2730
import org.jooq.DataType;
@@ -76,8 +79,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
7679
}
7780

7881
@Override
79-
protected DestinationHandler<TableDefinition> getDestinationHandler() {
80-
return new JdbcDestinationHandler(databaseName, database);
82+
protected DestinationHandler getDestinationHandler() {
83+
return new PostgresDestinationHandler(databaseName, database);
8184
}
8285

8386
@Override
@@ -96,29 +99,11 @@ public void testCreateTableIncremental() throws Exception {
9699
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
97100
destinationHandler.execute(sql);
98101

99-
final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());
100-
101-
assertTrue(existingTable.isPresent());
102-
assertAll(
103-
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
104-
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
105-
() -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()),
106-
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
107-
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
108-
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
109-
() -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()),
110-
() -> assertEquals("jsonb", existingTable.get().columns().get("array").type()),
111-
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
112-
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
113-
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
114-
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
115-
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
116-
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
117-
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
118-
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
119-
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
120-
() -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type()));
121-
// TODO assert on table indexing, etc.
102+
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
103+
assertEquals(1, initialStates.size());
104+
final DestinationInitialState initialState = initialStates.getFirst();
105+
assertTrue(initialState.isFinalTablePresent());
106+
assertFalse(initialState.isSchemaMismatch());
122107
}
123108

124109
}

airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ private String generateBigString() {
4444
}
4545

4646
@Override
47-
protected SqlGenerator<?> getSqlGenerator() {
47+
protected SqlGenerator getSqlGenerator() {
4848
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
4949
}
5050

0 commit comments

Comments
 (0)