Skip to content

🎉Source-redshift: added an optional field for schema\s selection #9721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -22,6 +25,7 @@ public class RedshiftSource extends AbstractJdbcSource<JDBCType> implements Sour

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftSource.class);
public static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
private List<String> schemas;

// todo (cgardens) - clean up passing the dialect as null versus explicitly adding the case to the
// constructor.
Expand All @@ -39,7 +43,20 @@ public JsonNode toDatabaseConfig(final JsonNode redshiftConfig) {
redshiftConfig.get("host").asText(),
redshiftConfig.get("port").asText(),
redshiftConfig.get("database").asText()));

if (redshiftConfig.has("schemas") && redshiftConfig.get("schemas").isArray()) {
schemas = new ArrayList<>();
for (final JsonNode schema : redshiftConfig.get("schemas")) {
schemas.add(schema.asText());
}
}

if (schemas != null && !schemas.isEmpty()) {
additionalProperties.add("currentSchema=" + String.join(",", schemas));
}

addSsl(additionalProperties);

builder.put("connection_properties", String.join(";", additionalProperties));

return Jsons.jsonNode(builder
Expand All @@ -51,6 +68,25 @@ private void addSsl(final List<String> additionalProperties) {
additionalProperties.add("sslfactory=com.amazon.redshift.ssl.NonValidatingFactory");
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
if (schemas != null && !schemas.isEmpty()) {
// process explicitly selected (from UI) schemas
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
for (String schema : schemas) {
LOGGER.debug("Discovering schema: {}", schema);
internals.addAll(super.discoverInternal(database, schema));
}
for (TableInfo<CommonField<JDBCType>> info : internals) {
LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName());
}
return internals;
} else {
LOGGER.info("No schemas explicitly set on UI to process, so will process all of existing schemas in DB");
return super.discoverInternal(database);
}
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,46 @@
"properties": {
"host": {
"description": "Host Endpoint of the Redshift Cluster (must include the cluster-id, region and end with .redshift.amazonaws.com)",
"type": "string"
"type": "string",
"order": 1
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 5439,
"examples": ["5439"]
"examples": ["5439"],
"order": 2
},
"database": {
"description": "Name of the database.",
"type": "string",
"examples": ["master"]
"examples": ["master"],
"order": 3
},
"schemas": {
"title": "Schemas",
"description": "The list of schemas to sync from. Defaults to user. Case sensitive.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 0,
"uniqueItems": true,
"examples": "public",
"order": 4
},
"username": {
"description": "Username to use to access the database.",
"type": "string"
"type": "string",
"order": 5
},
"password": {
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
"airbyte_secret": true,
"order": 6
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
Expand Down Expand Up @@ -40,18 +41,28 @@ protected static JsonNode getStaticConfig() {

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
config = getStaticConfig();
final JsonNode plainConfig = getStaticConfig();

database = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
plainConfig.get("username").asText(),
plainConfig.get("password").asText(),
String.format("jdbc:redshift://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()),
plainConfig.get("host").asText(),
plainConfig.get("port").asText(),
plainConfig.get("database").asText()),
RedshiftSource.DRIVER_CLASS);

schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", plainConfig.get("host"))
.put("port", plainConfig.get("port"))
.put("database", plainConfig.get("database"))
.put("schemas", List.of(schemaName))
.put("username", plainConfig.get("username"))
.put("password", plainConfig.get("password"))
.build());

final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also create a second schema (like unselected_schema) with some tables, and verify in the acceptance test that the catalog will not include anything in the second schema?

Otherwise, I think the schema selection is not tested any where currently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved data creation to a separate method and added one more schema creation to have some noise. Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that the SourceAcceptanceTest does not actually verify the catalog. I created an issue to track it:
#9818

database.execute(connection -> {
connection.createStatement().execute(createSchemaQuery);
Expand Down