-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Refactor database initialization logic #12961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
58bbb11
edb8f7f
9cce6f2
18f0d7a
9cd38b0
16bac65
b3fb5d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.check; | ||
|
||
import static org.jooq.impl.DSL.select; | ||
|
||
import io.airbyte.commons.lang.Exceptions; | ||
import io.airbyte.db.Database; | ||
import java.util.Optional; | ||
import java.util.function.Function; | ||
import org.jooq.DSLContext; | ||
import org.slf4j.Logger; | ||
|
||
/** | ||
* Performs a check to verify that the configured database is available. | ||
*/ | ||
public interface DatabaseAvailabilityCheck extends DatabaseCheck { | ||
|
||
/** | ||
* The number of times to check if the database is available. TODO replace with a default value in a | ||
* value injection annotation | ||
*/ | ||
int NUM_POLL_TIMES = 10; | ||
|
||
/** | ||
* Checks whether the configured database is available. | ||
* | ||
* @throws InterruptedException if unable to perform the check. | ||
*/ | ||
default void check() throws InterruptedException { | ||
var initialized = false; | ||
var totalTime = 0; | ||
final var sleepTime = getTimeoutMs() / NUM_POLL_TIMES; | ||
|
||
while (!initialized) { | ||
getLogger().warn("Waiting for database to become available..."); | ||
if (totalTime >= getTimeoutMs()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we use a library for retrying (like https://resilience4j.readme.io/docs/retry). Instead of a custom retry policy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benmoriceau Definitely. However, for this pass, I am trying to simply copy/move the existing logic to the new classes to make it easier to verify like behavior. Once we get this in place, we can do follow up work to improve this logic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would most likely push us to use Failsafe instead of Resilience4j, mainly because it is a bit more modern and is supported by the OpenAPI generation, should we decide to turn it on there too. |
||
throw new InterruptedException("Unable to connect to the database."); | ||
} | ||
|
||
final Optional<DSLContext> dslContext = getDslContext(); | ||
|
||
if(dslContext.isPresent()) { | ||
final Database database = new Database(dslContext.get()); | ||
initialized = isDatabaseConnected(getDatabaseName()).apply(database); | ||
if (!initialized) { | ||
getLogger().info("Database is not ready yet. Please wait a moment, it might still be initializing..."); | ||
Exceptions.toRuntime(() -> Thread.sleep(sleepTime)); | ||
jdpgrailsdev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
totalTime += sleepTime; | ||
} | ||
} else { | ||
throw new InterruptedException("Database configuration not present."); | ||
jdpgrailsdev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
|
||
/** | ||
* Generates a {@link Function} that is used to test if a connection can be made to the database by | ||
* verifying that the {@code information_schema.tables} tables has been populated. | ||
* | ||
* @param databaseName The name of the database to test. | ||
* @return A {@link Function} that can be invoked to test if the database is available. | ||
*/ | ||
default Function<Database, Boolean> isDatabaseConnected(final String databaseName) { | ||
return database -> { | ||
try { | ||
getLogger().info("Testing {} database connection...", databaseName); | ||
return database.query(ctx -> ctx.fetchExists(select().from("information_schema.tables"))); | ||
} catch (final Exception e) { | ||
getLogger().error("Failed to verify database connection.", e); | ||
return false; | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Retrieves the configured database name to be tested. | ||
* | ||
* @return The name of the database to test. | ||
*/ | ||
String getDatabaseName(); | ||
|
||
/** | ||
* Retrieves the configured {@link DSLContext} to be used to test the database availability. | ||
* | ||
* @return The configured {@link DSLContext} object. | ||
*/ | ||
Optional<DSLContext> getDslContext(); | ||
|
||
/** | ||
* Retrieves the configured {@link Logger} object to be used to record progress of the migration | ||
* check. | ||
* | ||
* @return The configured {@link Logger} object. | ||
*/ | ||
Logger getLogger(); | ||
|
||
/** | ||
* Retrieves the timeout in milliseconds for the check. Once this timeout is exceeded, the check | ||
* will fail with an {@link InterruptedException}. | ||
* | ||
* @return The timeout in milliseconds for the check. | ||
*/ | ||
long getTimeoutMs(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.check; | ||
|
||
/** | ||
* Defines the interface for performing checks against a database. | ||
*/ | ||
public interface DatabaseCheck { | ||
|
||
/** | ||
* Checks whether the configured database is available. | ||
* | ||
* @throws InterruptedException if unable to perform the check. | ||
*/ | ||
void check() throws InterruptedException; | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.check; | ||
|
||
import java.util.Optional; | ||
import org.flywaydb.core.Flyway; | ||
import org.slf4j.Logger; | ||
|
||
/** | ||
* Performs a check to verify that the configured database has been migrated to the appropriate | ||
* version. | ||
*/ | ||
public interface DatabaseMigrationCheck { | ||
|
||
/** | ||
* The number of times to check if the database has been migrated to the required schema version. | ||
* TODO replace with a default value in a value injection annotation | ||
*/ | ||
int NUM_POLL_TIMES = 10; | ||
|
||
/** | ||
* Checks whether the configured database has been migrated to the required minimum schema version. | ||
* | ||
* @throws InterruptedException if unable to perform the check. | ||
*/ | ||
default void check() throws InterruptedException { | ||
final var startTime = System.currentTimeMillis(); | ||
final var sleepTime = getTimeoutMs() / NUM_POLL_TIMES; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment about the use of the library for retrying |
||
final Optional<Flyway> flywayOptional = getFlyway(); | ||
|
||
if(flywayOptional.isPresent()) { | ||
final var flyway = flywayOptional.get(); | ||
var currDatabaseMigrationVersion = flyway.info().current().getVersion().getVersion(); | ||
getLogger().info("Current database migration version {}.", currDatabaseMigrationVersion); | ||
getLogger().info("Minimum Flyway version required {}.", getMinimumFlywayVersion()); | ||
|
||
while (currDatabaseMigrationVersion.compareTo(getMinimumFlywayVersion()) < 0) { | ||
if (System.currentTimeMillis() - startTime >= getTimeoutMs()) { | ||
throw new InterruptedException("Timeout while waiting for database to fulfill minimum flyway migration version.."); | ||
} | ||
|
||
Thread.sleep(sleepTime); | ||
currDatabaseMigrationVersion = flyway.info().current().getVersion().getVersion(); | ||
} | ||
getLogger().info("Verified that database has been migrated to the required minimum version {}.", getTimeoutMs()); | ||
} else { | ||
throw new InterruptedException("Flyway configuration not present."); | ||
} | ||
} | ||
|
||
/** | ||
* Retrieves the configured {@link Flyway} object to be used to check the migration status of the | ||
* database. | ||
* | ||
* @return The configured {@link Flyway} object. | ||
*/ | ||
Optional<Flyway> getFlyway(); | ||
|
||
/** | ||
* Retrieves the configured {@link Logger} object to be used to record progress of the migration | ||
* check. | ||
* | ||
* @return The configured {@link Logger} object. | ||
*/ | ||
Logger getLogger(); | ||
|
||
/** | ||
* Retrieves the required minimum migration version of the schema. | ||
* | ||
* @return The required minimum migration version of the schema. | ||
*/ | ||
String getMinimumFlywayVersion(); | ||
|
||
/** | ||
* Retrieves the timeout in milliseconds for the check. Once this timeout is exceeded, the check | ||
* will fail with an {@link InterruptedException}. | ||
* | ||
* @return The timeout in milliseconds for the check. | ||
*/ | ||
long getTimeoutMs(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.check.impl; | ||
|
||
import io.airbyte.db.check.DatabaseAvailabilityCheck; | ||
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; | ||
import java.util.Optional; | ||
import org.jooq.DSLContext; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Implementation of the {@link DatabaseAvailabilityCheck} for the Configurations database. | ||
*/ | ||
public class ConfigsDatabaseAvailabilityCheck implements DatabaseAvailabilityCheck { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseAvailabilityCheck.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, we can use the @slf4j instead of manually instantiate the logger. |
||
|
||
// TODO inject via dependency injection framework | ||
private final DSLContext dslContext; | ||
|
||
// TODO inject via dependency injection framework | ||
private final long timeoutMs; | ||
|
||
public ConfigsDatabaseAvailabilityCheck(final DSLContext dslContext, final long timeoutMs) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we can use @AllArgConstructor to generate this constructor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benmoriceau This constructor is most likely temporary and will be removed when we add dependency injection. At that point, we can just annotate the member fields with the injection information and can remove the constructor entirely. |
||
this.dslContext = dslContext; | ||
this.timeoutMs = timeoutMs; | ||
} | ||
|
||
@Override | ||
public String getDatabaseName() { | ||
return ConfigsDatabaseInstance.DATABASE_LOGGING_NAME; | ||
} | ||
|
||
@Override | ||
public Optional<DSLContext> getDslContext() { | ||
return Optional.ofNullable(dslContext); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we allow a to have a null dslContext? it should be either mocked or a real context? Could we use something like https://projectlombok.org/features/NonNull to validate it in the constructor? This can be compatible with the constructor annotation as well: https://projectlombok.org/features/constructor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't. However, when we move to dependency injection, there is no guarantee that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This seems like it would be a recurring problem in many places, right? Does this mean we will need to wrap every bean in an optional and have explicit logic that checks if it exists and throws an error if not? It seems like this is something that the framework should be able to enforce, i.e. bean X is required, and a meaningful exception should be thrown automatically if it does not exist at runtime. Maybe I am misunderstanding something here? |
||
} | ||
|
||
@Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be generated with a @Getter annotation |
||
public Logger getLogger() { | ||
return LOGGER; | ||
} | ||
|
||
@Override | ||
public long getTimeoutMs() { | ||
return timeoutMs; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.check.impl; | ||
|
||
import io.airbyte.db.check.DatabaseMigrationCheck; | ||
import java.util.Optional; | ||
import org.flywaydb.core.Flyway; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Implementation of the {@link DatabaseMigrationCheck} for the Configurations database. | ||
*/ | ||
public class ConfigsDatabaseMigrationCheck implements DatabaseMigrationCheck { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have the same comment about leveraging lombok to make this class less verbose. Let me know if this would break the compatibility with micronaut. |
||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseMigrationCheck.class); | ||
|
||
// TODO inject via dependency injection framework | ||
private final Flyway flyway; | ||
|
||
// TODO inject via dependency injection framework | ||
private final String minimumFlywayVersion; | ||
|
||
// TODO inject via dependency injection framework | ||
private final long timeoutMs; | ||
|
||
public ConfigsDatabaseMigrationCheck(final Flyway flyway, final String minimumFlywayVersion, final long timeoutMs) { | ||
this.flyway = flyway; | ||
this.minimumFlywayVersion = minimumFlywayVersion; | ||
this.timeoutMs = timeoutMs; | ||
} | ||
|
||
@Override | ||
public Optional<Flyway> getFlyway() { | ||
return Optional.ofNullable(flyway); | ||
} | ||
|
||
@Override | ||
public Logger getLogger() { | ||
return LOGGER; | ||
} | ||
|
||
@Override | ||
public String getMinimumFlywayVersion() { | ||
return minimumFlywayVersion; | ||
} | ||
|
||
@Override | ||
public long getTimeoutMs() { | ||
return timeoutMs; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.check.impl; | ||
|
||
import io.airbyte.db.check.DatabaseAvailabilityCheck; | ||
import java.util.Optional; | ||
import org.jooq.DSLContext; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Implementation of the {@link DatabaseAvailabilityCheck} for the Jobs database. | ||
*/ | ||
public class JobsDatabaseAvailabilityCheck implements DatabaseAvailabilityCheck { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(JobsDatabaseAvailabilityCheck.class); | ||
|
||
private static final String DATABASE_NAME = "airbyte jobs"; | ||
|
||
// TODO inject via dependency injection framework | ||
private final DSLContext dslContext; | ||
|
||
// TODO inject via dependency injection framework | ||
private final long timeoutMs; | ||
|
||
public JobsDatabaseAvailabilityCheck(final DSLContext dslContext, final long timeoutMs) { | ||
this.dslContext = dslContext; | ||
this.timeoutMs = timeoutMs; | ||
} | ||
|
||
@Override | ||
public String getDatabaseName() { | ||
return DATABASE_NAME; | ||
} | ||
|
||
@Override | ||
public Optional<DSLContext> getDslContext() { | ||
return Optional.ofNullable(dslContext); | ||
} | ||
|
||
@Override | ||
public Logger getLogger() { | ||
return LOGGER; | ||
} | ||
|
||
@Override | ||
public long getTimeoutMs() { | ||
return timeoutMs; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a constant, and therefore should be
final
(maybe not super important given the comment above saying that this will be replaced)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a constant in an interface, so marking it
final
doesn't do anything.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, excuse my tunnel vision here 🤦