diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java
new file mode 100644
index 0000000000000..3f8a221fc7db6
--- /dev/null
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.config.persistence;
+
+import io.airbyte.db.Database;
+import io.airbyte.db.factory.DSLContextFactory;
+import io.airbyte.db.factory.DataSourceFactory;
+import io.airbyte.db.factory.FlywayFactory;
+import io.airbyte.db.init.DatabaseInitializationException;
+import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
+import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
+import io.airbyte.test.utils.DatabaseConnectionHelper;
+import java.io.IOException;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+import org.flywaydb.core.Flyway;
+import org.jooq.DSLContext;
+import org.jooq.SQLDialect;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+/**
+ * This class exists to abstract away the lifecycle of the test container database and the config
+ * database schema. This is ALL it intends to do. Any additional functionality belongs somewhere
+ * else. It is useful for test suites that need to interact directly with the database.
+ *
+ * This class sets up a test container database and runs the config database migrations against it
+ * to provide the most up-to-date schema.
+ *
+ * What this class is NOT designed to do:
+ *
+ * - test migration behavior, only should be used to test query behavior against the current
+ * schema.
+ * - expose database details -- if you are attempting to expose container, dataSource, dslContext,
+ * something is wrong.
+ * - add test fixtures or helpers--do NOT put "generic" resource helper methods (e.g.
+ * createTestSource())
+ *
+ *
+ * This comment is emphatically worded, because it is tempting to add things to this class. It has
+ * already happened in 3 previous iterations, and each time it takes multiple engineering days to
+ * fix it.
+ *
+ * Usage:
+ *
+ * - Extend: Extend this class. By doing so, it will automatically create the test container db
+ * and run migrations against it at the start of the test suite (@BeforeAll).
+ * - Use database: As part of the @BeforeAll the database field is set. This is the only field
+ * that the extending class can access. It's lifecycle is fully managed by this class.
+ * - Reset schema: To reset the database in between tests, call truncateAllTables() as part
+ * of @BeforeEach. This is the only method that this class exposes externally. It is exposed in such
+ * a way, because most test suites need to declare their own @BeforeEach, so it is easier for them
+ * to simply call this method there, then trying to apply a more complex inheritance scheme.
+ *
+ *
+ * Note: truncateAllTables() works by truncating each table in the db, if you add a new table, you
+ * will need to add it to that method for it work as expected.
+ */
+@SuppressWarnings({"PMD.MutableStaticState", "PMD.SignatureDeclareThrowsException"})
+class BaseConfigDatabaseTest {
+
+ static Database database;
+
+ // keep these private, do not expose outside this class!
+ private static PostgreSQLContainer> container;
+ private static DataSource dataSource;
+ private static DSLContext dslContext;
+
+ /**
+ * Create db test container, sets up java database resources, and runs migrations. Should not be
+ * called externally. It is not private because junit cannot access private methods.
+ *
+ * @throws DatabaseInitializationException - db fails to initialize
+ * @throws IOException - failure when interacting with db.
+ */
+ @BeforeAll
+ static void dbSetup() throws DatabaseInitializationException, IOException {
+ createDbContainer();
+ setDb();
+ migrateDb();
+ }
+
+ /**
+ * Close all resources (container, data source, dsl context, database). Should not be called
+ * externally. It is not private because junit cannot access private methods.
+ *
+ * @throws Exception - exception while closing resources
+ */
+ @AfterAll
+ static void dbDown() throws Exception {
+ dslContext.close();
+ DataSourceFactory.close(dataSource);
+ container.close();
+ }
+
+ /**
+ * Truncates tables to reset them. Designed to be used in between tests.
+ *
+ * Note: NEW TABLES -- When a new table is added to the db, it will need to be added here.
+ *
+ * @throws SQLException - failure in truncate query.
+ */
+ static void truncateAllTables() throws SQLException {
+ database.query(ctx -> ctx
+ .execute(
+ """
+ TRUNCATE TABLE
+ actor,
+ actor_catalog,
+ actor_catalog_fetch_event,
+ actor_definition,
+ actor_definition_workspace_grant,
+ actor_oauth_parameter,
+ connection,
+ connection_operation,
+ operation,
+ state,
+ stream_reset,
+ workspace,
+ workspace_service_account
+ """));
+ }
+
+ private static void createDbContainer() {
+ container = new PostgreSQLContainer<>("postgres:13-alpine")
+ .withDatabaseName("airbyte")
+ .withUsername("docker")
+ .withPassword("docker");
+ container.start();
+ }
+
+ private static void setDb() {
+ dataSource = DatabaseConnectionHelper.createDataSource(container);
+ dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
+ database = new Database(dslContext);
+ }
+
+ private static void migrateDb() throws IOException, DatabaseInitializationException {
+ final Flyway flyway = FlywayFactory.create(
+ dataSource,
+ StreamResetPersistenceTest.class.getName(),
+ ConfigsDatabaseMigrator.DB_IDENTIFIER,
+ ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
+ new ConfigsDatabaseTestProvider(dslContext, flyway).create(true);
+ }
+
+}
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java
deleted file mode 100644
index da381448620ee..0000000000000
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.config.persistence;
-
-import static org.jooq.impl.DSL.asterisk;
-import static org.jooq.impl.DSL.count;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.config.ConfigSchema;
-import io.airbyte.config.DestinationConnection;
-import io.airbyte.config.Geography;
-import io.airbyte.config.SourceConnection;
-import io.airbyte.config.StandardDestinationDefinition;
-import io.airbyte.config.StandardSourceDefinition;
-import io.airbyte.config.StandardSourceDefinition.ReleaseStage;
-import io.airbyte.config.StandardSourceDefinition.SourceType;
-import io.airbyte.config.StandardWorkspace;
-import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
-import io.airbyte.db.Database;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.sql.DataSource;
-import org.flywaydb.core.Flyway;
-import org.jooq.DSLContext;
-import org.jooq.Record1;
-import org.jooq.Result;
-import org.jooq.Table;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.testcontainers.containers.PostgreSQLContainer;
-
-/**
- * This class provides downstream tests with constants and helpers.
- */
-@SuppressWarnings({"PMD.MutableStaticState", "PMD.SignatureDeclareThrowsException"})
-class BaseDatabaseConfigPersistenceTest {
-
- static PostgreSQLContainer> container;
- static Database database;
- static DatabaseConfigPersistence configPersistence;
- static StandardSyncPersistence standardSyncPersistence;
- static JsonSecretsProcessor jsonSecretsProcessor;
- static DataSource dataSource;
- static DSLContext dslContext;
- static Flyway flyway;
-
- protected static final StandardSourceDefinition SOURCE_GITHUB = new StandardSourceDefinition()
- .withName("GitHub")
- .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e"))
- .withDockerRepository("airbyte/source-github")
- .withDockerImageTag("0.2.3")
- .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/github")
- .withIcon("github.svg")
- .withSourceType(SourceType.API)
- .withProtocolVersion("0.2.0")
- .withTombstone(false);
- protected static final StandardSourceDefinition SOURCE_POSTGRES = new StandardSourceDefinition()
- .withName("Postgres")
- .withSourceDefinitionId(UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750"))
- .withDockerRepository("airbyte/source-postgres")
- .withDockerImageTag("0.3.11")
- .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
- .withIcon("postgresql.svg")
- .withSourceType(SourceType.DATABASE)
- .withTombstone(false);
- protected static final StandardSourceDefinition SOURCE_CUSTOM = new StandardSourceDefinition()
- .withName("Custom")
- .withSourceDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750"))
- .withDockerRepository("airbyte/cusom")
- .withDockerImageTag("0.3.11")
- .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
- .withIcon("postgresql.svg")
- .withSourceType(SourceType.DATABASE)
- .withCustom(true)
- .withReleaseStage(ReleaseStage.CUSTOM)
- .withTombstone(false);
- protected static final StandardDestinationDefinition DESTINATION_SNOWFLAKE = new StandardDestinationDefinition()
- .withName("Snowflake")
- .withDestinationDefinitionId(UUID.fromString("424892c4-daac-4491-b35d-c6688ba547ba"))
- .withDockerRepository("airbyte/destination-snowflake")
- .withDockerImageTag("0.3.16")
- .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/snowflake")
- .withProtocolVersion("0.2.0")
- .withTombstone(false);
- protected static final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition()
- .withName("S3")
- .withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362"))
- .withDockerRepository("airbyte/destination-s3")
- .withDockerImageTag("0.1.12")
- .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3")
- .withProtocolVersion("0.2.0")
- .withTombstone(false);
- protected static final StandardDestinationDefinition DESTINATION_CUSTOM = new StandardDestinationDefinition()
- .withName("Custom")
- .withDestinationDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750"))
- .withDockerRepository("airbyte/cusom")
- .withDockerImageTag("0.3.11")
- .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
- .withIcon("postgresql.svg")
- .withCustom(true)
- .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
- .withTombstone(false);
- private static final String CANNOT_BE_NULL = "can not be null";
-
- @BeforeAll
- static void dbSetup() {
- container = new PostgreSQLContainer<>("postgres:13-alpine")
- .withDatabaseName("airbyte")
- .withUsername("docker")
- .withPassword("docker");
- container.start();
- jsonSecretsProcessor = mock(JsonSecretsProcessor.class);
- }
-
- @AfterAll
- static void dbDown() {
- container.close();
- }
-
- static void truncateAllTables() throws SQLException {
- database.query(ctx -> ctx
- .execute(
- "TRUNCATE TABLE workspace_service_account, state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, "
- + "actor, actor_definition, actor_definition_workspace_grant, workspace, stream_reset"));
- }
-
- void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception {
- configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source);
- }
-
- void writeSourceWithSourceConnection(final ConfigPersistence configPersistence, final StandardSourceDefinition source)
- throws Exception {
- configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source);
- final UUID connectionId = UUID.randomUUID();
- final UUID workspaceId = UUID.randomUUID();
- final StandardWorkspace workspace = new StandardWorkspace()
- .withWorkspaceId(workspaceId)
- .withName(CANNOT_BE_NULL)
- .withSlug(CANNOT_BE_NULL)
- .withInitialSetupComplete(true)
- .withDefaultGeography(Geography.AUTO);
- configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace);
-
- final SourceConnection sourceConnection = new SourceConnection()
- .withSourceId(connectionId)
- .withWorkspaceId(workspaceId)
- .withName(CANNOT_BE_NULL)
- .withSourceDefinitionId(source.getSourceDefinitionId());
- configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, connectionId.toString(), sourceConnection);
- }
-
- void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
- throws Exception {
- configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
- }
-
- void writeDestinationWithDestinationConnection(final ConfigPersistence configPersistence,
- final StandardDestinationDefinition destination)
- throws Exception {
- configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
- final UUID connectionId = UUID.randomUUID();
- final UUID workspaceId = UUID.randomUUID();
- final StandardWorkspace workspace = new StandardWorkspace()
- .withWorkspaceId(workspaceId)
- .withName(CANNOT_BE_NULL)
- .withSlug(CANNOT_BE_NULL)
- .withInitialSetupComplete(true)
- .withDefaultGeography(Geography.AUTO);
- configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace);
-
- final DestinationConnection destinationConnection = new DestinationConnection()
- .withDestinationId(connectionId)
- .withWorkspaceId(workspaceId)
- .withName(CANNOT_BE_NULL)
- .withDestinationDefinitionId(destination.getDestinationDefinitionId());
- configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, connectionId.toString(), destinationConnection);
- }
-
- void writeDestinations(final ConfigPersistence configPersistence, final List destinations)
- throws Exception {
- final Map destinationsByID = destinations.stream()
- .collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity()));
- configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID);
- }
-
- void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
- throws Exception {
- configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString());
- }
-
- protected Map> getMapWithSet(final Map> input) {
- return input.entrySet().stream().collect(Collectors.toMap(
- Entry::getKey,
- e -> e.getValue().collect(Collectors.toSet())));
- }
-
- // assertEquals cannot correctly check the equality of two maps with stream values,
- // so streams are converted to sets before being compared.
- protected void assertSameConfigDump(final Map> expected, final Map> actual) {
- assertEquals(getMapWithSet(expected), getMapWithSet(actual));
- }
-
- protected void assertRecordCount(final int expectedCount, final Table table) throws Exception {
- final Result> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table).fetch());
- assertEquals(expectedCount, recordCount.get(0).value1());
- }
-
- protected void assertHasSource(final StandardSourceDefinition source) throws Exception {
- assertEquals(source, configPersistence
- .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
- StandardSourceDefinition.class));
- }
-
- protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception {
- assertEquals(destination, configPersistence
- .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(),
- StandardDestinationDefinition.class));
- }
-
-}
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java
index 88fe070a20785..e941f3d28ea59 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java
@@ -31,20 +31,12 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition;
import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition;
-import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.init.DatabaseInitializationException;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
-import io.airbyte.db.instance.development.DevDatabaseMigrator;
-import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
@@ -57,53 +49,24 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import javax.sql.DataSource;
-import org.flywaydb.core.Flyway;
import org.jooq.DSLContext;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.PostgreSQLContainer;
@SuppressWarnings({"PMD.CyclomaticComplexity", "PMD.NPathComplexity"})
-class ConfigRepositoryE2EReadWriteTest {
+class ConfigRepositoryE2EReadWriteTest extends BaseConfigDatabaseTest {
- private static PostgreSQLContainer> container;
- private DataSource dataSource;
- private DSLContext dslContext;
- private Database database;
private ConfigRepository configRepository;
private DatabaseConfigPersistence configPersistence;
- private Flyway flyway;
private final static String DOCKER_IMAGE_TAG = "1.2.0";
private final static String CONFIG_HASH = "ConfigHash";
- @BeforeAll
- public static void dbSetup() {
- container = new PostgreSQLContainer<>("postgres:13-alpine")
- .withDatabaseName("airbyte")
- .withUsername("docker")
- .withPassword("docker");
- container.start();
- }
-
@BeforeEach
void setup() throws IOException, JsonValidationException, SQLException, DatabaseInitializationException, InterruptedException {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, ConfigRepositoryE2EReadWriteTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
- ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
configPersistence = spy(new DatabaseConfigPersistence(database));
configRepository = spy(new ConfigRepository(configPersistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)),
new StandardSyncPersistence(database)));
- final ConfigsDatabaseMigrator configsDatabaseMigrator =
- new ConfigsDatabaseMigrator(database, flyway);
- final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
- MigrationDevHelper.runLastMigration(devDatabaseMigrator);
for (final StandardWorkspace workspace : MockData.standardWorkspaces()) {
configRepository.writeStandardWorkspaceNoSecrets(workspace);
}
@@ -136,11 +99,6 @@ void setup() throws IOException, JsonValidationException, SQLException, Database
database.transaction(ctx -> ctx.truncate(ACTOR_DEFINITION_WORKSPACE_GRANT).execute());
}
- @AfterAll
- public static void dbDown() {
- container.close();
- }
-
@Test
void testWorkspaceCountConnections() throws IOException {
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java
index 799ce1e4b4d3f..4ba3a04cbbc23 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java
@@ -25,47 +25,26 @@
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.WorkspaceServiceAccount;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.DataSourceFactory;
-import io.airbyte.db.factory.FlywayFactory;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
-import io.airbyte.db.instance.development.DevDatabaseMigrator;
-import io.airbyte.db.instance.development.MigrationDevHelper;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.NotImplementedException;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@SuppressWarnings({"PMD.SignatureDeclareThrowsException", "PMD.JUnitTestsShouldIncludeAssert", "PMD.DetachedTestCase"})
-class DatabaseConfigPersistenceE2EReadWriteTest extends BaseDatabaseConfigPersistenceTest {
+class DatabaseConfigPersistenceE2EReadWriteTest extends BaseConfigDatabaseTest {
+
+ private ConfigPersistence configPersistence;
+ private StandardSyncPersistence standardSyncPersistence;
@BeforeEach
void setup() throws Exception {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceE2EReadWriteTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
- ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
configPersistence = spy(new DatabaseConfigPersistence(database));
standardSyncPersistence = new StandardSyncPersistence(database);
- final ConfigsDatabaseMigrator configsDatabaseMigrator =
- new ConfigsDatabaseMigrator(database, flyway);
- final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
- MigrationDevHelper.runLastMigration(devDatabaseMigrator);
- truncateAllTables();
- }
- @AfterEach
- void tearDown() throws Exception {
- dslContext.close();
- DataSourceFactory.close(dataSource);
+ truncateAllTables();
}
@Test
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java
index 177b79441871a..481ca301d1422 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java
@@ -8,6 +8,8 @@
import static io.airbyte.config.ConfigSchema.STANDARD_SOURCE_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.jooq.impl.DSL.asterisk;
+import static org.jooq.impl.DSL.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -18,19 +20,17 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
+import io.airbyte.config.DestinationConnection;
+import io.airbyte.config.Geography;
+import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.ReleaseStage;
+import io.airbyte.config.StandardSourceDefinition.SourceType;
+import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.DataSourceFactory;
-import io.airbyte.db.factory.FlywayFactory;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
-import io.airbyte.db.instance.development.DevDatabaseMigrator;
-import io.airbyte.db.instance.development.MigrationDevHelper;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
@@ -38,10 +38,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterEach;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.jooq.Record1;
+import org.jooq.Result;
+import org.jooq.Table;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -50,29 +55,75 @@
* methods.
*/
@SuppressWarnings({"PMD.SignatureDeclareThrowsException", "PMD.ShortVariable", "PMD.JUnitTestsShouldIncludeAssert"})
-class DatabaseConfigPersistenceTest extends BaseDatabaseConfigPersistenceTest {
+class DatabaseConfigPersistenceTest extends BaseConfigDatabaseTest {
+
+ public static final String DEFAULT_PROTOCOL_VERSION = "0.2.0";
+ protected static final StandardSourceDefinition SOURCE_GITHUB = new StandardSourceDefinition()
+ .withName("GitHub")
+ .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e"))
+ .withDockerRepository("airbyte/source-github")
+ .withDockerImageTag("0.2.3")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/github")
+ .withIcon("github.svg")
+ .withSourceType(SourceType.API)
+ .withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
+ .withTombstone(false);
+ protected static final StandardSourceDefinition SOURCE_POSTGRES = new StandardSourceDefinition()
+ .withName("Postgres")
+ .withSourceDefinitionId(UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750"))
+ .withDockerRepository("airbyte/source-postgres")
+ .withDockerImageTag("0.3.11")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
+ .withIcon("postgresql.svg")
+ .withSourceType(SourceType.DATABASE)
+ .withTombstone(false);
+ protected static final StandardSourceDefinition SOURCE_CUSTOM = new StandardSourceDefinition()
+ .withName("Custom")
+ .withSourceDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750"))
+ .withDockerRepository("airbyte/cusom")
+ .withDockerImageTag("0.3.11")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
+ .withIcon("postgresql.svg")
+ .withSourceType(SourceType.DATABASE)
+ .withCustom(true)
+ .withReleaseStage(ReleaseStage.CUSTOM)
+ .withTombstone(false);
+ protected static final StandardDestinationDefinition DESTINATION_SNOWFLAKE = new StandardDestinationDefinition()
+ .withName("Snowflake")
+ .withDestinationDefinitionId(UUID.fromString("424892c4-daac-4491-b35d-c6688ba547ba"))
+ .withDockerRepository("airbyte/destination-snowflake")
+ .withDockerImageTag("0.3.16")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/snowflake")
+ .withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
+ .withTombstone(false);
+ protected static final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition()
+ .withName("S3")
+ .withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362"))
+ .withDockerRepository("airbyte/destination-s3")
+ .withDockerImageTag("0.1.12")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3")
+ .withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
+ .withTombstone(false);
+ protected static final StandardDestinationDefinition DESTINATION_CUSTOM = new StandardDestinationDefinition()
+ .withName("Custom")
+ .withDestinationDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750"))
+ .withDockerRepository("airbyte/cusom")
+ .withDockerImageTag("0.3.11")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
+ .withIcon("postgresql.svg")
+ .withCustom(true)
+ .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
+ .withTombstone(false);
+ private static final String CANNOT_BE_NULL = "can not be null";
+
+ private DatabaseConfigPersistence configPersistence;
@BeforeEach
public void setup() throws Exception {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
- ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
configPersistence = spy(new DatabaseConfigPersistence(database));
- final ConfigsDatabaseMigrator configsDatabaseMigrator =
- new ConfigsDatabaseMigrator(database, flyway);
- final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
- MigrationDevHelper.runLastMigration(devDatabaseMigrator);
truncateAllTables();
}
- @AfterEach
- void tearDown() throws Exception {
- dslContext.close();
- DataSourceFactory.close(dataSource);
- }
-
@Test
void testMultiWriteAndGetConfig() throws Exception {
writeDestinations(configPersistence, Lists.newArrayList(DESTINATION_S3, DESTINATION_SNOWFLAKE));
@@ -151,7 +202,7 @@ void testDeleteConfig() throws Exception {
void testGetConnectorRepositoryToInfoMap() throws Exception {
final String connectorRepository = "airbyte/duplicated-connector";
final String oldVersion = "0.1.10";
- final String newVersion = "0.2.0";
+ final String newVersion = DEFAULT_PROTOCOL_VERSION;
final StandardSourceDefinition source1 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withName("source-1")
@@ -207,13 +258,13 @@ void testInsertConfigRecord() throws Exception {
@Test
void testHasNewVersion() {
- assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", "0.2.0"));
+ assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", DEFAULT_PROTOCOL_VERSION));
assertFalse(DatabaseConfigPersistence.hasNewVersion("invalid_version", "0.1.2"));
}
@Test
void testHasNewPatchVersion() {
- assertFalse(DatabaseConfigPersistence.hasNewPatchVersion("0.1.99", "0.2.0"));
+ assertFalse(DatabaseConfigPersistence.hasNewPatchVersion("0.1.99", DEFAULT_PROTOCOL_VERSION));
assertFalse(DatabaseConfigPersistence.hasNewPatchVersion("invalid_version", "0.3.1"));
assertTrue(DatabaseConfigPersistence.hasNewPatchVersion("0.1.0", "0.1.3"));
}
@@ -281,4 +332,97 @@ void filterCustomDestination() {
assertThat(filteredSourceMap).containsOnlyKeys(nonCustomKey);
}
+ void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception {
+ configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source);
+ }
+
+ void writeSourceWithSourceConnection(final ConfigPersistence configPersistence, final StandardSourceDefinition source)
+ throws Exception {
+ configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source);
+ final UUID connectionId = UUID.randomUUID();
+ final UUID workspaceId = UUID.randomUUID();
+ final StandardWorkspace workspace = new StandardWorkspace()
+ .withWorkspaceId(workspaceId)
+ .withName(CANNOT_BE_NULL)
+ .withSlug(CANNOT_BE_NULL)
+ .withInitialSetupComplete(true)
+ .withDefaultGeography(Geography.AUTO);
+ configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace);
+
+ final SourceConnection sourceConnection = new SourceConnection()
+ .withSourceId(connectionId)
+ .withWorkspaceId(workspaceId)
+ .withName(CANNOT_BE_NULL)
+ .withSourceDefinitionId(source.getSourceDefinitionId());
+ configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, connectionId.toString(), sourceConnection);
+ }
+
+ void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
+ throws Exception {
+ configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
+ }
+
+ void writeDestinationWithDestinationConnection(final ConfigPersistence configPersistence,
+ final StandardDestinationDefinition destination)
+ throws Exception {
+ configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
+ final UUID connectionId = UUID.randomUUID();
+ final UUID workspaceId = UUID.randomUUID();
+ final StandardWorkspace workspace = new StandardWorkspace()
+ .withWorkspaceId(workspaceId)
+ .withName(CANNOT_BE_NULL)
+ .withSlug(CANNOT_BE_NULL)
+ .withInitialSetupComplete(true)
+ .withDefaultGeography(Geography.AUTO);
+ configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace);
+
+ final DestinationConnection destinationConnection = new DestinationConnection()
+ .withDestinationId(connectionId)
+ .withWorkspaceId(workspaceId)
+ .withName(CANNOT_BE_NULL)
+ .withDestinationDefinitionId(destination.getDestinationDefinitionId());
+ configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, connectionId.toString(), destinationConnection);
+ }
+
+ void writeDestinations(final ConfigPersistence configPersistence, final List destinations)
+ throws Exception {
+ final Map destinationsByID = destinations.stream()
+ .collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity()));
+ configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID);
+ }
+
+ void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
+ throws Exception {
+ configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString());
+ }
+
+ protected Map> getMapWithSet(final Map> input) {
+ return input.entrySet().stream().collect(Collectors.toMap(
+ Entry::getKey,
+ e -> e.getValue().collect(Collectors.toSet())));
+ }
+
+ // assertEquals cannot correctly check the equality of two maps with stream values,
+ // so streams are converted to sets before being compared.
+ protected void assertSameConfigDump(final Map> expected, final Map> actual) {
+ assertEquals(getMapWithSet(expected), getMapWithSet(actual));
+ }
+
+ protected void assertRecordCount(final int expectedCount, final Table table) throws Exception {
+ final Result> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table).fetch());
+ assertEquals(expectedCount, recordCount.get(0).value1());
+ }
+
+ protected void assertHasSource(final StandardSourceDefinition source) throws Exception {
+ assertEquals(source, configPersistence
+ .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
+ StandardSourceDefinition.class));
+ }
+
+ protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception {
+ assertEquals(destination, configPersistence
+ .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(),
+ StandardDestinationDefinition.class));
+ }
+
}
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java
index 4bea431aa5a00..e077d8e206300 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java
@@ -5,32 +5,29 @@
package io.airbyte.config.persistence;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
+import static org.jooq.impl.DSL.asterisk;
+import static org.jooq.impl.DSL.count;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardSourceDefinition;
+import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.DataSourceFactory;
-import io.airbyte.db.factory.FlywayFactory;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
-import io.airbyte.db.instance.development.DevDatabaseMigrator;
-import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.ConnectorSpecification;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.jooq.Record1;
+import org.jooq.Result;
+import org.jooq.Table;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@ -39,37 +36,31 @@
* Unit test for the {@link DatabaseConfigPersistence#updateConnectorDefinitions} method.
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
-class DatabaseConfigPersistenceUpdateConnectorDefinitionsTest extends BaseDatabaseConfigPersistenceTest {
+class DatabaseConfigPersistenceUpdateConnectorDefinitionsTest extends BaseConfigDatabaseTest {
+
+ protected static final StandardSourceDefinition SOURCE_GITHUB = new StandardSourceDefinition()
+ .withName("GitHub")
+ .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e"))
+ .withDockerRepository("airbyte/source-github")
+ .withDockerImageTag("0.2.3")
+ .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/github")
+ .withIcon("github.svg")
+ .withSourceType(SourceType.API)
+ .withProtocolVersion("0.2.0")
+ .withTombstone(false);
private static final JsonNode SOURCE_GITHUB_JSON = Jsons.jsonNode(SOURCE_GITHUB);
private static final String DOCKER_IMAGE_TAG = "0.0.0";
private static final String DOCKER_IMAGE_TAG_2 = "0.1000.0";
private static final String DEFAULT_PROTOCOL_VERSION = "0.2.0";
- @BeforeAll
- public static void setup() throws Exception {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.class.getName(),
- ConfigsDatabaseMigrator.DB_IDENTIFIER,
- ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
- configPersistence = new DatabaseConfigPersistence(database);
- final ConfigsDatabaseMigrator configsDatabaseMigrator =
- new ConfigsDatabaseMigrator(database, flyway);
- final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
- MigrationDevHelper.runLastMigration(devDatabaseMigrator);
- }
-
- @AfterAll
- public static void tearDown() throws Exception {
- dslContext.close();
- DataSourceFactory.close(dataSource);
- }
+ private DatabaseConfigPersistence configPersistence;
@BeforeEach
public void resetDatabase() throws SQLException {
truncateAllTables();
+
+ configPersistence = new DatabaseConfigPersistence(database);
}
@Test
@@ -227,4 +218,19 @@ private void assertUpdateConnectorDefinition(final List> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table).fetch());
+ assertEquals(expectedCount, recordCount.get(0).value1());
+ }
+
+ protected void assertHasSource(final StandardSourceDefinition source) throws Exception {
+ assertEquals(source, configPersistence
+ .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
+ StandardSourceDefinition.class));
+ }
+
}
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java
index 52b6e4f86e635..299d16657d0b5 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java
@@ -134,7 +134,7 @@ public class MockData {
private static final Instant NOW = Instant.parse("2021-12-15T20:30:40.00Z");
- private static final String CONNECTION_SPECIFICATION = "'{\"name\":\"John\", \"age\":30, \"car\":null}'";
+ private static final String CONNECTION_SPECIFICATION = "{\"name\":\"John\", \"age\":30, \"car\":null}";
private static final UUID OPERATION_ID_4 = UUID.randomUUID();
private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID();
private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url";
@@ -340,21 +340,21 @@ public static List sourceConnections() {
.withTombstone(false)
.withSourceDefinitionId(SOURCE_DEFINITION_ID_1)
.withWorkspaceId(WORKSPACE_ID_1)
- .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION))
+ .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION))
.withSourceId(SOURCE_ID_1);
final SourceConnection sourceConnection2 = new SourceConnection()
.withName("source-2")
.withTombstone(false)
.withSourceDefinitionId(SOURCE_DEFINITION_ID_2)
.withWorkspaceId(WORKSPACE_ID_1)
- .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION))
+ .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION))
.withSourceId(SOURCE_ID_2);
final SourceConnection sourceConnection3 = new SourceConnection()
.withName("source-3")
.withTombstone(false)
.withSourceDefinitionId(SOURCE_DEFINITION_ID_1)
.withWorkspaceId(WORKSPACE_ID_2)
- .withConfiguration(Jsons.jsonNode(("")))
+ .withConfiguration(Jsons.emptyObject())
.withSourceId(SOURCE_ID_3);
return Arrays.asList(sourceConnection1, sourceConnection2, sourceConnection3);
}
@@ -365,21 +365,21 @@ public static List destinationConnections() {
.withTombstone(false)
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID_1)
.withWorkspaceId(WORKSPACE_ID_1)
- .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION))
+ .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION))
.withDestinationId(DESTINATION_ID_1);
final DestinationConnection destinationConnection2 = new DestinationConnection()
.withName("destination-2")
.withTombstone(false)
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID_2)
.withWorkspaceId(WORKSPACE_ID_1)
- .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION))
+ .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION))
.withDestinationId(DESTINATION_ID_2);
final DestinationConnection destinationConnection3 = new DestinationConnection()
.withName("destination-3")
.withTombstone(true)
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID_2)
.withWorkspaceId(WORKSPACE_ID_2)
- .withConfiguration(Jsons.jsonNode(""))
+ .withConfiguration(Jsons.emptyObject())
.withDestinationId(DESTINATION_ID_3);
return Arrays.asList(destinationConnection1, destinationConnection2, destinationConnection3);
}
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java
index 1aecba632e0f2..3e15ebe9bba3e 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java
@@ -25,13 +25,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardWorkspace;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.DataSourceFactory;
-import io.airbyte.db.factory.FlywayFactory;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.protocol.models.ConnectorSpecification;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
@@ -43,16 +37,15 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-class StandardSyncPersistenceE2ETest extends BaseDatabaseConfigPersistenceTest {
+class StandardSyncPersistenceE2ETest extends BaseConfigDatabaseTest {
record StandardSyncProtocolVersionFlag(UUID standardSyncId, boolean unsupportedProtocolVersion) {}
private ConfigRepository configRepository;
+ private StandardSyncPersistence standardSyncPersistence;
UUID workspaceId;
StandardWorkspace workspace;
@@ -71,23 +64,12 @@ record StandardSyncProtocolVersionFlag(UUID standardSyncId, boolean unsupportedP
@BeforeEach
void beforeEach() throws Exception {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, StandardSyncPersistenceE2ETest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
- ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(true);
truncateAllTables();
standardSyncPersistence = new StandardSyncPersistence(database);
configRepository = new ConfigRepository(database);
}
- @AfterEach
- void afterEach() throws Exception {
- dslContext.close();
- DataSourceFactory.close(dataSource);
- }
-
@Test
void testClearUnsupportedProtocolVersionFlagFromSource() throws IOException, JsonValidationException, SQLException {
createBaseObjects();
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java
index 4e85864e61cd4..78b5ce3a24895 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java
@@ -16,34 +16,27 @@
import io.airbyte.config.State;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
-import io.airbyte.db.ExceptionWrappingDatabase;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.init.DatabaseInitializationException;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.jooq.JSONB;
-import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-class StatePersistenceTest extends BaseDatabaseConfigPersistenceTest {
+class StatePersistenceTest extends BaseConfigDatabaseTest {
private ConfigRepository configRepository;
private StatePersistence statePersistence;
@@ -55,6 +48,36 @@ class StatePersistenceTest extends BaseDatabaseConfigPersistenceTest {
private static final String GLOBAL_STATE = "\"my global state\"";
private static final String STATE = "state";
+ @BeforeEach
+ void beforeEach() throws DatabaseInitializationException, IOException, JsonValidationException, SQLException {
+ truncateAllTables();
+
+ setupTestData();
+ statePersistence = new StatePersistence(database);
+ }
+
+ private void setupTestData() throws JsonValidationException, IOException {
+ configRepository = new ConfigRepository(database);
+
+ final StandardWorkspace workspace = MockData.standardWorkspaces().get(0);
+ final StandardSourceDefinition sourceDefinition = MockData.publicSourceDefinition();
+ final SourceConnection sourceConnection = MockData.sourceConnections().get(0);
+ final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition();
+ final DestinationConnection destinationConnection = MockData.destinationConnections().get(0);
+ final StandardSync sync = MockData.standardSyncs().get(0);
+
+ configRepository.writeStandardWorkspaceNoSecrets(workspace);
+ configRepository.writeStandardSourceDefinition(sourceDefinition);
+ configRepository.writeSourceConnectionNoSecrets(sourceConnection);
+ configRepository.writeStandardDestinationDefinition(destinationDefinition);
+ configRepository.writeDestinationConnectionNoSecrets(destinationConnection);
+ configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0));
+ configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1));
+ configRepository.writeStandardSync(sync);
+
+ connectionId = sync.getConnectionId();
+ }
+
@Test
void testReadingNonExistingState() throws IOException {
Assertions.assertTrue(statePersistence.getCurrentState(UUID.randomUUID()).isEmpty());
@@ -444,7 +467,7 @@ void testStreamFullReset() throws IOException {
}
@Test
- void testInconsistentTypeUpdates() throws IOException {
+ void testInconsistentTypeUpdates() throws IOException, SQLException {
final StateWrapper streamState = new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(Arrays.asList(
@@ -479,10 +502,13 @@ void testInconsistentTypeUpdates() throws IOException {
// We should be guarded against those cases let's make sure we don't make things worse if we're in
// an inconsistent state
- dslContext.insertInto(DSL.table(STATE))
- .columns(DSL.field("id"), DSL.field("connection_id"), DSL.field("type"), DSL.field(STATE))
- .values(UUID.randomUUID(), connectionId, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.GLOBAL, JSONB.valueOf("{}"))
- .execute();
+ database.transaction(ctx -> {
+ ctx.insertInto(DSL.table(STATE))
+ .columns(DSL.field("id"), DSL.field("connection_id"), DSL.field("type"), DSL.field(STATE))
+ .values(UUID.randomUUID(), connectionId, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.GLOBAL, JSONB.valueOf("{}"))
+ .execute();
+ return null;
+ });
Assertions.assertThrows(IllegalStateException.class, () -> statePersistence.updateOrCreateState(connectionId, streamState));
Assertions.assertThrows(IllegalStateException.class, () -> statePersistence.getCurrentState(connectionId));
}
@@ -497,79 +523,22 @@ void testEnumsConversion() {
}
@Test
- void testStatePersistenceLegacyReadConsistency() throws IOException {
- final JsonNode jsonState = Jsons.deserialize("{\"my\": \"state\"}");
- final State state = new State().withState(jsonState);
- configRepository.updateConnectionState(connectionId, state);
-
- final StateWrapper readStateWrapper = statePersistence.getCurrentState(connectionId).orElseThrow();
- Assertions.assertEquals(StateType.LEGACY, readStateWrapper.getStateType());
- Assertions.assertEquals(state.getState(), readStateWrapper.getLegacyState());
- }
-
- @Test
- void testStatePersistenceLegacyWriteConsistency() throws IOException {
+ void testStatePersistenceLegacyWriteConsistency() throws IOException, SQLException {
final JsonNode jsonState = Jsons.deserialize("{\"my\": \"state\"}");
final StateWrapper stateWrapper = new StateWrapper().withStateType(StateType.LEGACY).withLegacyState(jsonState);
statePersistence.updateOrCreateState(connectionId, stateWrapper);
// Making sure we still follow the legacy format
- final List readStates = dslContext
- .selectFrom(STATE)
+ final List readStates = database.transaction(ctx -> ctx.selectFrom(STATE)
.where(DSL.field("connection_id").eq(connectionId))
.fetch().map(r -> Jsons.deserialize(r.get(DSL.field(STATE, JSONB.class)).data(), State.class))
- .stream().toList();
+ .stream()
+ .toList());
Assertions.assertEquals(1, readStates.size());
Assertions.assertEquals(readStates.get(0).getState(), stateWrapper.getLegacyState());
}
- @BeforeEach
- void beforeEach() throws DatabaseInitializationException, IOException, JsonValidationException {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, StatePersistenceTest.class.getName(),
- ConfigsDatabaseMigrator.DB_IDENTIFIER, ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(true);
- setupTestData();
-
- statePersistence = new StatePersistence(database);
- }
-
- @AfterEach
- void afterEach() {
- // Making sure we reset between tests
- dslContext.dropSchemaIfExists("public").cascade().execute();
- dslContext.createSchema("public").execute();
- dslContext.setSchema("public").execute();
- }
-
- private void setupTestData() throws JsonValidationException, IOException {
- configRepository = new ConfigRepository(
- new DatabaseConfigPersistence(database),
- database,
- new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)),
- new StandardSyncPersistence(database));
-
- final StandardWorkspace workspace = MockData.standardWorkspaces().get(0);
- final StandardSourceDefinition sourceDefinition = MockData.publicSourceDefinition();
- final SourceConnection sourceConnection = MockData.sourceConnections().get(0);
- final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition();
- final DestinationConnection destinationConnection = MockData.destinationConnections().get(0);
- final StandardSync sync = MockData.standardSyncs().get(0);
-
- configRepository.writeStandardWorkspaceNoSecrets(workspace);
- configRepository.writeStandardSourceDefinition(sourceDefinition);
- configRepository.writeSourceConnectionNoSecrets(sourceConnection);
- configRepository.writeStandardDestinationDefinition(destinationDefinition);
- configRepository.writeDestinationConnectionNoSecrets(destinationConnection);
- configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0));
- configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1));
- configRepository.writeStandardSync(sync);
-
- connectionId = sync.getConnectionId();
- }
-
private StateWrapper clone(final StateWrapper state) {
return switch (state.getStateType()) {
case LEGACY -> new StateWrapper()
diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java
index f121051430a9b..b4c266f6da36f 100644
--- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java
+++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java
@@ -8,49 +8,25 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;
-import io.airbyte.db.factory.DSLContextFactory;
-import io.airbyte.db.factory.DataSourceFactory;
-import io.airbyte.db.factory.FlywayFactory;
-import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
-import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
-import io.airbyte.db.instance.development.DevDatabaseMigrator;
-import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.StreamDescriptor;
-import io.airbyte.test.utils.DatabaseConnectionHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.jooq.SQLDialect;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class StreamResetPersistenceTest extends BaseDatabaseConfigPersistenceTest {
+class StreamResetPersistenceTest extends BaseConfigDatabaseTest {
static StreamResetPersistence streamResetPersistence;
private static final Logger LOGGER = LoggerFactory.getLogger(StreamResetPersistenceTest.class);
@BeforeEach
public void setup() throws Exception {
- dataSource = DatabaseConnectionHelper.createDataSource(container);
- dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
- flyway = FlywayFactory.create(dataSource, StreamResetPersistenceTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
- ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
- database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
- streamResetPersistence = spy(new StreamResetPersistence(database));
- final ConfigsDatabaseMigrator configsDatabaseMigrator =
- new ConfigsDatabaseMigrator(database, flyway);
- final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
- MigrationDevHelper.runLastMigration(devDatabaseMigrator);
truncateAllTables();
- }
- @AfterEach
- void tearDown() throws Exception {
- dslContext.close();
- DataSourceFactory.close(dataSource);
+ streamResetPersistence = spy(new StreamResetPersistence(database));
}
@Test
@@ -62,15 +38,15 @@ void testCreateSameResetTwiceOnlyCreateItOnce() throws Exception {
streamResetPersistence.createStreamResets(connectionId, List.of(streamDescriptor1, streamDescriptor2));
final List result = streamResetPersistence.getStreamResets(connectionId);
- LOGGER.info(String.valueOf(dslContext.selectFrom("stream_reset").fetch()));
+ LOGGER.info(database.query(ctx -> ctx.selectFrom("stream_reset").fetch().toString()));
assertEquals(2, result.size());
streamResetPersistence.createStreamResets(connectionId, List.of(streamDescriptor1));
- LOGGER.info(String.valueOf(dslContext.selectFrom("stream_reset").fetch()));
+ LOGGER.info(database.query(ctx -> ctx.selectFrom("stream_reset").fetch().toString()));
assertEquals(2, streamResetPersistence.getStreamResets(connectionId).size());
streamResetPersistence.createStreamResets(connectionId, List.of(streamDescriptor2));
- LOGGER.info(String.valueOf(dslContext.selectFrom("stream_reset").fetch()));
+ LOGGER.info(database.query(ctx -> ctx.selectFrom("stream_reset").fetch().toString()));
assertEquals(2, streamResetPersistence.getStreamResets(connectionId).size());
}
diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java
index 98f09191e776a..1c26d28c8287c 100644
--- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java
+++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java
@@ -41,8 +41,7 @@ public Database create(final boolean runMigration) throws IOException, DatabaseI
final Database database = new Database(dslContext);
if (runMigration) {
- final DatabaseMigrator migrator = new ConfigsDatabaseMigrator(
- database, flyway);
+ final DatabaseMigrator migrator = new ConfigsDatabaseMigrator(database, flyway);
migrator.createBaseline();
migrator.migrate();
} else {