
🎉 Source-redshift: added an optional field for schemas selection #9721


Merged
merged 10 commits on Jan 26, 2022
RedshiftSource.java
@@ -7,10 +7,13 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.List;
@@ -22,6 +25,8 @@ public class RedshiftSource extends AbstractJdbcSource<JDBCType> implements Sour

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftSource.class);
public static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
private static final String SCHEMAS = "schemas";
private List<String> schemas;

// todo (cgardens) - clean up passing the dialect as null versus explicitly adding the case to the
// constructor.
@@ -39,7 +44,20 @@ public JsonNode toDatabaseConfig(final JsonNode redshiftConfig) {
redshiftConfig.get("host").asText(),
redshiftConfig.get("port").asText(),
redshiftConfig.get("database").asText()));

if (redshiftConfig.has(SCHEMAS) && redshiftConfig.get(SCHEMAS).isArray()) {
schemas = new ArrayList<>();
for (final JsonNode schema : redshiftConfig.get(SCHEMAS)) {
schemas.add(schema.asText());
}

if (schemas != null && !schemas.isEmpty()) {
additionalProperties.add("currentSchema=" + String.join(",", schemas));
}
}

addSsl(additionalProperties);

builder.put("connection_properties", String.join(";", additionalProperties));

return Jsons.jsonNode(builder
@@ -51,6 +69,25 @@ private void addSsl(final List<String> additionalProperties) {
additionalProperties.add("sslfactory=com.amazon.redshift.ssl.NonValidatingFactory");
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
if (schemas != null && !schemas.isEmpty()) {
// process explicitly selected (from UI) schemas
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
for (String schema : schemas) {
LOGGER.debug("Discovering schema: {}", schema);
internals.addAll(super.discoverInternal(database, schema));
}
for (TableInfo<CommonField<JDBCType>> info : internals) {
LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName());
}
return internals;
} else {
LOGGER.info("No schemas explicitly set on UI to process, so will process all of existing schemas in DB");
return super.discoverInternal(database);
}
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
spec.json
@@ -10,7 +10,8 @@
"host": {
"title": "Host",
"description": "Host Endpoint of the Redshift Cluster (must include the cluster-id, region and end with .redshift.amazonaws.com).",
"type": "string"
"type": "string",
"order": 1
},
"port": {
"title": "Port",
@@ -19,24 +20,40 @@
"minimum": 0,
"maximum": 65536,
"default": 5439,
"examples": ["5439"]
"examples": ["5439"],
"order": 2
},
"database": {
"title": "Database",
"description": "Name of the database.",
"type": "string",
"examples": ["master"]
"examples": ["master"],
"order": 3
},
"schemas": {
"title": "Schemas",
"description": "The list of schemas to sync from. Specify one or more explicitly or keep empty to process all schemas. Schema names are case sensitive.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 0,
"uniqueItems": true,
"examples": ["public"],
"order": 4
},
"username": {
"title": "Username",
"description": "Username to use to access the database.",
"type": "string"
"type": "string",
"order": 5
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
"airbyte_secret": true,
"order": 6
}
}
}
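
The new `schemas` spec field above pairs with the `toDatabaseConfig` change in RedshiftSource: the selected list is joined into a `currentSchema` JDBC connection property. A minimal sketch of that mapping follows; it is not part of the PR, the endpoint and credentials are made-up placeholders, and the RedshiftSource package path is assumed.

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.redshift.RedshiftSource;
import java.util.List;

public class RedshiftSchemasConfigExample {

  public static void main(final String[] args) {
    // Hypothetical source config that selects two schemas explicitly.
    final JsonNode config = Jsons.jsonNode(ImmutableMap.<String, Object>builder()
        .put("host", "example-cluster.abc123.us-east-1.redshift.amazonaws.com")
        .put("port", "5439")
        .put("database", "master")
        .put("schemas", List.of("public", "sales"))
        .put("username", "user")
        .put("password", "****")
        .build());

    // toDatabaseConfig joins the selected schemas into "currentSchema=public,sales"
    // and folds it into the semicolon-separated "connection_properties" value,
    // alongside the SSL properties added by addSsl.
    System.out.println(new RedshiftSource().toDatabaseConfig(config).get("connection_properties"));
  }
}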
RedshiftSourceAcceptanceTest.java
@@ -5,6 +5,7 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
@@ -29,13 +30,13 @@
public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {

// This test expects an active Redshift cluster that is usable from outside the VPC
protected ObjectNode config;
protected JdbcDatabase database;
protected String schemaName;
protected String streamName;

protected static ObjectNode getStaticConfig() {
return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
}

@Override
@@ -52,6 +53,19 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
RedshiftSource.DRIVER_CLASS);

schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();

config = config.set("schemas", Jsons.jsonNode(List.of(schemaName)));

// create test data in the schema under test
createTestData(database, schemaName);

// create a second schema with data that is not used by the tests but exercises schema
// filtering; it should not appear in the discovered results
createTestData(database, schemaName + "shouldIgnore");
}

private void createTestData(final JdbcDatabase database, final String schemaName)
throws SQLException {
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
Contributor:
Can you also create a second schema (like unselected_schema) with some tables, and verify in the acceptance test that the catalog will not include anything in the second schema?

Otherwise, I think the schema selection is not tested anywhere currently.

Contributor (author):
Moved data creation to a separate method and added one more schema creation to have some noise. Thanks

Contributor:
I just realized that the SourceAcceptanceTest does not actually verify the catalog. I created an issue to track it:
#9818

database.execute(connection -> {
connection.createStatement().execute(createSchemaQuery);
@@ -60,12 +74,15 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
streamName = "customer";
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, streamName);
final String createTestTable =
String.format("CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n", fqTableName);
String.format(
"CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n",
fqTableName);
database.execute(connection -> {
connection.createStatement().execute(createTestTable);
});

final String insertTestData = String.format("insert into %s values (1, 'Chris', 'France');\n", fqTableName);
final String insertTestData = String.format("insert into %s values (1, 'Chris', 'France');\n",
fqTableName);
database.execute(connection -> {
connection.createStatement().execute(insertTestData);
});
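
The review thread above points out that SourceAcceptanceTest does not yet verify the discovered catalog (tracked in #9818). A minimal sketch of what such a check could look like, not part of this PR: it assumes the standard Airbyte protocol models and the `config` / `schemaName` fields that setupEnvironment prepares, and the test name is hypothetical.

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import org.junit.jupiter.api.Test;

// Hypothetical follow-up test: with the "schemas" filter set to the test schema,
// discovery must not surface anything from the "...shouldIgnore" schema.
@Test
void testDiscoverOnlyReturnsSelectedSchemas() throws Exception {
  final AirbyteCatalog catalog = new RedshiftSource().discover(config);
  for (final AirbyteStream stream : catalog.getStreams()) {
    // Every discovered stream should live in the explicitly selected schema.
    assertEquals(schemaName, stream.getNamespace());
  }
}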