diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java new file mode 100644 index 0000000000000..3f8a221fc7db6 --- /dev/null +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DataSourceFactory; +import io.airbyte.db.factory.FlywayFactory; +import io.airbyte.db.init.DatabaseInitializationException; +import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; +import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; +import io.airbyte.test.utils.DatabaseConnectionHelper; +import java.io.IOException; +import java.sql.SQLException; +import javax.sql.DataSource; +import org.flywaydb.core.Flyway; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.PostgreSQLContainer; + +/** + * This class exists to abstract away the lifecycle of the test container database and the config + * database schema. This is ALL it intends to do. Any additional functionality belongs somewhere + * else. It is useful for test suites that need to interact directly with the database. + * + * This class sets up a test container database and runs the config database migrations against it + * to provide the most up-to-date schema. + * + * What this class is NOT designed to do: + * + * + * This comment is emphatically worded, because it is tempting to add things to this class. It has + * already happened in 3 previous iterations, and each time it takes multiple engineering days to + * fix it. + * + * Usage: + * + * + * Note: truncateAllTables() works by truncating each table in the db, if you add a new table, you + * will need to add it to that method for it work as expected. + */ +@SuppressWarnings({"PMD.MutableStaticState", "PMD.SignatureDeclareThrowsException"}) +class BaseConfigDatabaseTest { + + static Database database; + + // keep these private, do not expose outside this class! + private static PostgreSQLContainer container; + private static DataSource dataSource; + private static DSLContext dslContext; + + /** + * Create db test container, sets up java database resources, and runs migrations. Should not be + * called externally. It is not private because junit cannot access private methods. + * + * @throws DatabaseInitializationException - db fails to initialize + * @throws IOException - failure when interacting with db. + */ + @BeforeAll + static void dbSetup() throws DatabaseInitializationException, IOException { + createDbContainer(); + setDb(); + migrateDb(); + } + + /** + * Close all resources (container, data source, dsl context, database). Should not be called + * externally. It is not private because junit cannot access private methods. + * + * @throws Exception - exception while closing resources + */ + @AfterAll + static void dbDown() throws Exception { + dslContext.close(); + DataSourceFactory.close(dataSource); + container.close(); + } + + /** + * Truncates tables to reset them. Designed to be used in between tests. + * + * Note: NEW TABLES -- When a new table is added to the db, it will need to be added here. + * + * @throws SQLException - failure in truncate query. + */ + static void truncateAllTables() throws SQLException { + database.query(ctx -> ctx + .execute( + """ + TRUNCATE TABLE + actor, + actor_catalog, + actor_catalog_fetch_event, + actor_definition, + actor_definition_workspace_grant, + actor_oauth_parameter, + connection, + connection_operation, + operation, + state, + stream_reset, + workspace, + workspace_service_account + """)); + } + + private static void createDbContainer() { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withDatabaseName("airbyte") + .withUsername("docker") + .withPassword("docker"); + container.start(); + } + + private static void setDb() { + dataSource = DatabaseConnectionHelper.createDataSource(container); + dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); + database = new Database(dslContext); + } + + private static void migrateDb() throws IOException, DatabaseInitializationException { + final Flyway flyway = FlywayFactory.create( + dataSource, + StreamResetPersistenceTest.class.getName(), + ConfigsDatabaseMigrator.DB_IDENTIFIER, + ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); + new ConfigsDatabaseTestProvider(dslContext, flyway).create(true); + } + +} diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java deleted file mode 100644 index da381448620ee..0000000000000 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.config.persistence; - -import static org.jooq.impl.DSL.asterisk; -import static org.jooq.impl.DSL.count; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.config.ConfigSchema; -import io.airbyte.config.DestinationConnection; -import io.airbyte.config.Geography; -import io.airbyte.config.SourceConnection; -import io.airbyte.config.StandardDestinationDefinition; -import io.airbyte.config.StandardSourceDefinition; -import io.airbyte.config.StandardSourceDefinition.ReleaseStage; -import io.airbyte.config.StandardSourceDefinition.SourceType; -import io.airbyte.config.StandardWorkspace; -import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; -import io.airbyte.db.Database; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.sql.DataSource; -import org.flywaydb.core.Flyway; -import org.jooq.DSLContext; -import org.jooq.Record1; -import org.jooq.Result; -import org.jooq.Table; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.testcontainers.containers.PostgreSQLContainer; - -/** - * This class provides downstream tests with constants and helpers. - */ -@SuppressWarnings({"PMD.MutableStaticState", "PMD.SignatureDeclareThrowsException"}) -class BaseDatabaseConfigPersistenceTest { - - static PostgreSQLContainer container; - static Database database; - static DatabaseConfigPersistence configPersistence; - static StandardSyncPersistence standardSyncPersistence; - static JsonSecretsProcessor jsonSecretsProcessor; - static DataSource dataSource; - static DSLContext dslContext; - static Flyway flyway; - - protected static final StandardSourceDefinition SOURCE_GITHUB = new StandardSourceDefinition() - .withName("GitHub") - .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e")) - .withDockerRepository("airbyte/source-github") - .withDockerImageTag("0.2.3") - .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/github") - .withIcon("github.svg") - .withSourceType(SourceType.API) - .withProtocolVersion("0.2.0") - .withTombstone(false); - protected static final StandardSourceDefinition SOURCE_POSTGRES = new StandardSourceDefinition() - .withName("Postgres") - .withSourceDefinitionId(UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750")) - .withDockerRepository("airbyte/source-postgres") - .withDockerImageTag("0.3.11") - .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres") - .withIcon("postgresql.svg") - .withSourceType(SourceType.DATABASE) - .withTombstone(false); - protected static final StandardSourceDefinition SOURCE_CUSTOM = new StandardSourceDefinition() - .withName("Custom") - .withSourceDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750")) - .withDockerRepository("airbyte/cusom") - .withDockerImageTag("0.3.11") - .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres") - .withIcon("postgresql.svg") - .withSourceType(SourceType.DATABASE) - .withCustom(true) - .withReleaseStage(ReleaseStage.CUSTOM) - .withTombstone(false); - protected static final StandardDestinationDefinition DESTINATION_SNOWFLAKE = new StandardDestinationDefinition() - .withName("Snowflake") - .withDestinationDefinitionId(UUID.fromString("424892c4-daac-4491-b35d-c6688ba547ba")) - .withDockerRepository("airbyte/destination-snowflake") - .withDockerImageTag("0.3.16") - .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/snowflake") - .withProtocolVersion("0.2.0") - .withTombstone(false); - protected static final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition() - .withName("S3") - .withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362")) - .withDockerRepository("airbyte/destination-s3") - .withDockerImageTag("0.1.12") - .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3") - .withProtocolVersion("0.2.0") - .withTombstone(false); - protected static final StandardDestinationDefinition DESTINATION_CUSTOM = new StandardDestinationDefinition() - .withName("Custom") - .withDestinationDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750")) - .withDockerRepository("airbyte/cusom") - .withDockerImageTag("0.3.11") - .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres") - .withIcon("postgresql.svg") - .withCustom(true) - .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM) - .withTombstone(false); - private static final String CANNOT_BE_NULL = "can not be null"; - - @BeforeAll - static void dbSetup() { - container = new PostgreSQLContainer<>("postgres:13-alpine") - .withDatabaseName("airbyte") - .withUsername("docker") - .withPassword("docker"); - container.start(); - jsonSecretsProcessor = mock(JsonSecretsProcessor.class); - } - - @AfterAll - static void dbDown() { - container.close(); - } - - static void truncateAllTables() throws SQLException { - database.query(ctx -> ctx - .execute( - "TRUNCATE TABLE workspace_service_account, state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, " - + "actor, actor_definition, actor_definition_workspace_grant, workspace, stream_reset")); - } - - void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception { - configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source); - } - - void writeSourceWithSourceConnection(final ConfigPersistence configPersistence, final StandardSourceDefinition source) - throws Exception { - configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source); - final UUID connectionId = UUID.randomUUID(); - final UUID workspaceId = UUID.randomUUID(); - final StandardWorkspace workspace = new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withName(CANNOT_BE_NULL) - .withSlug(CANNOT_BE_NULL) - .withInitialSetupComplete(true) - .withDefaultGeography(Geography.AUTO); - configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace); - - final SourceConnection sourceConnection = new SourceConnection() - .withSourceId(connectionId) - .withWorkspaceId(workspaceId) - .withName(CANNOT_BE_NULL) - .withSourceDefinitionId(source.getSourceDefinitionId()); - configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, connectionId.toString(), sourceConnection); - } - - void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) - throws Exception { - configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination); - } - - void writeDestinationWithDestinationConnection(final ConfigPersistence configPersistence, - final StandardDestinationDefinition destination) - throws Exception { - configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination); - final UUID connectionId = UUID.randomUUID(); - final UUID workspaceId = UUID.randomUUID(); - final StandardWorkspace workspace = new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withName(CANNOT_BE_NULL) - .withSlug(CANNOT_BE_NULL) - .withInitialSetupComplete(true) - .withDefaultGeography(Geography.AUTO); - configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace); - - final DestinationConnection destinationConnection = new DestinationConnection() - .withDestinationId(connectionId) - .withWorkspaceId(workspaceId) - .withName(CANNOT_BE_NULL) - .withDestinationDefinitionId(destination.getDestinationDefinitionId()); - configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, connectionId.toString(), destinationConnection); - } - - void writeDestinations(final ConfigPersistence configPersistence, final List destinations) - throws Exception { - final Map destinationsByID = destinations.stream() - .collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity())); - configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID); - } - - void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) - throws Exception { - configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString()); - } - - protected Map> getMapWithSet(final Map> input) { - return input.entrySet().stream().collect(Collectors.toMap( - Entry::getKey, - e -> e.getValue().collect(Collectors.toSet()))); - } - - // assertEquals cannot correctly check the equality of two maps with stream values, - // so streams are converted to sets before being compared. - protected void assertSameConfigDump(final Map> expected, final Map> actual) { - assertEquals(getMapWithSet(expected), getMapWithSet(actual)); - } - - protected void assertRecordCount(final int expectedCount, final Table table) throws Exception { - final Result> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table).fetch()); - assertEquals(expectedCount, recordCount.get(0).value1()); - } - - protected void assertHasSource(final StandardSourceDefinition source) throws Exception { - assertEquals(source, configPersistence - .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), - StandardSourceDefinition.class)); - } - - protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception { - assertEquals(destination, configPersistence - .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), - StandardDestinationDefinition.class)); - } - -} diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index 88fe070a20785..e941f3d28ea59 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -31,20 +31,12 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition; -import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.FlywayFactory; import io.airbyte.db.init.DatabaseInitializationException; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; -import io.airbyte.db.instance.development.DevDatabaseMigrator; -import io.airbyte.db.instance.development.MigrationDevHelper; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.test.utils.DatabaseConnectionHelper; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.sql.SQLException; @@ -57,53 +49,24 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import javax.sql.DataSource; -import org.flywaydb.core.Flyway; import org.jooq.DSLContext; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.PostgreSQLContainer; @SuppressWarnings({"PMD.CyclomaticComplexity", "PMD.NPathComplexity"}) -class ConfigRepositoryE2EReadWriteTest { +class ConfigRepositoryE2EReadWriteTest extends BaseConfigDatabaseTest { - private static PostgreSQLContainer container; - private DataSource dataSource; - private DSLContext dslContext; - private Database database; private ConfigRepository configRepository; private DatabaseConfigPersistence configPersistence; - private Flyway flyway; private final static String DOCKER_IMAGE_TAG = "1.2.0"; private final static String CONFIG_HASH = "ConfigHash"; - @BeforeAll - public static void dbSetup() { - container = new PostgreSQLContainer<>("postgres:13-alpine") - .withDatabaseName("airbyte") - .withUsername("docker") - .withPassword("docker"); - container.start(); - } - @BeforeEach void setup() throws IOException, JsonValidationException, SQLException, DatabaseInitializationException, InterruptedException { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, ConfigRepositoryE2EReadWriteTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false); configPersistence = spy(new DatabaseConfigPersistence(database)); configRepository = spy(new ConfigRepository(configPersistence, database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)), new StandardSyncPersistence(database))); - final ConfigsDatabaseMigrator configsDatabaseMigrator = - new ConfigsDatabaseMigrator(database, flyway); - final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); - MigrationDevHelper.runLastMigration(devDatabaseMigrator); for (final StandardWorkspace workspace : MockData.standardWorkspaces()) { configRepository.writeStandardWorkspaceNoSecrets(workspace); } @@ -136,11 +99,6 @@ void setup() throws IOException, JsonValidationException, SQLException, Database database.transaction(ctx -> ctx.truncate(ACTOR_DEFINITION_WORKSPACE_GRANT).execute()); } - @AfterAll - public static void dbDown() { - container.close(); - } - @Test void testWorkspaceCountConnections() throws IOException { diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java index 799ce1e4b4d3f..4ba3a04cbbc23 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java @@ -25,47 +25,26 @@ import io.airbyte.config.StandardSyncState; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.WorkspaceServiceAccount; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; -import io.airbyte.db.instance.development.DevDatabaseMigrator; -import io.airbyte.db.instance.development.MigrationDevHelper; -import io.airbyte.test.utils.DatabaseConnectionHelper; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.commons.lang3.NotImplementedException; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @SuppressWarnings({"PMD.SignatureDeclareThrowsException", "PMD.JUnitTestsShouldIncludeAssert", "PMD.DetachedTestCase"}) -class DatabaseConfigPersistenceE2EReadWriteTest extends BaseDatabaseConfigPersistenceTest { +class DatabaseConfigPersistenceE2EReadWriteTest extends BaseConfigDatabaseTest { + + private ConfigPersistence configPersistence; + private StandardSyncPersistence standardSyncPersistence; @BeforeEach void setup() throws Exception { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceE2EReadWriteTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false); configPersistence = spy(new DatabaseConfigPersistence(database)); standardSyncPersistence = new StandardSyncPersistence(database); - final ConfigsDatabaseMigrator configsDatabaseMigrator = - new ConfigsDatabaseMigrator(database, flyway); - final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); - MigrationDevHelper.runLastMigration(devDatabaseMigrator); - truncateAllTables(); - } - @AfterEach - void tearDown() throws Exception { - dslContext.close(); - DataSourceFactory.close(dataSource); + truncateAllTables(); } @Test diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index 177b79441871a..481ca301d1422 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -8,6 +8,8 @@ import static io.airbyte.config.ConfigSchema.STANDARD_SOURCE_DEFINITION; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; import static org.assertj.core.api.Assertions.assertThat; +import static org.jooq.impl.DSL.asterisk; +import static org.jooq.impl.DSL.count; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -18,19 +20,17 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigWithMetadata; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.Geography; +import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSourceDefinition.ReleaseStage; +import io.airbyte.config.StandardSourceDefinition.SourceType; +import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; -import io.airbyte.db.instance.development.DevDatabaseMigrator; -import io.airbyte.db.instance.development.MigrationDevHelper; -import io.airbyte.test.utils.DatabaseConnectionHelper; import java.time.Duration; import java.time.Instant; import java.time.LocalDate; @@ -38,10 +38,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.UUID; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.jooq.Record1; +import org.jooq.Result; +import org.jooq.Table; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -50,29 +55,75 @@ * methods. */ @SuppressWarnings({"PMD.SignatureDeclareThrowsException", "PMD.ShortVariable", "PMD.JUnitTestsShouldIncludeAssert"}) -class DatabaseConfigPersistenceTest extends BaseDatabaseConfigPersistenceTest { +class DatabaseConfigPersistenceTest extends BaseConfigDatabaseTest { + + public static final String DEFAULT_PROTOCOL_VERSION = "0.2.0"; + protected static final StandardSourceDefinition SOURCE_GITHUB = new StandardSourceDefinition() + .withName("GitHub") + .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e")) + .withDockerRepository("airbyte/source-github") + .withDockerImageTag("0.2.3") + .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/github") + .withIcon("github.svg") + .withSourceType(SourceType.API) + .withProtocolVersion(DEFAULT_PROTOCOL_VERSION) + .withTombstone(false); + protected static final StandardSourceDefinition SOURCE_POSTGRES = new StandardSourceDefinition() + .withName("Postgres") + .withSourceDefinitionId(UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750")) + .withDockerRepository("airbyte/source-postgres") + .withDockerImageTag("0.3.11") + .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres") + .withIcon("postgresql.svg") + .withSourceType(SourceType.DATABASE) + .withTombstone(false); + protected static final StandardSourceDefinition SOURCE_CUSTOM = new StandardSourceDefinition() + .withName("Custom") + .withSourceDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750")) + .withDockerRepository("airbyte/cusom") + .withDockerImageTag("0.3.11") + .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres") + .withIcon("postgresql.svg") + .withSourceType(SourceType.DATABASE) + .withCustom(true) + .withReleaseStage(ReleaseStage.CUSTOM) + .withTombstone(false); + protected static final StandardDestinationDefinition DESTINATION_SNOWFLAKE = new StandardDestinationDefinition() + .withName("Snowflake") + .withDestinationDefinitionId(UUID.fromString("424892c4-daac-4491-b35d-c6688ba547ba")) + .withDockerRepository("airbyte/destination-snowflake") + .withDockerImageTag("0.3.16") + .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/snowflake") + .withProtocolVersion(DEFAULT_PROTOCOL_VERSION) + .withTombstone(false); + protected static final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition() + .withName("S3") + .withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362")) + .withDockerRepository("airbyte/destination-s3") + .withDockerImageTag("0.1.12") + .withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3") + .withProtocolVersion(DEFAULT_PROTOCOL_VERSION) + .withTombstone(false); + protected static final StandardDestinationDefinition DESTINATION_CUSTOM = new StandardDestinationDefinition() + .withName("Custom") + .withDestinationDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750")) + .withDockerRepository("airbyte/cusom") + .withDockerImageTag("0.3.11") + .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres") + .withIcon("postgresql.svg") + .withCustom(true) + .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM) + .withTombstone(false); + private static final String CANNOT_BE_NULL = "can not be null"; + + private DatabaseConfigPersistence configPersistence; @BeforeEach public void setup() throws Exception { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false); configPersistence = spy(new DatabaseConfigPersistence(database)); - final ConfigsDatabaseMigrator configsDatabaseMigrator = - new ConfigsDatabaseMigrator(database, flyway); - final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); - MigrationDevHelper.runLastMigration(devDatabaseMigrator); truncateAllTables(); } - @AfterEach - void tearDown() throws Exception { - dslContext.close(); - DataSourceFactory.close(dataSource); - } - @Test void testMultiWriteAndGetConfig() throws Exception { writeDestinations(configPersistence, Lists.newArrayList(DESTINATION_S3, DESTINATION_SNOWFLAKE)); @@ -151,7 +202,7 @@ void testDeleteConfig() throws Exception { void testGetConnectorRepositoryToInfoMap() throws Exception { final String connectorRepository = "airbyte/duplicated-connector"; final String oldVersion = "0.1.10"; - final String newVersion = "0.2.0"; + final String newVersion = DEFAULT_PROTOCOL_VERSION; final StandardSourceDefinition source1 = new StandardSourceDefinition() .withSourceDefinitionId(UUID.randomUUID()) .withName("source-1") @@ -207,13 +258,13 @@ void testInsertConfigRecord() throws Exception { @Test void testHasNewVersion() { - assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", "0.2.0")); + assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", DEFAULT_PROTOCOL_VERSION)); assertFalse(DatabaseConfigPersistence.hasNewVersion("invalid_version", "0.1.2")); } @Test void testHasNewPatchVersion() { - assertFalse(DatabaseConfigPersistence.hasNewPatchVersion("0.1.99", "0.2.0")); + assertFalse(DatabaseConfigPersistence.hasNewPatchVersion("0.1.99", DEFAULT_PROTOCOL_VERSION)); assertFalse(DatabaseConfigPersistence.hasNewPatchVersion("invalid_version", "0.3.1")); assertTrue(DatabaseConfigPersistence.hasNewPatchVersion("0.1.0", "0.1.3")); } @@ -281,4 +332,97 @@ void filterCustomDestination() { assertThat(filteredSourceMap).containsOnlyKeys(nonCustomKey); } + void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception { + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source); + } + + void writeSourceWithSourceConnection(final ConfigPersistence configPersistence, final StandardSourceDefinition source) + throws Exception { + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source); + final UUID connectionId = UUID.randomUUID(); + final UUID workspaceId = UUID.randomUUID(); + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withName(CANNOT_BE_NULL) + .withSlug(CANNOT_BE_NULL) + .withInitialSetupComplete(true) + .withDefaultGeography(Geography.AUTO); + configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace); + + final SourceConnection sourceConnection = new SourceConnection() + .withSourceId(connectionId) + .withWorkspaceId(workspaceId) + .withName(CANNOT_BE_NULL) + .withSourceDefinitionId(source.getSourceDefinitionId()); + configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, connectionId.toString(), sourceConnection); + } + + void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) + throws Exception { + configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination); + } + + void writeDestinationWithDestinationConnection(final ConfigPersistence configPersistence, + final StandardDestinationDefinition destination) + throws Exception { + configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination); + final UUID connectionId = UUID.randomUUID(); + final UUID workspaceId = UUID.randomUUID(); + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withName(CANNOT_BE_NULL) + .withSlug(CANNOT_BE_NULL) + .withInitialSetupComplete(true) + .withDefaultGeography(Geography.AUTO); + configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), workspace); + + final DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(connectionId) + .withWorkspaceId(workspaceId) + .withName(CANNOT_BE_NULL) + .withDestinationDefinitionId(destination.getDestinationDefinitionId()); + configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, connectionId.toString(), destinationConnection); + } + + void writeDestinations(final ConfigPersistence configPersistence, final List destinations) + throws Exception { + final Map destinationsByID = destinations.stream() + .collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity())); + configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID); + } + + void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) + throws Exception { + configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString()); + } + + protected Map> getMapWithSet(final Map> input) { + return input.entrySet().stream().collect(Collectors.toMap( + Entry::getKey, + e -> e.getValue().collect(Collectors.toSet()))); + } + + // assertEquals cannot correctly check the equality of two maps with stream values, + // so streams are converted to sets before being compared. + protected void assertSameConfigDump(final Map> expected, final Map> actual) { + assertEquals(getMapWithSet(expected), getMapWithSet(actual)); + } + + protected void assertRecordCount(final int expectedCount, final Table table) throws Exception { + final Result> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table).fetch()); + assertEquals(expectedCount, recordCount.get(0).value1()); + } + + protected void assertHasSource(final StandardSourceDefinition source) throws Exception { + assertEquals(source, configPersistence + .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), + StandardSourceDefinition.class)); + } + + protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception { + assertEquals(destination, configPersistence + .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), + StandardDestinationDefinition.class)); + } + } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java index 4bea431aa5a00..e077d8e206300 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java @@ -5,32 +5,29 @@ package io.airbyte.config.persistence; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; +import static org.jooq.impl.DSL.asterisk; +import static org.jooq.impl.DSL.count; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConfigSchema; import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSourceDefinition.SourceType; import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; -import io.airbyte.db.instance.development.DevDatabaseMigrator; -import io.airbyte.db.instance.development.MigrationDevHelper; import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.test.utils.DatabaseConnectionHelper; import java.io.IOException; import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.jooq.Record1; +import org.jooq.Result; +import org.jooq.Table; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -39,37 +36,31 @@ * Unit test for the {@link DatabaseConfigPersistence#updateConnectorDefinitions} method. */ @SuppressWarnings("PMD.SignatureDeclareThrowsException") -class DatabaseConfigPersistenceUpdateConnectorDefinitionsTest extends BaseDatabaseConfigPersistenceTest { +class DatabaseConfigPersistenceUpdateConnectorDefinitionsTest extends BaseConfigDatabaseTest { + + protected static final StandardSourceDefinition SOURCE_GITHUB = new StandardSourceDefinition() + .withName("GitHub") + .withSourceDefinitionId(UUID.fromString("ef69ef6e-aa7f-4af1-a01d-ef775033524e")) + .withDockerRepository("airbyte/source-github") + .withDockerImageTag("0.2.3") + .withDocumentationUrl("https://docs.airbyte.io/integrations/sources/github") + .withIcon("github.svg") + .withSourceType(SourceType.API) + .withProtocolVersion("0.2.0") + .withTombstone(false); private static final JsonNode SOURCE_GITHUB_JSON = Jsons.jsonNode(SOURCE_GITHUB); private static final String DOCKER_IMAGE_TAG = "0.0.0"; private static final String DOCKER_IMAGE_TAG_2 = "0.1000.0"; private static final String DEFAULT_PROTOCOL_VERSION = "0.2.0"; - @BeforeAll - public static void setup() throws Exception { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.class.getName(), - ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false); - configPersistence = new DatabaseConfigPersistence(database); - final ConfigsDatabaseMigrator configsDatabaseMigrator = - new ConfigsDatabaseMigrator(database, flyway); - final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); - MigrationDevHelper.runLastMigration(devDatabaseMigrator); - } - - @AfterAll - public static void tearDown() throws Exception { - dslContext.close(); - DataSourceFactory.close(dataSource); - } + private DatabaseConfigPersistence configPersistence; @BeforeEach public void resetDatabase() throws SQLException { truncateAllTables(); + + configPersistence = new DatabaseConfigPersistence(database); } @Test @@ -227,4 +218,19 @@ private void assertUpdateConnectorDefinition(final List> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table).fetch()); + assertEquals(expectedCount, recordCount.get(0).value1()); + } + + protected void assertHasSource(final StandardSourceDefinition source) throws Exception { + assertEquals(source, configPersistence + .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), + StandardSourceDefinition.class)); + } + } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java index 52b6e4f86e635..299d16657d0b5 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java @@ -134,7 +134,7 @@ public class MockData { private static final Instant NOW = Instant.parse("2021-12-15T20:30:40.00Z"); - private static final String CONNECTION_SPECIFICATION = "'{\"name\":\"John\", \"age\":30, \"car\":null}'"; + private static final String CONNECTION_SPECIFICATION = "{\"name\":\"John\", \"age\":30, \"car\":null}"; private static final UUID OPERATION_ID_4 = UUID.randomUUID(); private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID(); private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url"; @@ -340,21 +340,21 @@ public static List sourceConnections() { .withTombstone(false) .withSourceDefinitionId(SOURCE_DEFINITION_ID_1) .withWorkspaceId(WORKSPACE_ID_1) - .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION)) + .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION)) .withSourceId(SOURCE_ID_1); final SourceConnection sourceConnection2 = new SourceConnection() .withName("source-2") .withTombstone(false) .withSourceDefinitionId(SOURCE_DEFINITION_ID_2) .withWorkspaceId(WORKSPACE_ID_1) - .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION)) + .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION)) .withSourceId(SOURCE_ID_2); final SourceConnection sourceConnection3 = new SourceConnection() .withName("source-3") .withTombstone(false) .withSourceDefinitionId(SOURCE_DEFINITION_ID_1) .withWorkspaceId(WORKSPACE_ID_2) - .withConfiguration(Jsons.jsonNode((""))) + .withConfiguration(Jsons.emptyObject()) .withSourceId(SOURCE_ID_3); return Arrays.asList(sourceConnection1, sourceConnection2, sourceConnection3); } @@ -365,21 +365,21 @@ public static List destinationConnections() { .withTombstone(false) .withDestinationDefinitionId(DESTINATION_DEFINITION_ID_1) .withWorkspaceId(WORKSPACE_ID_1) - .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION)) + .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION)) .withDestinationId(DESTINATION_ID_1); final DestinationConnection destinationConnection2 = new DestinationConnection() .withName("destination-2") .withTombstone(false) .withDestinationDefinitionId(DESTINATION_DEFINITION_ID_2) .withWorkspaceId(WORKSPACE_ID_1) - .withConfiguration(Jsons.jsonNode(CONNECTION_SPECIFICATION)) + .withConfiguration(Jsons.deserialize(CONNECTION_SPECIFICATION)) .withDestinationId(DESTINATION_ID_2); final DestinationConnection destinationConnection3 = new DestinationConnection() .withName("destination-3") .withTombstone(true) .withDestinationDefinitionId(DESTINATION_DEFINITION_ID_2) .withWorkspaceId(WORKSPACE_ID_2) - .withConfiguration(Jsons.jsonNode("")) + .withConfiguration(Jsons.emptyObject()) .withDestinationId(DESTINATION_ID_3); return Arrays.asList(destinationConnection1, destinationConnection2, destinationConnection3); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java index 1aecba632e0f2..3e15ebe9bba3e 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceE2ETest.java @@ -25,13 +25,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.Status; import io.airbyte.config.StandardWorkspace; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.test.utils.DatabaseConnectionHelper; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.sql.SQLException; @@ -43,16 +37,15 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class StandardSyncPersistenceE2ETest extends BaseDatabaseConfigPersistenceTest { +class StandardSyncPersistenceE2ETest extends BaseConfigDatabaseTest { record StandardSyncProtocolVersionFlag(UUID standardSyncId, boolean unsupportedProtocolVersion) {} private ConfigRepository configRepository; + private StandardSyncPersistence standardSyncPersistence; UUID workspaceId; StandardWorkspace workspace; @@ -71,23 +64,12 @@ record StandardSyncProtocolVersionFlag(UUID standardSyncId, boolean unsupportedP @BeforeEach void beforeEach() throws Exception { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, StandardSyncPersistenceE2ETest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(true); truncateAllTables(); standardSyncPersistence = new StandardSyncPersistence(database); configRepository = new ConfigRepository(database); } - @AfterEach - void afterEach() throws Exception { - dslContext.close(); - DataSourceFactory.close(dataSource); - } - @Test void testClearUnsupportedProtocolVersionFlagFromSource() throws IOException, JsonValidationException, SQLException { createBaseObjects(); diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java index 4e85864e61cd4..78b5ce3a24895 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java @@ -16,34 +16,27 @@ import io.airbyte.config.State; import io.airbyte.config.StateType; import io.airbyte.config.StateWrapper; -import io.airbyte.db.ExceptionWrappingDatabase; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.FlywayFactory; import io.airbyte.db.init.DatabaseInitializationException; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; import io.airbyte.protocol.models.AirbyteGlobalState; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.protocol.models.StreamDescriptor; -import io.airbyte.test.utils.DatabaseConnectionHelper; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; import org.jooq.JSONB; -import org.jooq.SQLDialect; import org.jooq.impl.DSL; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class StatePersistenceTest extends BaseDatabaseConfigPersistenceTest { +class StatePersistenceTest extends BaseConfigDatabaseTest { private ConfigRepository configRepository; private StatePersistence statePersistence; @@ -55,6 +48,36 @@ class StatePersistenceTest extends BaseDatabaseConfigPersistenceTest { private static final String GLOBAL_STATE = "\"my global state\""; private static final String STATE = "state"; + @BeforeEach + void beforeEach() throws DatabaseInitializationException, IOException, JsonValidationException, SQLException { + truncateAllTables(); + + setupTestData(); + statePersistence = new StatePersistence(database); + } + + private void setupTestData() throws JsonValidationException, IOException { + configRepository = new ConfigRepository(database); + + final StandardWorkspace workspace = MockData.standardWorkspaces().get(0); + final StandardSourceDefinition sourceDefinition = MockData.publicSourceDefinition(); + final SourceConnection sourceConnection = MockData.sourceConnections().get(0); + final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition(); + final DestinationConnection destinationConnection = MockData.destinationConnections().get(0); + final StandardSync sync = MockData.standardSyncs().get(0); + + configRepository.writeStandardWorkspaceNoSecrets(workspace); + configRepository.writeStandardSourceDefinition(sourceDefinition); + configRepository.writeSourceConnectionNoSecrets(sourceConnection); + configRepository.writeStandardDestinationDefinition(destinationDefinition); + configRepository.writeDestinationConnectionNoSecrets(destinationConnection); + configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0)); + configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1)); + configRepository.writeStandardSync(sync); + + connectionId = sync.getConnectionId(); + } + @Test void testReadingNonExistingState() throws IOException { Assertions.assertTrue(statePersistence.getCurrentState(UUID.randomUUID()).isEmpty()); @@ -444,7 +467,7 @@ void testStreamFullReset() throws IOException { } @Test - void testInconsistentTypeUpdates() throws IOException { + void testInconsistentTypeUpdates() throws IOException, SQLException { final StateWrapper streamState = new StateWrapper() .withStateType(StateType.STREAM) .withStateMessages(Arrays.asList( @@ -479,10 +502,13 @@ void testInconsistentTypeUpdates() throws IOException { // We should be guarded against those cases let's make sure we don't make things worse if we're in // an inconsistent state - dslContext.insertInto(DSL.table(STATE)) - .columns(DSL.field("id"), DSL.field("connection_id"), DSL.field("type"), DSL.field(STATE)) - .values(UUID.randomUUID(), connectionId, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.GLOBAL, JSONB.valueOf("{}")) - .execute(); + database.transaction(ctx -> { + ctx.insertInto(DSL.table(STATE)) + .columns(DSL.field("id"), DSL.field("connection_id"), DSL.field("type"), DSL.field(STATE)) + .values(UUID.randomUUID(), connectionId, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.GLOBAL, JSONB.valueOf("{}")) + .execute(); + return null; + }); Assertions.assertThrows(IllegalStateException.class, () -> statePersistence.updateOrCreateState(connectionId, streamState)); Assertions.assertThrows(IllegalStateException.class, () -> statePersistence.getCurrentState(connectionId)); } @@ -497,79 +523,22 @@ void testEnumsConversion() { } @Test - void testStatePersistenceLegacyReadConsistency() throws IOException { - final JsonNode jsonState = Jsons.deserialize("{\"my\": \"state\"}"); - final State state = new State().withState(jsonState); - configRepository.updateConnectionState(connectionId, state); - - final StateWrapper readStateWrapper = statePersistence.getCurrentState(connectionId).orElseThrow(); - Assertions.assertEquals(StateType.LEGACY, readStateWrapper.getStateType()); - Assertions.assertEquals(state.getState(), readStateWrapper.getLegacyState()); - } - - @Test - void testStatePersistenceLegacyWriteConsistency() throws IOException { + void testStatePersistenceLegacyWriteConsistency() throws IOException, SQLException { final JsonNode jsonState = Jsons.deserialize("{\"my\": \"state\"}"); final StateWrapper stateWrapper = new StateWrapper().withStateType(StateType.LEGACY).withLegacyState(jsonState); statePersistence.updateOrCreateState(connectionId, stateWrapper); // Making sure we still follow the legacy format - final List readStates = dslContext - .selectFrom(STATE) + final List readStates = database.transaction(ctx -> ctx.selectFrom(STATE) .where(DSL.field("connection_id").eq(connectionId)) .fetch().map(r -> Jsons.deserialize(r.get(DSL.field(STATE, JSONB.class)).data(), State.class)) - .stream().toList(); + .stream() + .toList()); Assertions.assertEquals(1, readStates.size()); Assertions.assertEquals(readStates.get(0).getState(), stateWrapper.getLegacyState()); } - @BeforeEach - void beforeEach() throws DatabaseInitializationException, IOException, JsonValidationException { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, StatePersistenceTest.class.getName(), - ConfigsDatabaseMigrator.DB_IDENTIFIER, ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(true); - setupTestData(); - - statePersistence = new StatePersistence(database); - } - - @AfterEach - void afterEach() { - // Making sure we reset between tests - dslContext.dropSchemaIfExists("public").cascade().execute(); - dslContext.createSchema("public").execute(); - dslContext.setSchema("public").execute(); - } - - private void setupTestData() throws JsonValidationException, IOException { - configRepository = new ConfigRepository( - new DatabaseConfigPersistence(database), - database, - new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)), - new StandardSyncPersistence(database)); - - final StandardWorkspace workspace = MockData.standardWorkspaces().get(0); - final StandardSourceDefinition sourceDefinition = MockData.publicSourceDefinition(); - final SourceConnection sourceConnection = MockData.sourceConnections().get(0); - final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition(); - final DestinationConnection destinationConnection = MockData.destinationConnections().get(0); - final StandardSync sync = MockData.standardSyncs().get(0); - - configRepository.writeStandardWorkspaceNoSecrets(workspace); - configRepository.writeStandardSourceDefinition(sourceDefinition); - configRepository.writeSourceConnectionNoSecrets(sourceConnection); - configRepository.writeStandardDestinationDefinition(destinationDefinition); - configRepository.writeDestinationConnectionNoSecrets(destinationConnection); - configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0)); - configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1)); - configRepository.writeStandardSync(sync); - - connectionId = sync.getConnectionId(); - } - private StateWrapper clone(final StateWrapper state) { return switch (state.getStateType()) { case LEGACY -> new StateWrapper() diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java index f121051430a9b..b4c266f6da36f 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StreamResetPersistenceTest.java @@ -8,49 +8,25 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; -import io.airbyte.db.factory.DSLContextFactory; -import io.airbyte.db.factory.DataSourceFactory; -import io.airbyte.db.factory.FlywayFactory; -import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; -import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; -import io.airbyte.db.instance.development.DevDatabaseMigrator; -import io.airbyte.db.instance.development.MigrationDevHelper; import io.airbyte.protocol.models.StreamDescriptor; -import io.airbyte.test.utils.DatabaseConnectionHelper; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StreamResetPersistenceTest extends BaseDatabaseConfigPersistenceTest { +class StreamResetPersistenceTest extends BaseConfigDatabaseTest { static StreamResetPersistence streamResetPersistence; private static final Logger LOGGER = LoggerFactory.getLogger(StreamResetPersistenceTest.class); @BeforeEach public void setup() throws Exception { - dataSource = DatabaseConnectionHelper.createDataSource(container); - dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - flyway = FlywayFactory.create(dataSource, StreamResetPersistenceTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, - ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); - database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false); - streamResetPersistence = spy(new StreamResetPersistence(database)); - final ConfigsDatabaseMigrator configsDatabaseMigrator = - new ConfigsDatabaseMigrator(database, flyway); - final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); - MigrationDevHelper.runLastMigration(devDatabaseMigrator); truncateAllTables(); - } - @AfterEach - void tearDown() throws Exception { - dslContext.close(); - DataSourceFactory.close(dataSource); + streamResetPersistence = spy(new StreamResetPersistence(database)); } @Test @@ -62,15 +38,15 @@ void testCreateSameResetTwiceOnlyCreateItOnce() throws Exception { streamResetPersistence.createStreamResets(connectionId, List.of(streamDescriptor1, streamDescriptor2)); final List result = streamResetPersistence.getStreamResets(connectionId); - LOGGER.info(String.valueOf(dslContext.selectFrom("stream_reset").fetch())); + LOGGER.info(database.query(ctx -> ctx.selectFrom("stream_reset").fetch().toString())); assertEquals(2, result.size()); streamResetPersistence.createStreamResets(connectionId, List.of(streamDescriptor1)); - LOGGER.info(String.valueOf(dslContext.selectFrom("stream_reset").fetch())); + LOGGER.info(database.query(ctx -> ctx.selectFrom("stream_reset").fetch().toString())); assertEquals(2, streamResetPersistence.getStreamResets(connectionId).size()); streamResetPersistence.createStreamResets(connectionId, List.of(streamDescriptor2)); - LOGGER.info(String.valueOf(dslContext.selectFrom("stream_reset").fetch())); + LOGGER.info(database.query(ctx -> ctx.selectFrom("stream_reset").fetch().toString())); assertEquals(2, streamResetPersistence.getStreamResets(connectionId).size()); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java index 98f09191e776a..1c26d28c8287c 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java @@ -41,8 +41,7 @@ public Database create(final boolean runMigration) throws IOException, DatabaseI final Database database = new Database(dslContext); if (runMigration) { - final DatabaseMigrator migrator = new ConfigsDatabaseMigrator( - database, flyway); + final DatabaseMigrator migrator = new ConfigsDatabaseMigrator(database, flyway); migrator.createBaseline(); migrator.migrate(); } else {