Skip to content

Commit 85b52ed

Browse files
gisripatanawatpan
authored and committed
Destination Postgres: CDK T+D initial state gathering (airbytehq#35385)
Signed-off-by: Gireesh Sreepathi <[email protected]>
1 parent 80e2ebb commit 85b52ed

File tree

13 files changed

+80
-75
lines changed

13 files changed

+80
-75
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.23.2 | 2024-02-22 | [\#35385](https://github.com/airbytehq/airbyte/pull/35385) | Bugfix: inverted logic of disableTypeDedupe flag |
170+
| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdown timeouts |
169171
| 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state |
170172
| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB |
171173
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.23.0
1+
version=0.23.2

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
317317
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
318318
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
319319
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
320-
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
320+
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
321321
final TyperDeduper typerDeduper;
322322
if (disableTypeDedupe) {
323323
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);

airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.20.4'
6+
cdkVersionRequired = '0.23.2'
77
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
5-
dockerImageTag: 2.0.0
5+
dockerImageTag: 2.0.1
66
dockerRepository: airbyte/destination-postgres-strict-encrypt
77
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
88
githubIssueLabel: destination-postgres

airbyte-integrations/connectors/destination-postgres/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.20.4'
6+
cdkVersionRequired = '0.23.2'
77
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/destination-postgres/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
8-
dockerImageTag: 2.0.0
8+
dockerImageTag: 2.0.1
99
dockerRepository: airbyte/destination-postgres
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
1111
githubIssueLabel: destination-postgres

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
import com.google.common.collect.ImmutableMap;
1515
import io.airbyte.cdk.db.factory.DataSourceFactory;
1616
import io.airbyte.cdk.db.factory.DatabaseDriver;
17+
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1718
import io.airbyte.cdk.db.jdbc.JdbcUtils;
1819
import io.airbyte.cdk.integrations.base.Destination;
1920
import io.airbyte.cdk.integrations.base.IntegrationRunner;
2021
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
2122
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
23+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2224
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2325
import io.airbyte.commons.json.Jsons;
26+
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
2427
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
2528
import java.io.UnsupportedEncodingException;
2629
import java.net.URLEncoder;
@@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() {
127130
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
128131
}
129132

133+
@Override
134+
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
135+
return new PostgresDestinationHandler(databaseName, database);
136+
}
137+
130138
@Override
131139
public boolean isV2Destination() {
132140
return true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.postgres.typing_deduping;
6+
7+
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
8+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
9+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
10+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
11+
import io.airbyte.integrations.base.destination.typing_deduping.Array;
12+
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
13+
import io.airbyte.integrations.base.destination.typing_deduping.Union;
14+
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
15+
16+
public class PostgresDestinationHandler extends JdbcDestinationHandler {
17+
18+
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
19+
super(databaseName, jdbcDatabase);
20+
}
21+
22+
@Override
23+
protected String toJdbcTypeName(AirbyteType airbyteType) {
24+
// Postgres maps structs, arrays, and unsupported oneOfs to jsonb; primitive types are delegated to the AirbyteProtocolType overload.
25+
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
26+
return toJdbcTypeName(airbyteProtocolType);
27+
}
28+
return switch (airbyteType.getTypeName()) {
29+
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb";
30+
// No nested Unions supported so this will definitely not result in infinite recursion.
31+
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
32+
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
33+
};
34+
}
35+
36+
private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
37+
return switch (airbyteProtocolType) {
38+
case STRING -> "varchar";
39+
case NUMBER -> "numeric";
40+
case INTEGER -> "int8";
41+
case BOOLEAN -> "bool";
42+
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
43+
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
44+
case TIME_WITH_TIMEZONE -> "timetz";
45+
case TIME_WITHOUT_TIMEZONE -> "time";
46+
case DATE -> "date";
47+
case UNKNOWN -> "jsonb";
48+
};
49+
}
50+
51+
}

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
import static org.jooq.impl.DSL.rowNumber;
2121
import static org.jooq.impl.DSL.val;
2222

23-
import com.google.common.collect.ImmutableMap;
24-
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
2523
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
26-
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
2724
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2825
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
2926
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
@@ -37,7 +34,6 @@
3734
import java.util.ArrayList;
3835
import java.util.LinkedHashMap;
3936
import java.util.List;
40-
import java.util.Map;
4137
import java.util.Optional;
4238
import java.util.function.Function;
4339
import java.util.stream.Collectors;
@@ -54,13 +50,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator {
5450

5551
public static final DataType<?> JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb");
5652

57-
private static final Map<String, String> POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
58-
"numeric", "decimal",
59-
"int8", "bigint",
60-
"bool", "boolean",
61-
"timestamptz", "timestamp with time zone",
62-
"timetz", "time with time zone");
63-
6453
public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) {
6554
super(namingTransformer);
6655
}
@@ -309,29 +298,6 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
309298
.orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME);
310299
}
311300

312-
@Override
313-
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
314-
// Check that the columns match, with special handling for the metadata columns.
315-
// This is mostly identical to the redshift implementation, but swaps super to jsonb
316-
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
317-
.collect(LinkedHashMap::new,
318-
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
319-
LinkedHashMap::putAll);
320-
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
321-
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
322-
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
323-
.collect(LinkedHashMap::new,
324-
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())),
325-
LinkedHashMap::putAll);
326-
327-
final boolean sameColumns = actualColumns.equals(intendedColumns)
328-
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
329-
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
330-
&& "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());
331-
332-
return sameColumns;
333-
}
334-
335301
/**
336302
* Extract a raw field, leaving it as jsonb
337303
*/
@@ -343,8 +309,4 @@ private Field<String> jsonTypeof(final Field<?> field) {
343309
return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field);
344310
}
345311

346-
private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) {
347-
return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
348-
}
349-
350312
}

airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,23 @@
55
package io.airbyte.integrations.destination.postgres.typing_deduping;
66

77
import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE;
8-
import static org.junit.jupiter.api.Assertions.assertAll;
98
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
import static org.junit.jupiter.api.Assertions.assertFalse;
1010
import static org.junit.jupiter.api.Assertions.assertTrue;
1111

1212
import com.fasterxml.jackson.databind.JsonNode;
1313
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
1414
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1515
import io.airbyte.cdk.db.jdbc.JdbcUtils;
16-
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
17-
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
1816
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
1917
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
2018
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
19+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
2120
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
2221
import io.airbyte.integrations.destination.postgres.PostgresDestination;
2322
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
2423
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
25-
import java.util.Optional;
24+
import java.util.List;
2625
import javax.sql.DataSource;
2726
import org.jooq.DataType;
2827
import org.jooq.Field;
@@ -76,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
7675
}
7776

7877
@Override
79-
protected DestinationHandler<TableDefinition> getDestinationHandler() {
80-
return new JdbcDestinationHandler(databaseName, database);
78+
protected DestinationHandler getDestinationHandler() {
79+
return new PostgresDestinationHandler(databaseName, database);
8180
}
8281

8382
@Override
@@ -96,29 +95,11 @@ public void testCreateTableIncremental() throws Exception {
9695
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
9796
destinationHandler.execute(sql);
9897

99-
final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());
100-
101-
assertTrue(existingTable.isPresent());
102-
assertAll(
103-
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
104-
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
105-
() -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()),
106-
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
107-
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
108-
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
109-
() -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()),
110-
() -> assertEquals("jsonb", existingTable.get().columns().get("array").type()),
111-
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
112-
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
113-
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
114-
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
115-
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
116-
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
117-
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
118-
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
119-
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
120-
() -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type()));
121-
// TODO assert on table indexing, etc.
98+
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
99+
assertEquals(1, initialStates.size());
100+
final DestinationInitialState initialState = initialStates.getFirst();
101+
assertTrue(initialState.isFinalTablePresent());
102+
assertFalse(initialState.isSchemaMismatch());
122103
}
123104

124105
}

airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ private String generateBigString() {
4444
}
4545

4646
@Override
47-
protected SqlGenerator<?> getSqlGenerator() {
47+
protected SqlGenerator getSqlGenerator() {
4848
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
4949
}
5050

docs/integrations/destinations/postgres.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ Now that you have set up the Postgres destination connector, check out the follo
193193

194194
| Version | Date | Pull Request | Subject |
195195
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
196+
| 2.0.1 | 2024-02-22 | [35385](https://github.com/airbytehq/airbyte/pull/35385) | Upgrade CDK to 0.23.2; Gathering required initial state upfront |
196197
| 2.0.0 | 2024-02-09 | [35042](https://github.com/airbytehq/airbyte/pull/35042) | GA release V2 destinations format. |
197198
| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults |
198199
| 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib |
@@ -220,4 +221,4 @@ Now that you have set up the Postgres destination connector, check out the follo
220221
| 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
221222
| 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
222223
| 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support |
223-
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
224+
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |

0 commit comments

Comments
 (0)