Skip to content

Refactor database initialization logic #12961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.check;

import static org.jooq.impl.DSL.select;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.Database;
import java.util.Optional;
import java.util.function.Function;
import org.jooq.DSLContext;
import org.slf4j.Logger;

/**
* Performs a check to verify that the configured database is available.
*/
public interface DatabaseAvailabilityCheck extends DatabaseCheck {

/**
* The number of times to check if the database is available. TODO replace with a default value in a
* value injection annotation
*/
int NUM_POLL_TIMES = 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a constant, and therefore should be final (maybe not super important given the comment above saying that this will be replaced)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a constant in an interface, so marking it final doesn't do anything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, excuse my tunnel vision here 🤦


/**
* Checks whether the configured database is available.
*
* @throws InterruptedException if unable to perform the check.
*/
default void check() throws InterruptedException {
var initialized = false;
var totalTime = 0;
final var sleepTime = getTimeoutMs() / NUM_POLL_TIMES;

while (!initialized) {
getLogger().warn("Waiting for database to become available...");
if (totalTime >= getTimeoutMs()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use a library for retrying (like https://resilience4j.readme.io/docs/retry). Instead of a custom retry policy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau Definitely. However, for this pass, I am trying to simply copy/move the existing logic to the new classes to make it easier to verify like behavior. Once we get this in place, we can do follow up work to improve this logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would most likely push us to use Failsafe instead of Resilience4j, mainly because it is a bit more modern and is supported by the OpenAPI generation, should we decide to turn it on there too.

throw new InterruptedException("Unable to connect to the database.");
}

final Optional<DSLContext> dslContext = getDslContext();

if(dslContext.isPresent()) {
final Database database = new Database(dslContext.get());
initialized = isDatabaseConnected(getDatabaseName()).apply(database);
if (!initialized) {
getLogger().info("Database is not ready yet. Please wait a moment, it might still be initializing...");
Exceptions.toRuntime(() -> Thread.sleep(sleepTime));
totalTime += sleepTime;
}
} else {
throw new InterruptedException("Database configuration not present.");
}
}
}

/**
* Generates a {@link Function} that is used to test if a connection can be made to the database by
* verifying that the {@code information_schema.tables} tables has been populated.
*
* @param databaseName The name of the database to test.
* @return A {@link Function} that can be invoked to test if the database is available.
*/
default Function<Database, Boolean> isDatabaseConnected(final String databaseName) {
return database -> {
try {
getLogger().info("Testing {} database connection...", databaseName);
return database.query(ctx -> ctx.fetchExists(select().from("information_schema.tables")));
} catch (final Exception e) {
getLogger().error("Failed to verify database connection.", e);
return false;
}
};
}

/**
* Retrieves the configured database name to be tested.
*
* @return The name of the database to test.
*/
String getDatabaseName();

/**
* Retrieves the configured {@link DSLContext} to be used to test the database availability.
*
* @return The configured {@link DSLContext} object.
*/
Optional<DSLContext> getDslContext();

/**
* Retrieves the configured {@link Logger} object to be used to record progress of the migration
* check.
*
* @return The configured {@link Logger} object.
*/
Logger getLogger();

/**
* Retrieves the timeout in milliseconds for the check. Once this timeout is exceeded, the check
* will fail with an {@link InterruptedException}.
*
* @return The timeout in milliseconds for the check.
*/
long getTimeoutMs();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.check;

/**
* Defines the interface for performing checks against a database.
*/
public interface DatabaseCheck {

/**
* Checks whether the configured database is available.
*
* @throws InterruptedException if unable to perform the check.
*/
void check() throws InterruptedException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.check;

import java.util.Optional;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;

/**
* Performs a check to verify that the configured database has been migrated to the appropriate
* version.
*/
public interface DatabaseMigrationCheck {

/**
* The number of times to check if the database has been migrated to the required schema version.
* TODO replace with a default value in a value injection annotation
*/
int NUM_POLL_TIMES = 10;

/**
* Checks whether the configured database has been migrated to the required minimum schema version.
*
* @throws InterruptedException if unable to perform the check.
*/
default void check() throws InterruptedException {
final var startTime = System.currentTimeMillis();
final var sleepTime = getTimeoutMs() / NUM_POLL_TIMES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about the use of the library for retrying

final Optional<Flyway> flywayOptional = getFlyway();

if(flywayOptional.isPresent()) {
final var flyway = flywayOptional.get();
var currDatabaseMigrationVersion = flyway.info().current().getVersion().getVersion();
getLogger().info("Current database migration version {}.", currDatabaseMigrationVersion);
getLogger().info("Minimum Flyway version required {}.", getMinimumFlywayVersion());

while (currDatabaseMigrationVersion.compareTo(getMinimumFlywayVersion()) < 0) {
if (System.currentTimeMillis() - startTime >= getTimeoutMs()) {
throw new InterruptedException("Timeout while waiting for database to fulfill minimum flyway migration version..");
}

Thread.sleep(sleepTime);
currDatabaseMigrationVersion = flyway.info().current().getVersion().getVersion();
}
getLogger().info("Verified that database has been migrated to the required minimum version {}.", getTimeoutMs());
} else {
throw new InterruptedException("Flyway configuration not present.");
}
}

/**
* Retrieves the configured {@link Flyway} object to be used to check the migration status of the
* database.
*
* @return The configured {@link Flyway} object.
*/
Optional<Flyway> getFlyway();

/**
* Retrieves the configured {@link Logger} object to be used to record progress of the migration
* check.
*
* @return The configured {@link Logger} object.
*/
Logger getLogger();

/**
* Retrieves the required minimum migration version of the schema.
*
* @return The required minimum migration version of the schema.
*/
String getMinimumFlywayVersion();

/**
* Retrieves the timeout in milliseconds for the check. Once this timeout is exceeded, the check
* will fail with an {@link InterruptedException}.
*
* @return The timeout in milliseconds for the check.
*/
long getTimeoutMs();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.check.impl;

import io.airbyte.db.check.DatabaseAvailabilityCheck;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.util.Optional;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of the {@link DatabaseAvailabilityCheck} for the Configurations database.
*/
public class ConfigsDatabaseAvailabilityCheck implements DatabaseAvailabilityCheck {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseAvailabilityCheck.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, we can use the @slf4j instead of manually instantiate the logger.


// TODO inject via dependency injection framework
private final DSLContext dslContext;

// TODO inject via dependency injection framework
private final long timeoutMs;

public ConfigsDatabaseAvailabilityCheck(final DSLContext dslContext, final long timeoutMs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can use @AllArgConstructor to generate this constructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau This constructor is most likely temporary and will be removed when we add dependency injection. At that point, we can just annotate the member fields with the injection information and can remove the constructor entirely.

this.dslContext = dslContext;
this.timeoutMs = timeoutMs;
}

@Override
public String getDatabaseName() {
return ConfigsDatabaseInstance.DATABASE_LOGGING_NAME;
}

@Override
public Optional<DSLContext> getDslContext() {
return Optional.ofNullable(dslContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we allow a to have a null dslContext? it should be either mocked or a real context? Could we use something like https://projectlombok.org/features/NonNull to validate it in the constructor? This can be compatible with the constructor annotation as well: https://projectlombok.org/features/constructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't. However, when we move to dependency injection, there is no guarantee that the DSLContext bean will exist. Therefore, I am trying to be defensive here so we can raise a meaningful exception that clearly states what the issue is vs just throwing a NullPointerException and not being obvious.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't. However, when we move to dependency injection, there is no guarantee that the DSLContext bean will exist.

This seems like it would be a recurring problem in many places, right? Does this mean we will need to wrap every bean in an optional and have explicit logic that checks if it exists and throws an error if not?

It seems like this is something that the framework should be able to enforce, i.e. bean X is required, and a meaningful exception should be thrown automatically if it does not exist at runtime. Maybe I am misunderstanding something here?

}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be generated with a @Getter annotation

public Logger getLogger() {
return LOGGER;
}

@Override
public long getTimeoutMs() {
return timeoutMs;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.check.impl;

import io.airbyte.db.check.DatabaseMigrationCheck;
import java.util.Optional;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of the {@link DatabaseMigrationCheck} for the Configurations database.
*/
public class ConfigsDatabaseMigrationCheck implements DatabaseMigrationCheck {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same comment about leveraging lombok to make this class less verbose. Let me know if this would break the compatibility with micronaut.


private static final Logger LOGGER = LoggerFactory.getLogger(ConfigsDatabaseMigrationCheck.class);

// TODO inject via dependency injection framework
private final Flyway flyway;

// TODO inject via dependency injection framework
private final String minimumFlywayVersion;

// TODO inject via dependency injection framework
private final long timeoutMs;

public ConfigsDatabaseMigrationCheck(final Flyway flyway, final String minimumFlywayVersion, final long timeoutMs) {
this.flyway = flyway;
this.minimumFlywayVersion = minimumFlywayVersion;
this.timeoutMs = timeoutMs;
}

@Override
public Optional<Flyway> getFlyway() {
return Optional.ofNullable(flyway);
}

@Override
public Logger getLogger() {
return LOGGER;
}

@Override
public String getMinimumFlywayVersion() {
return minimumFlywayVersion;
}

@Override
public long getTimeoutMs() {
return timeoutMs;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.check.impl;

import io.airbyte.db.check.DatabaseAvailabilityCheck;
import java.util.Optional;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of the {@link DatabaseAvailabilityCheck} for the Jobs database.
*/
public class JobsDatabaseAvailabilityCheck implements DatabaseAvailabilityCheck {

private static final Logger LOGGER = LoggerFactory.getLogger(JobsDatabaseAvailabilityCheck.class);

private static final String DATABASE_NAME = "airbyte jobs";

// TODO inject via dependency injection framework
private final DSLContext dslContext;

// TODO inject via dependency injection framework
private final long timeoutMs;

public JobsDatabaseAvailabilityCheck(final DSLContext dslContext, final long timeoutMs) {
this.dslContext = dslContext;
this.timeoutMs = timeoutMs;
}

@Override
public String getDatabaseName() {
return DATABASE_NAME;
}

@Override
public Optional<DSLContext> getDslContext() {
return Optional.ofNullable(dslContext);
}

@Override
public Logger getLogger() {
return LOGGER;
}

@Override
public long getTimeoutMs() {
return timeoutMs;
}

}
Loading