Skip to content

Commit 4534703

Browse files
authored
🎉Source Postgres: Set up connection - add schema selection (#9360)
* [1435] Source Postgres: Set up connection - added schema selection
1 parent 2b0d0bd commit 4534703

File tree

22 files changed

+122
-28
lines changed

22 files changed

+122
-28
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
33
"name": "Postgres",
44
"dockerRepository": "airbyte/source-postgres",
5-
"dockerImageTag": "0.4.1",
5+
"dockerImageTag": "0.4.2",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
77
"icon": "postgresql.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@
537537
- name: Postgres
538538
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
539539
dockerRepository: airbyte/source-postgres
540-
dockerImageTag: 0.4.1
540+
dockerImageTag: 0.4.2
541541
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
542542
icon: postgresql.svg
543543
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+16-5
Original file line numberDiff line numberDiff line change
@@ -5503,7 +5503,7 @@
55035503
supportsNormalization: false
55045504
supportsDBT: false
55055505
supported_destination_sync_modes: []
5506-
- dockerImage: "airbyte/source-postgres:0.4.1"
5506+
- dockerImage: "airbyte/source-postgres:0.4.2"
55075507
spec:
55085508
documentationUrl: "https://docs.airbyte.io/integrations/sources/postgres"
55095509
connectionSpecification:
@@ -5537,28 +5537,39 @@
55375537
description: "Name of the database."
55385538
type: "string"
55395539
order: 2
5540+
schemas:
5541+
title: "Schemas"
5542+
description: "The list of schemas to sync from. Defaults to user. Case sensitive."
5543+
type: "array"
5544+
items:
5545+
type: "string"
5546+
minItems: 0
5547+
uniqueItems: true
5548+
default:
5549+
- "public"
5550+
order: 3
55405551
username:
55415552
title: "User"
55425553
description: "Username to use to access the database."
55435554
type: "string"
5544-
order: 3
5555+
order: 4
55455556
password:
55465557
title: "Password"
55475558
description: "Password associated with the username."
55485559
type: "string"
55495560
airbyte_secret: true
5550-
order: 4
5561+
order: 5
55515562
ssl:
55525563
title: "Connect using SSL"
55535564
description: "Encrypt client/server communications for increased security."
55545565
type: "boolean"
55555566
default: false
5556-
order: 5
5567+
order: 6
55575568
replication_method:
55585569
type: "object"
55595570
title: "Replication Method"
55605571
description: "Replication method to use for extracting data from the database."
5561-
order: 6
5572+
order: 7
55625573
oneOf:
55635574
- title: "Standard"
55645575
additionalProperties: false

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshBastionContainer.java

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.google.common.collect.ImmutableMap;
1212
import io.airbyte.commons.json.Jsons;
1313
import java.io.IOException;
14+
import java.util.List;
1415
import java.util.Objects;
1516
import org.testcontainers.containers.GenericContainer;
1617
import org.testcontainers.containers.JdbcDatabaseContainer;
@@ -57,6 +58,10 @@ public ImmutableMap.Builder<Object, Object> getBasicDbConfigBuider(final JdbcDat
5758
return getBasicDbConfigBuider(db, db.getDatabaseName());
5859
}
5960

61+
public ImmutableMap.Builder<Object, Object> getBasicDbConfigBuider(final JdbcDatabaseContainer<?> db, final List<String> schemas) {
62+
return getBasicDbConfigBuider(db, db.getDatabaseName()).put("schemas", schemas);
63+
}
64+
6065
public ImmutableMap.Builder<Object, Object> getBasicDbConfigBuider(final JdbcDatabaseContainer<?> db, final String schemaName) {
6166
return ImmutableMap.builder()
6267
.put("host", Objects.requireNonNull(db.getContainerInfo().getNetworkSettings()

airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ private void init() {
7777
.put("host", container.getHost())
7878
.put("port", container.getFirstMappedPort())
7979
.put("database", dbName)
80+
.put("schemas", List.of(MODELS_SCHEMA, MODELS_SCHEMA + "_random"))
8081
.put("username", TEST_USER_NAME)
8182
.put("password", TEST_USER_PASSWORD)
8283
.put("replication_method", "CDC")

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.6
19+
LABEL io.airbyte.version=0.1.7
2020
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json

+15-3
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,36 @@
2929
"type": "string",
3030
"order": 2
3131
},
32+
"schemas": {
33+
"title": "Schemas",
34+
"description": "The list of schemas to sync from. Defaults to user. Case sensitive.",
35+
"type": "array",
36+
"items": {
37+
"type": "string"
38+
},
39+
"minItems": 0,
40+
"uniqueItems": true,
41+
"default": ["public"],
42+
"order": 3
43+
},
3244
"username": {
3345
"title": "User",
3446
"description": "Username to use to access the database.",
3547
"type": "string",
36-
"order": 3
48+
"order": 4
3749
},
3850
"password": {
3951
"title": "Password",
4052
"description": "Password associated with the username.",
4153
"type": "string",
4254
"airbyte_secret": true,
43-
"order": 4
55+
"order": 5
4456
},
4557
"replication_method": {
4658
"type": "object",
4759
"title": "Replication Method",
4860
"description": "Replication method to use for extracting data from the database.",
49-
"order": 6,
61+
"order": 7,
5062
"oneOf": [
5163
{
5264
"title": "Standard",

airbyte-integrations/connectors/source-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.1
19+
LABEL io.airbyte.version=0.4.2
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+31
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
5050
public static final String CDC_LSN = "_ab_cdc_lsn";
5151

5252
static final String DRIVER_CLASS = "org.postgresql.Driver";
53+
private List<String> schemas;
5354

5455
public static Source sshWrappedSource() {
5556
return new SshWrappedSource(new PostgresSource(), List.of("host"), List.of("port"));
@@ -81,6 +82,17 @@ public JsonNode toDatabaseConfigStatic(final JsonNode config) {
8182
additionalParameters.add("sslmode=require");
8283
}
8384

85+
if (config.has("schemas") && config.get("schemas").isArray()) {
86+
schemas = new ArrayList<>();
87+
for (final JsonNode schema : config.get("schemas")) {
88+
schemas.add(schema.asText());
89+
}
90+
}
91+
92+
if (schemas != null && !schemas.isEmpty()) {
93+
additionalParameters.add("currentSchema=" + String.join(",", schemas));
94+
}
95+
8496
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
8597

8698
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
@@ -116,6 +128,25 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
116128
return catalog;
117129
}
118130

131+
@Override
132+
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
133+
if (schemas != null && !schemas.isEmpty()) {
134+
// process explicitly selected (from UI) schemas
135+
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
136+
for (String schema : schemas) {
137+
LOGGER.debug("Discovering schema: {}", schema);
138+
internals.addAll(super.discoverInternal(database, schema));
139+
}
140+
for (TableInfo<CommonField<JDBCType>> info : internals) {
141+
LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName());
142+
}
143+
return internals;
144+
} else {
145+
LOGGER.info("No schemas explicitly set on UI to process, so will process all of existing schemas in DB");
146+
return super.discoverInternal(database);
147+
}
148+
}
149+
119150
@Override
120151
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config)
121152
throws Exception {

airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json

+16-4
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,43 @@
2929
"type": "string",
3030
"order": 2
3131
},
32+
"schemas": {
33+
"title": "Schemas",
34+
"description": "The list of schemas to sync from. Defaults to user. Case sensitive.",
35+
"type": "array",
36+
"items": {
37+
"type": "string"
38+
},
39+
"minItems": 0,
40+
"uniqueItems": true,
41+
"default": ["public"],
42+
"order": 3
43+
},
3244
"username": {
3345
"title": "User",
3446
"description": "Username to use to access the database.",
3547
"type": "string",
36-
"order": 3
48+
"order": 4
3749
},
3850
"password": {
3951
"title": "Password",
4052
"description": "Password associated with the username.",
4153
"type": "string",
4254
"airbyte_secret": true,
43-
"order": 4
55+
"order": 5
4456
},
4557
"ssl": {
4658
"title": "Connect using SSL",
4759
"description": "Encrypt client/server communications for increased security.",
4860
"type": "boolean",
4961
"default": false,
50-
"order": 5
62+
"order": 6
5163
},
5264
"replication_method": {
5365
"type": "object",
5466
"title": "Replication Method",
5567
"description": "Replication method to use for extracting data from the database.",
56-
"order": 6,
68+
"order": 7,
5769
"oneOf": [
5870
{
5971
"title": "Standard",

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcce
4343
@Override
4444
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
4545
startTestContainers();
46-
config = bastion.getTunnelConfig(getTunnelMethod(), bastion.getBasicDbConfigBuider(db));
46+
config = bastion.getTunnelConfig(getTunnelMethod(), bastion.getBasicDbConfigBuider(db, List.of("public")));
4747
populateDatabaseTestData();
4848

4949
}

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
6767
.put("host", container.getHost())
6868
.put("port", container.getFirstMappedPort())
6969
.put("database", container.getDatabaseName())
70+
.put("schemas", List.of(NAMESPACE))
7071
.put("username", container.getUsername())
7172
.put("password", container.getPassword())
7273
.put("replication_method", replicationMethod)

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
import io.airbyte.integrations.standardtest.source.TestDataHolder;
1414
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
1515
import io.airbyte.protocol.models.JsonSchemaPrimitive;
16+
import java.util.List;
1617
import org.jooq.SQLDialect;
1718
import org.testcontainers.containers.PostgreSQLContainer;
1819
import org.testcontainers.utility.MountableFile;
1920

2021
public class CdcPostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
2122

23+
private static final String SCHEMA_NAME = "test";
2224
private static final String SLOT_NAME_BASE = "debezium_slot";
2325
private static final String PUBLICATION = "publication";
2426
private PostgreSQLContainer<?> container;
@@ -47,6 +49,7 @@ protected Database setupDatabase() throws Exception {
4749
.put("host", container.getHost())
4850
.put("port", container.getFirstMappedPort())
4951
.put("database", container.getDatabaseName())
52+
.put("schemas", List.of(SCHEMA_NAME))
5053
.put("username", container.getUsername())
5154
.put("password", container.getPassword())
5255
.put("replication_method", replicationMethod)
@@ -83,7 +86,7 @@ protected Database setupDatabase() throws Exception {
8386

8487
@Override
8588
protected String getNameSpace() {
86-
return "test";
89+
return SCHEMA_NAME;
8790
}
8891

8992
@Override

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
4747
.put("host", container.getHost())
4848
.put("port", container.getFirstMappedPort())
4949
.put("database", container.getDatabaseName())
50+
.put("schemas", Jsons.jsonNode(List.of("public")))
5051
.put("username", container.getUsername())
5152
.put("password", container.getPassword())
5253
.put("ssl", false)

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616
import java.sql.SQLException;
1717
import java.util.Set;
1818
import org.jooq.SQLDialect;
19-
import org.slf4j.Logger;
20-
import org.slf4j.LoggerFactory;
2119
import org.testcontainers.containers.PostgreSQLContainer;
2220

2321
public class PostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
2422

2523
private PostgreSQLContainer<?> container;
2624
private JsonNode config;
27-
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceDatatypeTest.class);
25+
private static final String SCHEMA_NAME = "test";
2826

2927
@Override
3028
protected Database setupDatabase() throws SQLException {
@@ -54,7 +52,7 @@ protected Database setupDatabase() throws SQLException {
5452
SQLDialect.POSTGRES);
5553

5654
database.query(ctx -> {
57-
ctx.execute("CREATE SCHEMA TEST;");
55+
ctx.execute(String.format("CREATE SCHEMA %S;", SCHEMA_NAME));
5856
ctx.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');");
5957
ctx.execute("CREATE TYPE inventory_item AS (name text, supplier_id integer, price numeric);");
6058
// In one of the test case, we have some money values with currency symbol. Postgres can only
@@ -74,7 +72,7 @@ protected Database setupDatabase() throws SQLException {
7472

7573
@Override
7674
protected String getNameSpace() {
77-
return "test";
75+
return SCHEMA_NAME;
7876
}
7977

8078
@Override

airbyte-integrations/connectors/source-postgres/src/test-performance/java/io/airbyte/integrations/source/postgres/PostgresRdsSourcePerformanceTest.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010
import io.airbyte.commons.json.Jsons;
1111
import io.airbyte.integrations.standardtest.source.performancetest.AbstractSourcePerformanceTest;
1212
import java.nio.file.Path;
13+
import java.util.List;
1314
import java.util.stream.Stream;
1415
import org.junit.jupiter.api.BeforeAll;
1516
import org.junit.jupiter.params.provider.Arguments;
1617

1718
public class PostgresRdsSourcePerformanceTest extends AbstractSourcePerformanceTest {
1819

1920
private static final String PERFORMANCE_SECRET_CREDS = "secrets/performance-config.json";
21+
private static final List<String> SCHEMAS = List.of("test1000tables240columns200recordsDb",
22+
"newregular25tables50000records", "newsmall1000tableswith10000rows");
2023

2124
@Override
2225
protected String getImageName() {
@@ -35,6 +38,7 @@ protected void setupDatabase(String dbName) {
3538
.put("host", plainConfig.get("host"))
3639
.put("port", plainConfig.get("port"))
3740
.put("database", plainConfig.get("database"))
41+
.put("schemas", SCHEMAS)
3842
.put("username", plainConfig.get("username"))
3943
.put("password", plainConfig.get("password"))
4044
.put("ssl", true)
@@ -53,9 +57,9 @@ protected void setupDatabase(String dbName) {
5357
@BeforeAll
5458
public static void beforeAll() {
5559
AbstractSourcePerformanceTest.testArgs = Stream.of(
56-
Arguments.of("test1000tables240columns200recordsDb", "test1000tables240columns200recordsDb", 200, 240, 1000),
57-
Arguments.of("newregular25tables50000records", "newregular25tables50000records", 50000, 8, 25),
58-
Arguments.of("newsmall1000tableswith10000rows", "newsmall1000tableswith10000rows", 10000, 8, 1000));
60+
Arguments.of(SCHEMAS.get(0), SCHEMAS.get(0), 200, 240, 1000),
61+
Arguments.of(SCHEMAS.get(1), SCHEMAS.get(1), 50000, 8, 25),
62+
Arguments.of(SCHEMAS.get(2), SCHEMAS.get(2), 10000, 8, 1000));
5963
}
6064

6165
}

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ private JsonNode getConfig(final String dbName) {
9898
.put("host", container.getHost())
9999
.put("port", container.getFirstMappedPort())
100100
.put("database", dbName)
101+
.put("schemas", List.of(MODELS_SCHEMA, MODELS_SCHEMA + "_random"))
101102
.put("username", container.getUsername())
102103
.put("password", container.getPassword())
103104
.put("ssl", false)

0 commit comments

Comments
 (0)