Skip to content

Verify source redshift schema selection in tests #9862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -139,9 +140,16 @@ public void testCheckConnection() throws Exception {
*/
@Test
public void testDiscover() throws Exception {
// the worker validates that it is a valid catalog, so we do not need to validate again (as long as
// we use the worker, which we will not want to do long term).
assertNotNull(runDiscover(), "Expected discover to produce a catalog");
final AirbyteCatalog discoverOutput = runDiscover();
assertNotNull(discoverOutput, "Expected discover to produce a catalog");
verifyCatalog(discoverOutput);
}

/**
* Override this method to check the actual catalog.
*/
protected void verifyCatalog(final AirbyteCatalog catalog) throws Exception {
// do nothing by default
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
Expand Down Expand Up @@ -69,8 +70,10 @@
// 4. Then implement the abstract methods documented below.
public abstract class JdbcSourceAcceptanceTest {

public static String SCHEMA_NAME = "jdbc_integration_test1";
public static String SCHEMA_NAME2 = "jdbc_integration_test2";
// schema name must be randomized for each test run,
// otherwise parallel runs can interfere with each other
public static String SCHEMA_NAME = Strings.addRandomSuffix("jdbc_integration_test1", "_", 5).toLowerCase();
public static String SCHEMA_NAME2 = Strings.addRandomSuffix("jdbc_integration_test2", "_", 5).toLowerCase();
public static Set<String> TEST_SCHEMAS = ImmutableSet.of(SCHEMA_NAME, SCHEMA_NAME2);

public static String TABLE_NAME = "id_and_name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
Expand All @@ -16,23 +18,30 @@
import io.airbyte.integrations.source.redshift.RedshiftSource;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {

protected static final List<Field> FIELDS = List.of(
Field.of("c_custkey", JsonSchemaPrimitive.NUMBER),
Field.of("c_name", JsonSchemaPrimitive.STRING),
Field.of("c_nation", JsonSchemaPrimitive.STRING));

// This test case expects an active redshift cluster that is useable from outside of vpc
protected ObjectNode config;
protected JdbcDatabase database;
protected String schemaName;
protected String schemaToIgnore;
protected String streamName;

protected static ObjectNode getStaticConfig() {
Expand All @@ -43,28 +52,34 @@ protected static ObjectNode getStaticConfig() {
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
config = getStaticConfig();

database = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:redshift://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()),
RedshiftSource.DRIVER_CLASS);
database = createDatabase(config);

schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();
schemaToIgnore = schemaName + "shouldIgnore";

// limit the connection to one schema only
config = config.set("schemas", Jsons.jsonNode(List.of(schemaName)));

// create a test data
createTestData(database, schemaName);

// create a schema with data that will not be used for testing, but would be used to check schema
// filtering. This one should not be visible in results
createTestData(database, schemaName + "shouldIgnore");
createTestData(database, schemaToIgnore);
}

private void createTestData(final JdbcDatabase database, final String schemaName)
protected static JdbcDatabase createDatabase(final JsonNode config) {
return Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:redshift://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()),
RedshiftSource.DRIVER_CLASS);
}

protected void createTestData(final JdbcDatabase database, final String schemaName)
throws SQLException {
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
database.execute(connection -> {
Expand All @@ -90,8 +105,10 @@ private void createTestData(final JdbcDatabase database, final String schemaName

@Override
protected void tearDown(final TestDestinationEnv testEnv) throws SQLException {
final String dropSchemaQuery = String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaName);
database.execute(connection -> connection.createStatement().execute(dropSchemaQuery));
database.execute(connection -> connection.createStatement()
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaName)));
database.execute(connection -> connection.createStatement()
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaToIgnore)));
}

@Override
Expand All @@ -111,17 +128,24 @@ protected JsonNode getConfig() {

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.createConfiguredAirbyteCatalog(
streamName,
schemaName,
Field.of("c_custkey", JsonSchemaPrimitive.NUMBER),
Field.of("c_name", JsonSchemaPrimitive.STRING),
Field.of("c_nation", JsonSchemaPrimitive.STRING));
return CatalogHelpers.createConfiguredAirbyteCatalog(streamName, schemaName, FIELDS);
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected void verifyCatalog(final AirbyteCatalog catalog) {
final List<AirbyteStream> streams = catalog.getStreams();
// only one stream is expected; the schema that should be ignored
// must not be included in the retrieved catalog
assertEquals(1, streams.size());
final AirbyteStream actualStream = streams.get(0);
assertEquals(schemaName, actualStream.getNamespace());
assertEquals(streamName, actualStream.getName());
assertEquals(CatalogHelpers.fieldsToJsonSchema(FIELDS), actualStream.getJsonSchema());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import io.airbyte.commons.string.Strings;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.redshift.RedshiftSource;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;

public class RedshiftSslSourceAcceptanceTest extends RedshiftSourceAcceptanceTest {

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
config = getStaticConfig();

database = Databases.createJdbcDatabase(
protected static JdbcDatabase createDatabase(final JsonNode config) {
return Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:redshift://%s:%s/%s",
Expand All @@ -26,25 +22,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
RedshiftSource.DRIVER_CLASS,
"ssl=true;" +
"sslfactory=com.amazon.redshift.ssl.NonValidatingFactory");

schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
database.execute(connection -> {
connection.createStatement().execute(createSchemaQuery);
});

streamName = "customer";
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, streamName);
final String createTestTable =
String.format("CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n", fqTableName);
database.execute(connection -> {
connection.createStatement().execute(createTestTable);
});

final String insertTestData = String.format("insert into %s values (1, 'Chris', 'France');\n", fqTableName);
database.execute(connection -> {
connection.createStatement().execute(insertTestData);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(final Stri
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(createConfiguredAirbyteStream(streamName, namespace, fields)));
}

public static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(final String streamName, final String namespace, final List<Field> fields) {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(createConfiguredAirbyteStream(streamName, namespace, fields)));
}

public static ConfiguredAirbyteStream createConfiguredAirbyteStream(final String streamName, final String namespace, final Field... fields) {
return createConfiguredAirbyteStream(streamName, namespace, Arrays.asList(fields));
}
Expand Down