Skip to content

Commit 8b83f14

Browse files
edgaogisripa
andauthored
Destination postgres: upgrade cdk (#35528)
Signed-off-by: Gireesh Sreepathi <[email protected]> Co-authored-by: Gireesh Sreepathi <[email protected]>
1 parent 16c00da commit 8b83f14

File tree

11 files changed

+103
-81
lines changed

11 files changed

+103
-81
lines changed

airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
plugins {
22
id 'airbyte-java-connector'
3+
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
34
}
45

56
airbyteJavaConnector {
6-
cdkVersionRequired = '0.23.2'
7+
cdkVersionRequired = '0.23.11'
78
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
89
useLocalCdk = false
910
}

airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
5-
dockerImageTag: 2.0.2
5+
dockerImageTag: 2.0.3
66
dockerRepository: airbyte/destination-postgres-strict-encrypt
77
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
88
githubIssueLabel: destination-postgres

airbyte-integrations/connectors/destination-postgres/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
plugins {
22
id 'airbyte-java-connector'
3+
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
34
}
45

56
airbyteJavaConnector {
6-
cdkVersionRequired = '0.23.2'
7+
cdkVersionRequired = '0.23.11'
78
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
89
useLocalCdk = false
910
}

airbyte-integrations/connectors/destination-postgres/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
8-
dockerImageTag: 2.0.2
8+
dockerImageTag: 2.0.3
99
dockerRepository: airbyte/destination-postgres
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
1111
githubIssueLabel: destination-postgres

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
import io.airbyte.commons.json.Jsons;
2727
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
2828
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
29-
import java.io.UnsupportedEncodingException;
29+
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresState;
3030
import java.net.URLEncoder;
31+
import java.nio.charset.StandardCharsets;
3132
import java.time.Duration;
3233
import java.util.HashMap;
3334
import java.util.Map;
@@ -99,12 +100,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
99100

100101
String encodedDatabase = config.get(JdbcUtils.DATABASE_KEY).asText();
101102
if (encodedDatabase != null) {
102-
try {
103-
encodedDatabase = URLEncoder.encode(encodedDatabase, "UTF-8");
104-
} catch (final UnsupportedEncodingException e) {
105-
// Should never happen
106-
e.printStackTrace();
107-
}
103+
encodedDatabase = URLEncoder.encode(encodedDatabase, StandardCharsets.UTF_8);
108104
}
109105
final String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s?",
110106
config.get(JdbcUtils.HOST_KEY).asText(),
@@ -133,8 +129,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
133129
}
134130

135131
@Override
136-
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
137-
return new PostgresDestinationHandler(databaseName, database);
132+
protected JdbcDestinationHandler<PostgresState> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
133+
return new PostgresDestinationHandler(databaseName, database, rawTableSchema);
138134
}
139135

140136
@Override

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.destination.postgres.typing_deduping;
66

7+
import com.fasterxml.jackson.databind.JsonNode;
78
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
89
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
910
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
@@ -12,11 +13,12 @@
1213
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
1314
import io.airbyte.integrations.base.destination.typing_deduping.Union;
1415
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
16+
import org.jooq.SQLDialect;
1517

16-
public class PostgresDestinationHandler extends JdbcDestinationHandler {
18+
public class PostgresDestinationHandler extends JdbcDestinationHandler<PostgresState> {
1719

18-
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
19-
super(databaseName, jdbcDatabase);
20+
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase, String rawTableSchema) {
21+
super(databaseName, jdbcDatabase, rawTableSchema, SQLDialect.POSTGRES);
2022
}
2123

2224
@Override
@@ -33,6 +35,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
3335
};
3436
}
3537

38+
@Override
39+
protected PostgresState toDestinationState(JsonNode json) {
40+
return new PostgresState(
41+
json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
42+
}
43+
3644
private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
3745
return switch (airbyteProtocolType) {
3846
case STRING -> "varchar";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.postgres.typing_deduping
6+
7+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
8+
9+
data class PostgresState(val needsSoftReset: Boolean) : MinimumDestinationState {
10+
override fun needsSoftReset(): Boolean {
11+
return needsSoftReset
12+
}
13+
14+
override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
15+
return copy(needsSoftReset = needsSoftReset) as T
16+
}
17+
}

airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
1717
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
1818
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
19-
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
19+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
2020
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
2121
import io.airbyte.integrations.destination.postgres.PostgresDestination;
2222
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
@@ -31,7 +31,7 @@
3131
import org.junit.jupiter.api.BeforeAll;
3232
import org.junit.jupiter.api.Test;
3333

34-
public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest {
34+
public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<PostgresState> {
3535

3636
private static PostgresTestDatabase testContainer;
3737
private static String databaseName;
@@ -75,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
7575
}
7676

7777
@Override
78-
protected DestinationHandler getDestinationHandler() {
79-
return new PostgresDestinationHandler(databaseName, database);
78+
protected DestinationHandler<PostgresState> getDestinationHandler() {
79+
return new PostgresDestinationHandler(databaseName, database, namespace);
8080
}
8181

8282
@Override
@@ -95,11 +95,11 @@ public void testCreateTableIncremental() throws Exception {
9595
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
9696
destinationHandler.execute(sql);
9797

98-
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
99-
assertEquals(1, initialStates.size());
100-
final DestinationInitialState initialState = initialStates.getFirst();
101-
assertTrue(initialState.isFinalTablePresent());
102-
assertFalse(initialState.isSchemaMismatch());
98+
List<DestinationInitialStatus<PostgresState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
99+
assertEquals(1, initialStatuses.size());
100+
final DestinationInitialStatus<PostgresState> initialStatus = initialStatuses.getFirst();
101+
assertTrue(initialStatus.isFinalTablePresent());
102+
assertFalse(initialStatus.isSchemaMismatch());
103103
}
104104

105105
}

airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresContainerFactory.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ protected PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
2626
/**
2727
* Apply the postgresql.conf file that we've packaged as a resource.
2828
*/
29-
public void withConf(PostgreSQLContainer<?> container) {
29+
public static void withConf(PostgreSQLContainer<?> container) {
3030
container
3131
.withCopyFileToContainer(
3232
MountableFile.forClasspathResource("postgresql.conf"),
@@ -37,21 +37,14 @@ public void withConf(PostgreSQLContainer<?> container) {
3737
/**
3838
* Create a new network and bind it to the container.
3939
*/
40-
public void withNetwork(PostgreSQLContainer<?> container) {
40+
public static void withNetwork(PostgreSQLContainer<?> container) {
4141
container.withNetwork(Network.newNetwork());
4242
}
4343

44-
/**
45-
* Configure postgres with wal_level=logical.
46-
*/
47-
public void withWalLevelLogical(PostgreSQLContainer<?> container) {
48-
container.withCommand("postgres -c wal_level=logical");
49-
}
50-
5144
/**
5245
* Generate SSL certificates and tell postgres to enable SSL and use them.
5346
*/
54-
public void withCert(PostgreSQLContainer<?> container) {
47+
public static void withCert(PostgreSQLContainer<?> container) {
5548
container.start();
5649
String[] commands = {
5750
"psql -U test -c \"CREATE USER postgres WITH PASSWORD 'postgres';\"",
@@ -97,7 +90,7 @@ public void withCert(PostgreSQLContainer<?> container) {
9790
/**
9891
* Tell postgres to enable SSL.
9992
*/
100-
public void withSSL(PostgreSQLContainer<?> container) {
93+
public static void withSSL(PostgreSQLContainer<?> container) {
10194
container.withCommand("postgres " +
10295
"-c ssl=on " +
10396
"-c ssl_cert_file=/var/lib/postgresql/server.crt " +
@@ -107,7 +100,7 @@ public void withSSL(PostgreSQLContainer<?> container) {
107100
/**
108101
* Configure postgres with client_encoding=sql_ascii.
109102
*/
110-
public void withASCII(PostgreSQLContainer<?> container) {
103+
public static void withASCII(PostgreSQLContainer<?> container) {
111104
container.withCommand("postgres -c client_encoding=sql_ascii");
112105
}
113106

airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/PostgresTestDatabase.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import com.google.common.collect.ImmutableMap;
88
import io.airbyte.cdk.db.factory.DatabaseDriver;
99
import io.airbyte.cdk.db.jdbc.JdbcUtils;
10+
import io.airbyte.cdk.testutils.ContainerFactory.NamedContainerModifier;
1011
import io.airbyte.cdk.testutils.TestDatabase;
1112
import io.airbyte.commons.json.Jsons;
1213
import java.io.IOException;
1314
import java.io.UncheckedIOException;
1415
import java.util.List;
16+
import java.util.function.Consumer;
1517
import java.util.stream.Stream;
1618
import org.jooq.SQLDialect;
1719
import org.testcontainers.containers.PostgreSQLContainer;
@@ -39,27 +41,30 @@ private BaseImage(String reference) {
3941

4042
}
4143

42-
public static enum ContainerModifier {
44+
public enum ContainerModifier implements NamedContainerModifier<PostgreSQLContainer<?>> {
4345

44-
ASCII("withASCII"),
45-
CONF("withConf"),
46-
NETWORK("withNetwork"),
47-
SSL("withSSL"),
48-
WAL_LEVEL_LOGICAL("withWalLevelLogical"),
49-
CERT("withCert"),
46+
ASCII(PostgresContainerFactory::withASCII),
47+
CONF(PostgresContainerFactory::withConf),
48+
NETWORK(PostgresContainerFactory::withNetwork),
49+
SSL(PostgresContainerFactory::withSSL),
50+
CERT(PostgresContainerFactory::withCert),
5051
;
5152

52-
private String methodName;
53+
private Consumer<PostgreSQLContainer<?>> modifer;
5354

54-
private ContainerModifier(String methodName) {
55-
this.methodName = methodName;
55+
private ContainerModifier(final Consumer<PostgreSQLContainer<?>> modifer) {
56+
this.modifer = modifer;
57+
}
58+
59+
@Override
60+
public Consumer<PostgreSQLContainer<?>> modifier() {
61+
return modifer;
5662
}
5763

5864
}
5965

6066
static public PostgresTestDatabase in(BaseImage baseImage, ContainerModifier... modifiers) {
61-
String[] methodNames = Stream.of(modifiers).map(im -> im.methodName).toList().toArray(new String[0]);
62-
final var container = new PostgresContainerFactory().shared(baseImage.reference, methodNames);
67+
final var container = new PostgresContainerFactory().shared(baseImage.reference, modifiers);
6368
return new PostgresTestDatabase(container).initialized();
6469
}
6570

0 commit comments

Comments
 (0)