Skip to content

🔪 Deprecate file-based migration system #6077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
*/
public class AirbyteVersion {

/**
* We moved the configs from the data volume to the configs database, and introduced Flyway
* migration in v0.29. The data volume and the file-based migration are deprecated in v0.30. Users
* must first upgrade to the last version before v0.30 to migrate their configs from the data
* volume, and integrate with the new Flyway system.
*/
public static final AirbyteVersion MINIMUM_REQUIRED_VERSION = new AirbyteVersion("0.29.17-alpha");

private static final String DEV_VERSION = "dev";
public static final String AIRBYTE_VERSION_KEY_NAME = "airbyte_version";

Expand All @@ -39,6 +47,9 @@ public class AirbyteVersion {
private final String minor;
private final String patch;

/**
* Expected input format: x.y.z-w
*/
public AirbyteVersion(final String version) {
Preconditions.checkNotNull(version);
this.version = version;
Expand Down Expand Up @@ -112,6 +123,10 @@ public int patchVersionCompareTo(final AirbyteVersion another) {
return compareVersion(patch, another.patch);
}

public boolean compatibleWithMinRequiredVersion() {
return this.patchVersionCompareTo(MINIMUM_REQUIRED_VERSION) >= 0;
}

/**
* Version string needs to be converted to integer for comparison, because string comparison does
* not handle version string with different digits correctly. For example:
Expand All @@ -132,8 +147,9 @@ public static String getErrorMessage(final String version1, final String version
final String cleanVersion2 = version2.replace("\n", "").strip();
return String.format(
"Version mismatch between %s and %s.\n" +
"Please upgrade or reset your Airbyte Database, see more at https://docs.airbyte.io/operator-guides/upgrading-airbyte",
cleanVersion1, cleanVersion2);
"Please upgrade to at least %s, or reset your Airbyte Database.\n" +
"See more at https://docs.airbyte.io/operator-guides/upgrading-airbyte",
cleanVersion1, cleanVersion2, MINIMUM_REQUIRED_VERSION.getVersion());
}

public static boolean isCompatible(final String v1, final String v2) {
Expand Down
92 changes: 15 additions & 77 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.ConfigSeedProvider;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
Expand All @@ -54,7 +52,6 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InvalidInputExceptionMapper;
import io.airbyte.server.errors.InvalidJsonExceptionMapper;
import io.airbyte.server.errors.InvalidJsonInputExceptionMapper;
Expand Down Expand Up @@ -85,12 +82,7 @@ public class ServerApp implements ServerRunnable {

private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class);
private static final int PORT = 8001;
/**
* We can't support automatic migration for kube before this version because we had a bug in kube
* which would cause airbyte db to erase state upon termination, as a result the automatic migration
* wouldn't run
*/
private static final AirbyteVersion KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION = new AirbyteVersion("0.26.5-alpha");

private final String airbyteVersion;
private final Set<Class<?>> customComponentClasses;
private final Set<Object> customComponents;
Expand Down Expand Up @@ -223,84 +215,30 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex
final SynchronousSchedulerClient bucketSpecCacheSchedulerClient =
new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket());
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate =
new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0;
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion();
} else {
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.getVersion());
}

Optional<String> dbVersion = jobPersistence.getVersion();
if (dbVersion.isPresent() && !new AirbyteVersion(dbVersion.get()).compatibleWithMinRequiredVersion()) {
return new VersionMismatchServer(airbyteVersion, dbVersion.get(), PORT);
}

runFlywayMigration(configs, configDatabase, jobDatabase);

if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) {
LOGGER.info("Starting server...");

return apiFactory.create(
schedulerJobClient,
cachingSchedulerClient,
temporalService,
configRepository,
jobPersistence,
configDatabase,
jobDatabase,
configs);
} else {
LOGGER.info("Start serving version mismatch errors. Automatic migration either failed or didn't run");
return new VersionMismatchServer(airbyteVersion, airbyteDatabaseVersion.orElseThrow(), PORT);
}
LOGGER.info("Starting server...");
return apiFactory.create(
schedulerJobClient,
cachingSchedulerClient,
temporalService,
configRepository,
jobPersistence,
configDatabase,
jobDatabase,
configs);
}

public static void main(final String[] args) throws Exception {
getServer(new ServerFactory.Api()).start();
}

/**
* Ideally when automatic migration runs, we should make sure that we acquire a lock on database and
* no other operation is allowed
*/
private static void runAutomaticMigration(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SpecFetcher specFetcher,
final String airbyteVersion,
final String airbyteDatabaseVersion) {
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion);
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
configRepository,
airbyteVersion,
YamlSeedConfigPersistence.get(),
specFetcher)) {
runMigration.run();
} catch (final Exception e) {
LOGGER.error("Automatic Migration failed ", e);
}
}

public static boolean isDatabaseVersionBehindAppVersion(final String airbyteVersion, final String airbyteDatabaseVersion) {
final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion);
if (bothVersionsCompatible) {
return false;
}

final AirbyteVersion serverVersion = new AirbyteVersion(airbyteVersion);
final AirbyteVersion databaseVersion = new AirbyteVersion(airbyteDatabaseVersion);

if (databaseVersion.getMajorVersion().compareTo(serverVersion.getMajorVersion()) < 0) {
return true;
}

return databaseVersion.getMinorVersion().compareTo(serverVersion.getMinorVersion()) < 0;
}

private static void runFlywayMigration(final Configs configs, final Database configDatabase, final Database jobDatabase) {
final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, ServerApp.class.getSimpleName());
final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, ServerApp.class.getSimpleName());
Expand Down