diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseAvailabilityCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseAvailabilityCheck.java new file mode 100644 index 0000000000000..ee665dd70b9c6 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseAvailabilityCheck.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check; + +import static org.jooq.impl.DSL.select; + +import io.airbyte.db.Database; +import java.util.Optional; +import java.util.function.Function; +import org.jooq.DSLContext; +import org.slf4j.Logger; + +/** + * Performs a check to verify that the configured database is available. + */ +public interface DatabaseAvailabilityCheck extends DatabaseCheck { + + /** + * The number of times to check if the database is available. TODO replace with a default value in a + * value injection annotation + */ + int NUM_POLL_TIMES = 10; + + /** + * Checks whether the configured database is available. + * + * @throws DatabaseCheckException if unable to perform the check. + */ + default void check() throws DatabaseCheckException { + var initialized = false; + var totalTime = 0; + final var sleepTime = getTimeoutMs() / NUM_POLL_TIMES; + + while (!initialized) { + getLogger().warn("Waiting for database to become available..."); + if (totalTime >= getTimeoutMs()) { + throw new DatabaseCheckException("Unable to connect to the database."); + } + + final Optional dslContext = getDslContext(); + + if (dslContext.isPresent()) { + final Database database = new Database(dslContext.get()); + initialized = isDatabaseConnected(getDatabaseName()).apply(database); + if (!initialized) { + getLogger().info("Database is not ready yet. Please wait a moment, it might still be initializing..."); + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + throw new DatabaseCheckException("Unable to wait for database to be ready.", e); + } + totalTime += sleepTime; + } + } else { + throw new DatabaseCheckException("Database configuration not present."); + } + } + } + + /** + * Generates a {@link Function} that is used to test if a connection can be made to the database by + * verifying that the {@code information_schema.tables} tables has been populated. + * + * @param databaseName The name of the database to test. + * @return A {@link Function} that can be invoked to test if the database is available. + */ + default Function isDatabaseConnected(final String databaseName) { + return database -> { + try { + getLogger().info("Testing {} database connection...", databaseName); + return database.query(ctx -> ctx.fetchExists(select().from("information_schema.tables"))); + } catch (final Exception e) { + getLogger().error("Failed to verify database connection.", e); + return false; + } + }; + } + + /** + * Retrieves the configured database name to be tested. + * + * @return The name of the database to test. + */ + String getDatabaseName(); + + /** + * Retrieves the configured {@link DSLContext} to be used to test the database availability. + * + * @return The configured {@link DSLContext} object. + */ + Optional getDslContext(); + + /** + * Retrieves the configured {@link Logger} object to be used to record progress of the migration + * check. + * + * @return The configured {@link Logger} object. + */ + Logger getLogger(); + + /** + * Retrieves the timeout in milliseconds for the check. Once this timeout is exceeded, the check + * will fail with an {@link InterruptedException}. + * + * @return The timeout in milliseconds for the check. + */ + long getTimeoutMs(); + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseCheck.java new file mode 100644 index 0000000000000..a78297588a0e8 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseCheck.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check; + +/** + * Defines the interface for performing checks against a database. + */ +public interface DatabaseCheck { + + /** + * Checks whether the configured database is available. + * + * @throws DatabaseCheckException if unable to perform the check. + */ + void check() throws DatabaseCheckException; + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseCheckException.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseCheckException.java new file mode 100644 index 0000000000000..e7b96e4e606fb --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseCheckException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check; + +/** + * Custom exception that represents a failure that occurs during an attempt to check the + * availability or migration status of a database. + */ +public class DatabaseCheckException extends Exception { + + public DatabaseCheckException(final String message) { + super(message); + } + + public DatabaseCheckException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseMigrationCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseMigrationCheck.java new file mode 100644 index 0000000000000..1cf2864b1fd78 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/DatabaseMigrationCheck.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check; + +import java.util.Optional; +import org.flywaydb.core.Flyway; +import org.slf4j.Logger; + +/** + * Performs a check to verify that the configured database has been migrated to the appropriate + * version. + */ +public interface DatabaseMigrationCheck { + + /** + * The number of times to check if the database has been migrated to the required schema version. + * TODO replace with a default value in a value injection annotation + */ + int NUM_POLL_TIMES = 10; + + /** + * Checks whether the configured database has been migrated to the required minimum schema version. + * + * @throws DatabaseCheckException if unable to perform the check. + */ + default void check() throws DatabaseCheckException { + final var startTime = System.currentTimeMillis(); + final var sleepTime = getTimeoutMs() / NUM_POLL_TIMES; + final Optional flywayOptional = getFlyway(); + + if (flywayOptional.isPresent()) { + final var flyway = flywayOptional.get(); + var currDatabaseMigrationVersion = flyway.info().current().getVersion().getVersion(); + getLogger().info("Current database migration version {}.", currDatabaseMigrationVersion); + getLogger().info("Minimum Flyway version required {}.", getMinimumFlywayVersion()); + + while (currDatabaseMigrationVersion.compareTo(getMinimumFlywayVersion()) < 0) { + if (System.currentTimeMillis() - startTime >= getTimeoutMs()) { + throw new DatabaseCheckException("Timeout while waiting for database to fulfill minimum flyway migration version.."); + } + + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + throw new DatabaseCheckException("Unable to wait for database to be migrated.", e); + } + currDatabaseMigrationVersion = flyway.info().current().getVersion().getVersion(); + } + getLogger().info("Verified that database has been migrated to the required minimum version {}.", getTimeoutMs()); + } else { + throw new DatabaseCheckException("Flyway configuration not present."); + } + } + + /** + * Retrieves the configured {@link Flyway} object to be used to check the migration status of the + * database. + * + * @return The configured {@link Flyway} object. + */ + Optional getFlyway(); + + /** + * Retrieves the configured {@link Logger} object to be used to record progress of the migration + * check. + * + * @return The configured {@link Logger} object. + */ + Logger getLogger(); + + /** + * Retrieves the required minimum migration version of the schema. + * + * @return The required minimum migration version of the schema. + */ + String getMinimumFlywayVersion(); + + /** + * Retrieves the timeout in milliseconds for the check. Once this timeout is exceeded, the check + * will fail with an {@link InterruptedException}. + * + * @return The timeout in milliseconds for the check. + */ + long getTimeoutMs(); + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/ConfigsDatabaseAvailabilityCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/ConfigsDatabaseAvailabilityCheck.java new file mode 100644 index 0000000000000..fdbe46cc361ca --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/ConfigsDatabaseAvailabilityCheck.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.instance.DatabaseConstants; +import java.util.Optional; +import org.jooq.DSLContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link DatabaseAvailabilityCheck} for the Configurations database. + */ +public class ConfigsDatabaseAvailabilityCheck implements DatabaseAvailabilityCheck { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseAvailabilityCheck.class); + + // TODO inject via dependency injection framework + private final DSLContext dslContext; + + // TODO inject via dependency injection framework + private final long timeoutMs; + + public ConfigsDatabaseAvailabilityCheck(final DSLContext dslContext, final long timeoutMs) { + this.dslContext = dslContext; + this.timeoutMs = timeoutMs; + } + + @Override + public String getDatabaseName() { + return DatabaseConstants.CONFIGS_DATABASE_LOGGING_NAME; + } + + @Override + public Optional getDslContext() { + return Optional.ofNullable(dslContext); + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public long getTimeoutMs() { + return timeoutMs; + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/ConfigsDatabaseMigrationCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/ConfigsDatabaseMigrationCheck.java new file mode 100644 index 0000000000000..95f832afe5699 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/ConfigsDatabaseMigrationCheck.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import io.airbyte.db.check.DatabaseMigrationCheck; +import java.util.Optional; +import org.flywaydb.core.Flyway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link DatabaseMigrationCheck} for the Configurations database. + */ +public class ConfigsDatabaseMigrationCheck implements DatabaseMigrationCheck { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseMigrationCheck.class); + + // TODO inject via dependency injection framework + private final Flyway flyway; + + // TODO inject via dependency injection framework + private final String minimumFlywayVersion; + + // TODO inject via dependency injection framework + private final long timeoutMs; + + public ConfigsDatabaseMigrationCheck(final Flyway flyway, final String minimumFlywayVersion, final long timeoutMs) { + this.flyway = flyway; + this.minimumFlywayVersion = minimumFlywayVersion; + this.timeoutMs = timeoutMs; + } + + @Override + public Optional getFlyway() { + return Optional.ofNullable(flyway); + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public String getMinimumFlywayVersion() { + return minimumFlywayVersion; + } + + @Override + public long getTimeoutMs() { + return timeoutMs; + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/JobsDatabaseAvailabilityCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/JobsDatabaseAvailabilityCheck.java new file mode 100644 index 0000000000000..4fd289e8a42cb --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/JobsDatabaseAvailabilityCheck.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.instance.DatabaseConstants; +import java.util.Optional; +import org.jooq.DSLContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link DatabaseAvailabilityCheck} for the Jobs database. + */ +public class JobsDatabaseAvailabilityCheck implements DatabaseAvailabilityCheck { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobsDatabaseAvailabilityCheck.class); + + // TODO inject via dependency injection framework + private final DSLContext dslContext; + + // TODO inject via dependency injection framework + private final long timeoutMs; + + public JobsDatabaseAvailabilityCheck(final DSLContext dslContext, final long timeoutMs) { + this.dslContext = dslContext; + this.timeoutMs = timeoutMs; + } + + @Override + public String getDatabaseName() { + return DatabaseConstants.JOBS_DATABASE_LOGGING_NAME; + } + + @Override + public Optional getDslContext() { + return Optional.ofNullable(dslContext); + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public long getTimeoutMs() { + return timeoutMs; + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/JobsDatabaseMigrationCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/JobsDatabaseMigrationCheck.java new file mode 100644 index 0000000000000..33bb6789c4279 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/check/impl/JobsDatabaseMigrationCheck.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import io.airbyte.db.check.DatabaseMigrationCheck; +import java.util.Optional; +import org.flywaydb.core.Flyway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link DatabaseMigrationCheck} for the Jobs database. + */ +public class JobsDatabaseMigrationCheck implements DatabaseMigrationCheck { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobsDatabaseMigrationCheck.class); + + // TODO inject via dependency injection framework + private final Flyway flyway; + + // TODO inject via dependency injection framework + private final String minimumFlywayVersion; + + // TODO inject via dependency injection framework + private final long timeoutMs; + + public JobsDatabaseMigrationCheck(final Flyway flyway, final String minimumFlywayVersion, final long timeoutMs) { + this.flyway = flyway; + this.minimumFlywayVersion = minimumFlywayVersion; + this.timeoutMs = timeoutMs; + } + + @Override + public Optional getFlyway() { + return Optional.ofNullable(flyway); + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public String getMinimumFlywayVersion() { + return minimumFlywayVersion; + } + + @Override + public long getTimeoutMs() { + return timeoutMs; + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/init/DatabaseInitializationException.java b/airbyte-db/lib/src/main/java/io/airbyte/db/init/DatabaseInitializationException.java new file mode 100644 index 0000000000000..7408aefd5b6cd --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/init/DatabaseInitializationException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init; + +/** + * Custom exception that represents a failure that occurs during an attempt to initialize a + * database. + */ +public class DatabaseInitializationException extends Exception { + + public DatabaseInitializationException(final String message) { + super(message); + } + + public DatabaseInitializationException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/init/DatabaseInitializer.java b/airbyte-db/lib/src/main/java/io/airbyte/db/init/DatabaseInitializer.java new file mode 100644 index 0000000000000..070d438612e50 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/init/DatabaseInitializer.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init; + +import static org.jooq.impl.DSL.select; + +import io.airbyte.db.Database; +import io.airbyte.db.ExceptionWrappingDatabase; +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.check.DatabaseCheckException; +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; +import org.slf4j.Logger; + +/** + * Performs the initialization of the configured database if the database is available and has not + * yet been initialized. + * + * In the future, this logic could be completely removed if the schema initialization script is + * converted to a migration script. + */ +public interface DatabaseInitializer { + + /** + * Initializes the configured database. + * + * @throws DatabaseInitializationException if unable to perform the initialization. + */ + default void init() throws DatabaseInitializationException { + initialize(); + } + + /** + * Initializes the configured database by using the following steps: + * + *
    + *
  1. Verify that the database is available and accepting connections
  2. + *
  3. Verify that the database is populated with the initial schema. If not, create the initial + * schema.
  4. + *
+ * + * @throws DatabaseInitializationException if unable to verify the database availability. + */ + default void initialize() throws DatabaseInitializationException { + // Verify that the database is up and reachable first + final Optional availabilityCheck = getDatabaseAvailabilityCheck(); + if (availabilityCheck.isPresent()) { + try { + availabilityCheck.get().check(); + final Optional dslContext = getDslContext(); + if (dslContext.isPresent()) { + final Database database = new Database(dslContext.get()); + new ExceptionWrappingDatabase(database).transaction(this::initializeSchema); + } else { + throw new DatabaseInitializationException("Database configuration not present."); + } + } catch (final DatabaseCheckException | IOException e) { + throw new DatabaseInitializationException("Database availability check failed.", e); + } + } else { + throw new DatabaseInitializationException("Availability check not configured."); + } + } + + /** + * Tests whether the provided table exists in the database. + * + * @param ctx A {@link DSLContext} used to query the database. + * @param tableName The name of the table. + * @return {@code True} if the table exists or {@code false} otherwise. + */ + default boolean hasTable(final DSLContext ctx, final String tableName) { + return ctx.fetchExists(select() + .from("information_schema.tables") + .where(DSL.field("table_name").eq(tableName) + .and(DSL.field("table_schema").eq("public")))); + } + + /** + * Initializes the schema in the database represented by the provided {@link DSLContext} instance. + * + * If the initial tables already exist in the database, initialization is skipped. Otherwise, the + * script provided by the {@link #getInitialSchema()} method is executed against the database. + * + * @param ctx The {@link DSLContext} used to execute the schema initialization. + * @return {@code true} indicating that the operation ran + */ + default boolean initializeSchema(final DSLContext ctx) { + final Optional> tableNames = getTableNames(); + + if (tableNames.isPresent()) { + // Verify that all the required tables are present + if (tableNames.get().stream().allMatch(tableName -> hasTable(ctx, tableName))) { + getLogger().info("The {} database is initialized", getDatabaseName()); + } else { + getLogger().info("The {} database has not been initialized; initializing it with schema: \n{}", getDatabaseName(), + getInitialSchema()); + ctx.execute(getInitialSchema()); + getLogger().info("The {} database successfully initialized with schema: \n{}.", getDatabaseName(), getInitialSchema()); + } + return true; + } else { + getLogger().warn("Initial collection of table names is empty. Cannot perform schema check."); + return false; + } + } + + /** + * Retrieves the {@link DatabaseAvailabilityCheck} used to verify that the database is running and + * available. + * + * @return The {@link DatabaseAvailabilityCheck}. + */ + Optional getDatabaseAvailabilityCheck(); + + /** + * Retrieves the configured database name to be tested. + * + * @return The name of the database to test. + */ + String getDatabaseName(); + + /** + * Retrieves the configured {@link DSLContext} to be used to test the database availability. + * + * @return The configured {@link DSLContext} object. + */ + Optional getDslContext(); + + /** + * Retrieve the initial schema to be applied to the database if the database is not already + * populated with the expected table(s). + * + * @return The initial schema. + */ + String getInitialSchema(); + + /** + * Retrieves the configured {@link Logger} object to be used to record progress of the migration + * check. + * + * @return The configured {@link Logger} object. + */ + Logger getLogger(); + + /** + * The collection of table names that will be used to confirm database availability. + * + * @return The collection of database table names. + */ + Optional> getTableNames(); + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/init/impl/ConfigsDatabaseInitializer.java b/airbyte-db/lib/src/main/java/io/airbyte/db/init/impl/ConfigsDatabaseInitializer.java new file mode 100644 index 0000000000000..02ec8b32fdf65 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/init/impl/ConfigsDatabaseInitializer.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init.impl; + +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.init.DatabaseInitializer; +import io.airbyte.db.instance.DatabaseConstants; +import java.util.Collection; +import java.util.Optional; +import org.jooq.DSLContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link DatabaseInitializer} for the Configurations database that creates + * the schema if it does not currently exist. + */ +public class ConfigsDatabaseInitializer implements DatabaseInitializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseInitializer.class); + + // TODO inject via dependency injection framework + private final DatabaseAvailabilityCheck databaseAvailablityCheck; + + // TODO inject via dependency injection framework + private final DSLContext dslContext; + + // TODO inject via dependency injection framework + private final String initialSchema; + + public ConfigsDatabaseInitializer(final DatabaseAvailabilityCheck databaseAvailablityCheck, + final DSLContext dslContext, + final String initialSchema) { + this.databaseAvailablityCheck = databaseAvailablityCheck; + this.dslContext = dslContext; + this.initialSchema = initialSchema; + } + + @Override + public Optional getDatabaseAvailabilityCheck() { + return Optional.ofNullable(databaseAvailablityCheck); + } + + @Override + public String getDatabaseName() { + return DatabaseConstants.CONFIGS_DATABASE_LOGGING_NAME; + } + + @Override + public Optional getDslContext() { + return Optional.ofNullable(dslContext); + } + + @Override + public String getInitialSchema() { + return initialSchema; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public Optional> getTableNames() { + return Optional.of(DatabaseConstants.CONFIGS_INITIAL_EXPECTED_TABLES); + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/init/impl/JobsDatabaseInitializer.java b/airbyte-db/lib/src/main/java/io/airbyte/db/init/impl/JobsDatabaseInitializer.java new file mode 100644 index 0000000000000..d3b8721238b9d --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/init/impl/JobsDatabaseInitializer.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init.impl; + +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.init.DatabaseInitializer; +import io.airbyte.db.instance.DatabaseConstants; +import java.util.Collection; +import java.util.Optional; +import org.jooq.DSLContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link DatabaseInitializer} for the Jobs database that creates the schema + * if it does not currently exist. + */ +public class JobsDatabaseInitializer implements DatabaseInitializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobsDatabaseInitializer.class); + + // TODO inject via dependency injection framework + private final DatabaseAvailabilityCheck databaseAvailablityCheck; + + // TODO inject via dependency injection framework + private final DSLContext dslContext; + + // TODO inject via dependency injection framework + private final String initialSchema; + + public JobsDatabaseInitializer(final DatabaseAvailabilityCheck databaseAvailablityCheck, + final DSLContext dslContext, + final String initialSchema) { + this.databaseAvailablityCheck = databaseAvailablityCheck; + this.dslContext = dslContext; + this.initialSchema = initialSchema; + } + + @Override + public Optional getDatabaseAvailabilityCheck() { + return Optional.ofNullable(databaseAvailablityCheck); + } + + @Override + public String getDatabaseName() { + return DatabaseConstants.JOBS_DATABASE_LOGGING_NAME; + } + + @Override + public Optional getDslContext() { + return Optional.ofNullable(dslContext); + } + + @Override + public String getInitialSchema() { + return initialSchema; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public Optional> getTableNames() { + return Optional.of(DatabaseConstants.JOBS_INITIAL_EXPECTED_TABLES); + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseConstants.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseConstants.java new file mode 100644 index 0000000000000..bbdcfb4bcf92a --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseConstants.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance; + +import io.airbyte.db.instance.jobs.JobsDatabaseSchema; +import java.util.Collections; +import java.util.Set; + +/** + * Collection of database related constants. + */ +public final class DatabaseConstants { + + /** + * Logical name of the Configurations database. + */ + public static final String CONFIGS_DATABASE_LOGGING_NAME = "airbyte configs"; + + /** + * Collection of tables expected to be present in the Configurations database after creation. + */ + public static final Set CONFIGS_INITIAL_EXPECTED_TABLES = Collections.singleton("airbyte_configs"); + + /** + * Path to the script that contains the initial schema definition for the Configurations database. + */ + public static final String CONFIGS_SCHEMA_PATH = "configs_database/schema.sql"; + + /** + * Logical name of the Jobs database. + */ + public static final String JOBS_DATABASE_LOGGING_NAME = "airbyte jobs"; + + /** + * Collection of tables expected to be present in the Jobs database after creation. + */ + public static final Set JOBS_INITIAL_EXPECTED_TABLES = JobsDatabaseSchema.getTableNames(); + + /** + * Path to the script that contains the initial schema definition for the Jobs database. + */ + public static final String JOBS_SCHEMA_PATH = "jobs_database/schema.sql"; + + /** + * Private constructor to prevent instantiation. + */ + private DatabaseConstants() {} + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseInstance.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseInstance.java index ff9425279a31a..8223283206554 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseInstance.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/DatabaseInstance.java @@ -11,17 +11,29 @@ public interface DatabaseInstance { /** * Check is a database has been initialized. + * + * @deprecated Will be removed in future versions that separate the initialization from + * creation/retrieval of an instance. */ + @Deprecated(forRemoval = true) boolean isInitialized() throws IOException; /** * Get a database that has been initialized and is ready to use. + * + * @deprecated Will be replaced in future versions that separate the initialization from + * creation/retrieval of an instance. */ + @Deprecated(forRemoval = true) Database getInitialized(); /** * Get an empty database and initialize it. + * + * @deprecated Will be replaced in future versions that separate the initialization from + * creation/retrieval of an instance. */ + @Deprecated(forRemoval = true) Database getAndInitialize() throws IOException; } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java index bf37bf7323941..06d6479ab7584 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java @@ -17,7 +17,11 @@ * start interacting with the database. *

* Methods have dynamic pool times, and have configurable timeouts. + * + * @deprecated This class has been marked as deprecated as we move to using an application framework + * to manage resources. This class will be removed in a future release. */ +@Deprecated(forRemoval = true) public class MinimumFlywayMigrationVersionCheck { // Exposed so applications have a default timeout variable. diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseInstance.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseInstance.java index e1ff55f2935c4..dd356bbede70f 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseInstance.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseInstance.java @@ -8,10 +8,9 @@ import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.instance.BaseDatabaseInstance; +import io.airbyte.db.instance.DatabaseConstants; import io.airbyte.db.instance.DatabaseInstance; import java.io.IOException; -import java.util.Collections; -import java.util.Set; import java.util.function.Function; import org.jooq.DSLContext; import org.slf4j.Logger; @@ -20,9 +19,7 @@ public class ConfigsDatabaseInstance extends BaseDatabaseInstance implements DatabaseInstance { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseInstance.class); - private static final Set INITIAL_EXPECTED_TABLES = Collections.singleton("airbyte_configs"); - private static final String DATABASE_LOGGING_NAME = "airbyte configs"; - private static final String SCHEMA_PATH = "configs_database/schema.sql"; + private static final Function IS_CONFIGS_DATABASE_READY = database -> { try { LOGGER.info("Testing if airbyte_configs has been created and seeded..."); @@ -39,7 +36,9 @@ public class ConfigsDatabaseInstance extends BaseDatabaseInstance implements Dat private Database database; public ConfigsDatabaseInstance(final DSLContext dslContext) throws IOException { - super(dslContext, DATABASE_LOGGING_NAME, MoreResources.readResource(SCHEMA_PATH), INITIAL_EXPECTED_TABLES, + super(dslContext, DatabaseConstants.CONFIGS_DATABASE_LOGGING_NAME, + MoreResources.readResource(DatabaseConstants.CONFIGS_SCHEMA_PATH), + DatabaseConstants.CONFIGS_INITIAL_EXPECTED_TABLES, IS_CONFIGS_DATABASE_READY); } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseInstance.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseInstance.java index 8d378e07f8445..4a658191b0272 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseInstance.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseInstance.java @@ -8,6 +8,7 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.db.Database; import io.airbyte.db.instance.BaseDatabaseInstance; +import io.airbyte.db.instance.DatabaseConstants; import io.airbyte.db.instance.DatabaseInstance; import java.io.IOException; import java.util.function.Function; @@ -19,8 +20,6 @@ public class JobsDatabaseInstance extends BaseDatabaseInstance implements Databa private static final Logger LOGGER = LoggerFactory.getLogger(JobsDatabaseInstance.class); - private static final String DATABASE_LOGGING_NAME = "airbyte jobs"; - private static final String SCHEMA_PATH = "jobs_database/schema.sql"; private static final Function IS_JOBS_DATABASE_READY = database -> { try { LOGGER.info("Testing if jobs database is ready..."); @@ -32,11 +31,12 @@ public class JobsDatabaseInstance extends BaseDatabaseInstance implements Databa @VisibleForTesting public JobsDatabaseInstance(final DSLContext dslContext, final String schema) { - super(dslContext, DATABASE_LOGGING_NAME, schema, JobsDatabaseSchema.getTableNames(), IS_JOBS_DATABASE_READY); + super(dslContext, DatabaseConstants.JOBS_DATABASE_LOGGING_NAME, schema, + DatabaseConstants.JOBS_INITIAL_EXPECTED_TABLES, IS_JOBS_DATABASE_READY); } public JobsDatabaseInstance(final DSLContext dslContext) throws IOException { - this(dslContext, MoreResources.readResource(SCHEMA_PATH)); + this(dslContext, MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH)); } } diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/AbstractDatabaseAvailabilityCheckTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/AbstractDatabaseAvailabilityCheckTest.java new file mode 100644 index 0000000000000..c0e281268db07 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/AbstractDatabaseAvailabilityCheckTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DataSourceFactory; +import javax.sql.DataSource; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.PostgreSQLContainer; + +/** + * Common test setup for database availability check tests. + */ +public abstract class AbstractDatabaseAvailabilityCheckTest { + + protected static final long TIMEOUT_MS = 500L; + + protected PostgreSQLContainer container; + + protected DataSource dataSource; + + protected DSLContext dslContext; + + @BeforeEach + void setup() { + container = new PostgreSQLContainer<>("postgres:13-alpine"); + container.start(); + + dataSource = DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); + dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); + } + + @AfterEach + void cleanup() throws Exception { + DataSourceFactory.close(dataSource); + dslContext.close(); + container.stop(); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/ConfigsDatabaseAvailabilityCheckTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/ConfigsDatabaseAvailabilityCheckTest.java new file mode 100644 index 0000000000000..c9a736899d512 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/ConfigsDatabaseAvailabilityCheckTest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.airbyte.db.check.DatabaseCheckException; +import org.jooq.DSLContext; +import org.jooq.Select; +import org.jooq.exception.DataAccessException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link ConfigsDatabaseAvailabilityCheck} class. + */ +public class ConfigsDatabaseAvailabilityCheckTest extends AbstractDatabaseAvailabilityCheckTest { + + @Test + void checkDatabaseAvailability() { + final var check = new ConfigsDatabaseAvailabilityCheck(dslContext, TIMEOUT_MS); + Assertions.assertDoesNotThrow(() -> check.check()); + } + + @Test + void checkDatabaseAvailabilityTimeout() { + final DSLContext dslContext = mock(DSLContext.class); + when(dslContext.fetchExists(any(Select.class))).thenThrow(new DataAccessException("test")); + final var check = new ConfigsDatabaseAvailabilityCheck(dslContext, TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + + @Test + void checkDatabaseAvailabilityNullDslContext() { + final var check = new ConfigsDatabaseAvailabilityCheck(null, TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/ConfigsDatabaseMigrationCheckTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/ConfigsDatabaseMigrationCheckTest.java new file mode 100644 index 0000000000000..916f05ff5e441 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/ConfigsDatabaseMigrationCheckTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.airbyte.db.check.DatabaseCheckException; +import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.MigrationInfo; +import org.flywaydb.core.api.MigrationInfoService; +import org.flywaydb.core.api.MigrationVersion; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link ConfigsDatabaseMigrationCheck} class. + */ +public class ConfigsDatabaseMigrationCheckTest { + + @Test + void testMigrationCheck() { + final var minimumVersion = "1.0.0"; + final var currentVersion = "1.2.3"; + final var migrationVersion = MigrationVersion.fromVersion(currentVersion); + final var migrationInfo = mock(MigrationInfo.class); + final var migrationInfoService = mock(MigrationInfoService.class); + final var flyway = mock(Flyway.class); + + when(migrationInfo.getVersion()).thenReturn(migrationVersion); + when(migrationInfoService.current()).thenReturn(migrationInfo); + when(flyway.info()).thenReturn(migrationInfoService); + + final var check = new ConfigsDatabaseMigrationCheck(flyway, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertDoesNotThrow(() -> check.check()); + } + + @Test + void testMigrationCheckEqualVersion() { + final var minimumVersion = "1.2.3"; + final var currentVersion = minimumVersion; + final var migrationVersion = MigrationVersion.fromVersion(currentVersion); + final var migrationInfo = mock(MigrationInfo.class); + final var migrationInfoService = mock(MigrationInfoService.class); + final var flyway = mock(Flyway.class); + + when(migrationInfo.getVersion()).thenReturn(migrationVersion); + when(migrationInfoService.current()).thenReturn(migrationInfo); + when(flyway.info()).thenReturn(migrationInfoService); + + final var check = new ConfigsDatabaseMigrationCheck(flyway, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertDoesNotThrow(() -> check.check()); + } + + @Test + void testMigrationCheckTimeout() { + final var minimumVersion = "2.0.0"; + final var currentVersion = "1.2.3"; + final var migrationVersion = MigrationVersion.fromVersion(currentVersion); + final var migrationInfo = mock(MigrationInfo.class); + final var migrationInfoService = mock(MigrationInfoService.class); + final var flyway = mock(Flyway.class); + + when(migrationInfo.getVersion()).thenReturn(migrationVersion); + when(migrationInfoService.current()).thenReturn(migrationInfo); + when(flyway.info()).thenReturn(migrationInfoService); + + final var check = new ConfigsDatabaseMigrationCheck(flyway, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + + @Test + void checkDatabaseAvailabilityNullFlyway() { + final var minimumVersion = "2.0.0"; + final var check = new ConfigsDatabaseMigrationCheck(null, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/JobsDatabaseAvailabilityCheckTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/JobsDatabaseAvailabilityCheckTest.java new file mode 100644 index 0000000000000..636c7092fbdd5 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/JobsDatabaseAvailabilityCheckTest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.airbyte.db.check.DatabaseCheckException; +import org.jooq.DSLContext; +import org.jooq.Select; +import org.jooq.exception.DataAccessException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link JobsDatabaseAvailabilityCheck} class. + */ +public class JobsDatabaseAvailabilityCheckTest extends AbstractDatabaseAvailabilityCheckTest { + + @Test + void checkDatabaseAvailability() { + final var check = new JobsDatabaseAvailabilityCheck(dslContext, TIMEOUT_MS); + Assertions.assertDoesNotThrow(() -> check.check()); + } + + @Test + void checkDatabaseAvailabilityTimeout() { + final DSLContext dslContext = mock(DSLContext.class); + when(dslContext.fetchExists(any(Select.class))).thenThrow(new DataAccessException("test")); + final var check = new JobsDatabaseAvailabilityCheck(dslContext, TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + + @Test + void checkDatabaseAvailabilityNullDslContext() { + final var check = new JobsDatabaseAvailabilityCheck(null, TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/JobsDatabaseMigrationCheckTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/JobsDatabaseMigrationCheckTest.java new file mode 100644 index 0000000000000..6b8c8caf023ca --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/check/impl/JobsDatabaseMigrationCheckTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.check.impl; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.airbyte.db.check.DatabaseCheckException; +import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.MigrationInfo; +import org.flywaydb.core.api.MigrationInfoService; +import org.flywaydb.core.api.MigrationVersion; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link JobsDatabaseMigrationCheck} class. + */ +public class JobsDatabaseMigrationCheckTest { + + @Test + void testMigrationCheck() { + final var minimumVersion = "1.0.0"; + final var currentVersion = "1.2.3"; + final var migrationVersion = MigrationVersion.fromVersion(currentVersion); + final var migrationInfo = mock(MigrationInfo.class); + final var migrationInfoService = mock(MigrationInfoService.class); + final var flyway = mock(Flyway.class); + + when(migrationInfo.getVersion()).thenReturn(migrationVersion); + when(migrationInfoService.current()).thenReturn(migrationInfo); + when(flyway.info()).thenReturn(migrationInfoService); + + final var check = new JobsDatabaseMigrationCheck(flyway, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertDoesNotThrow(() -> check.check()); + } + + @Test + void testMigrationCheckEqualVersion() { + final var minimumVersion = "1.2.3"; + final var currentVersion = minimumVersion; + final var migrationVersion = MigrationVersion.fromVersion(currentVersion); + final var migrationInfo = mock(MigrationInfo.class); + final var migrationInfoService = mock(MigrationInfoService.class); + final var flyway = mock(Flyway.class); + + when(migrationInfo.getVersion()).thenReturn(migrationVersion); + when(migrationInfoService.current()).thenReturn(migrationInfo); + when(flyway.info()).thenReturn(migrationInfoService); + + final var check = new JobsDatabaseMigrationCheck(flyway, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertDoesNotThrow(() -> check.check()); + } + + @Test + void testMigrationCheckTimeout() { + final var minimumVersion = "2.0.0"; + final var currentVersion = "1.2.3"; + final var migrationVersion = MigrationVersion.fromVersion(currentVersion); + final var migrationInfo = mock(MigrationInfo.class); + final var migrationInfoService = mock(MigrationInfoService.class); + final var flyway = mock(Flyway.class); + + when(migrationInfo.getVersion()).thenReturn(migrationVersion); + when(migrationInfoService.current()).thenReturn(migrationInfo); + when(flyway.info()).thenReturn(migrationInfoService); + + final var check = new JobsDatabaseMigrationCheck(flyway, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + + @Test + void checkDatabaseAvailabilityNullFlyway() { + final var minimumVersion = "2.0.0"; + final var check = new JobsDatabaseMigrationCheck(null, minimumVersion, AbstractDatabaseAvailabilityCheckTest.TIMEOUT_MS); + Assertions.assertThrows(DatabaseCheckException.class, () -> check.check()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/init/DatabaseInitializerTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/init/DatabaseInitializerTest.java new file mode 100644 index 0000000000000..6679fd06ff0ff --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/init/DatabaseInitializerTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init; + +import static org.mockito.Mockito.mock; + +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import java.util.Collection; +import java.util.Optional; +import org.jooq.DSLContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabaseInitializerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseInitializerTest.class); + + @Test + void testExceptionHandling() { + final var initializer = new DatabaseInitializer() { + + @Override + public void initialize() throws DatabaseInitializationException { + throw new DatabaseInitializationException("test"); + } + + @Override + public Optional getDatabaseAvailabilityCheck() { + return Optional.empty(); + } + + @Override + public String getDatabaseName() { + return null; + } + + @Override + public Optional getDslContext() { + return Optional.empty(); + } + + @Override + public String getInitialSchema() { + return null; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public Optional> getTableNames() { + return Optional.empty(); + } + + }; + + Assertions.assertThrows(DatabaseInitializationException.class, () -> initializer.init()); + } + + @Test + void testEmptyTableNames() { + final var dslContext = mock(DSLContext.class); + final var initializer = new DatabaseInitializer() { + + @Override + public Optional getDatabaseAvailabilityCheck() { + return Optional.of(mock(DatabaseAvailabilityCheck.class)); + } + + @Override + public String getDatabaseName() { + return null; + } + + @Override + public Optional getDslContext() { + return Optional.of(dslContext); + } + + @Override + public String getInitialSchema() { + return null; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public Optional> getTableNames() { + return Optional.empty(); + } + + }; + + Assertions.assertEquals(false, initializer.initializeSchema(dslContext)); + Assertions.assertNotNull(initializer.getTableNames()); + Assertions.assertEquals(false, initializer.getTableNames().isPresent()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/AbstractDatabaseInitializerTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/AbstractDatabaseInitializerTest.java new file mode 100644 index 0000000000000..49782c0e4dc75 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/AbstractDatabaseInitializerTest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init.impl; + +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DataSourceFactory; +import javax.sql.DataSource; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.PostgreSQLContainer; + +/** + * Common test setup for database initialization tests. + */ +public class AbstractDatabaseInitializerTest { + + protected PostgreSQLContainer container; + + protected DataSource dataSource; + + protected DSLContext dslContext; + + @BeforeEach + void setup() { + container = new PostgreSQLContainer<>("postgres:13-alpine"); + container.start(); + + dataSource = DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); + dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); + } + + @AfterEach + void cleanup() throws Exception { + DataSourceFactory.close(dataSource); + dslContext.close(); + container.stop(); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/ConfigsDatabaseInitializerTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/ConfigsDatabaseInitializerTest.java new file mode 100644 index 0000000000000..85d556ba9a496 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/ConfigsDatabaseInitializerTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init.impl; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.check.DatabaseCheckException; +import io.airbyte.db.init.DatabaseInitializationException; +import io.airbyte.db.instance.DatabaseConstants; +import java.io.IOException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link ConfigsDatabaseInitializer} class. + */ +public class ConfigsDatabaseInitializerTest extends AbstractDatabaseInitializerTest { + + @Test + void testInitializingSchema() throws IOException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.CONFIGS_SCHEMA_PATH); + final var initializer = new ConfigsDatabaseInitializer(databaseAvailabilityCheck, dslContext, initialSchema); + + Assertions.assertDoesNotThrow(() -> initializer.init()); + assertTrue(initializer.hasTable(dslContext, initializer.getTableNames().get().stream().findFirst().get())); + } + + @Test + void testInitializingSchemaAlreadyExists() throws IOException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.CONFIGS_SCHEMA_PATH); + dslContext.execute(initialSchema); + final var initializer = new ConfigsDatabaseInitializer(databaseAvailabilityCheck, dslContext, initialSchema); + + Assertions.assertDoesNotThrow(() -> initializer.init()); + assertTrue(initializer.hasTable(dslContext, initializer.getTableNames().get().stream().findFirst().get())); + } + + @Test + void testInitializationException() throws IOException, DatabaseCheckException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.CONFIGS_SCHEMA_PATH); + + doThrow(new DatabaseCheckException("test")).when(databaseAvailabilityCheck).check(); + + final var initializer = new ConfigsDatabaseInitializer(databaseAvailabilityCheck, dslContext, initialSchema); + Assertions.assertThrows(DatabaseInitializationException.class, () -> initializer.init()); + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/JobsDatabaseInitializerTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/JobsDatabaseInitializerTest.java new file mode 100644 index 0000000000000..fb903d2995078 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/init/impl/JobsDatabaseInitializerTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.init.impl; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.db.check.DatabaseAvailabilityCheck; +import io.airbyte.db.check.DatabaseCheckException; +import io.airbyte.db.init.DatabaseInitializationException; +import io.airbyte.db.instance.DatabaseConstants; +import java.io.IOException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link JobsDatabaseInitializer} class. + */ +public class JobsDatabaseInitializerTest extends AbstractDatabaseInitializerTest { + + @Test + void testInitializingSchema() throws IOException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH); + final var initializer = new JobsDatabaseInitializer(databaseAvailabilityCheck, dslContext, initialSchema); + + Assertions.assertDoesNotThrow(() -> initializer.init()); + assertTrue(initializer.hasTable(dslContext, initializer.getTableNames().get().stream().findFirst().get())); + } + + @Test + void testInitializingSchemaAlreadyExists() throws IOException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH); + dslContext.execute(initialSchema); + final var initializer = new JobsDatabaseInitializer(databaseAvailabilityCheck, dslContext, initialSchema); + + Assertions.assertDoesNotThrow(() -> initializer.init()); + assertTrue(initializer.hasTable(dslContext, initializer.getTableNames().get().stream().findFirst().get())); + } + + @Test + void testInitializationException() throws IOException, DatabaseCheckException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH); + + doThrow(new DatabaseCheckException("test")).when(databaseAvailabilityCheck).check(); + + final var initializer = new JobsDatabaseInitializer(databaseAvailabilityCheck, dslContext, initialSchema); + Assertions.assertThrows(DatabaseInitializationException.class, () -> initializer.init()); + } + + @Test + void testInitializationNullAvailabilityCheck() throws IOException { + final var initialSchema = MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH); + final var initializer = new JobsDatabaseInitializer(null, dslContext, initialSchema); + Assertions.assertThrows(DatabaseInitializationException.class, () -> initializer.init()); + } + + @Test + void testInitializationNullDslContext() throws IOException { + final var databaseAvailabilityCheck = mock(DatabaseAvailabilityCheck.class); + final var initialSchema = MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH); + final var initializer = new JobsDatabaseInitializer(databaseAvailabilityCheck, null, initialSchema); + Assertions.assertThrows(DatabaseInitializationException.class, () -> initializer.init()); + } + +}