diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteVersion.java b/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteVersion.java index 8e39e688589b2..d19215b691b3a 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteVersion.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteVersion.java @@ -31,6 +31,14 @@ */ public class AirbyteVersion { + /** + * We moved the configs from the data volume to the configs database, and introduced Flyway + * migration in v0.29. The data volume and the file-based migration are deprecated in v0.30. Users + * must first upgrade to the last version before v0.30 to migrate their configs from the data + * volume, and integrate with the new Flyway system. + */ + public static final AirbyteVersion MINIMUM_REQUIRED_VERSION = new AirbyteVersion("0.29.17-alpha"); + private static final String DEV_VERSION = "dev"; public static final String AIRBYTE_VERSION_KEY_NAME = "airbyte_version"; @@ -39,6 +47,9 @@ public class AirbyteVersion { private final String minor; private final String patch; + /** + * Expected input format: x.y.z-w + */ public AirbyteVersion(final String version) { Preconditions.checkNotNull(version); this.version = version; @@ -112,6 +123,10 @@ public int patchVersionCompareTo(final AirbyteVersion another) { return compareVersion(patch, another.patch); } + public boolean compatibleWithMinRequiredVersion() { + return this.patchVersionCompareTo(MINIMUM_REQUIRED_VERSION) >= 0; + } + /** * Version string needs to be converted to integer for comparison, because string comparison does * not handle version string with different digits correctly. For example: @@ -132,8 +147,9 @@ public static String getErrorMessage(final String version1, final String version final String cleanVersion2 = version2.replace("\n", "").strip(); return String.format( "Version mismatch between %s and %s.\n" + - "Please upgrade or reset your Airbyte Database, see more at https://docs.airbyte.io/operator-guides/upgrading-airbyte", - cleanVersion1, cleanVersion2); + "Please upgrade to at least %s, or reset your Airbyte Database.\n" + + "See more at https://docs.airbyte.io/operator-guides/upgrading-airbyte", + cleanVersion1, cleanVersion2, MINIMUM_REQUIRED_VERSION.getVersion()); } public static boolean isCompatible(final String v1, final String v2) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 997e872103653..365695565bbb8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -29,14 +29,12 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; -import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.ConfigSeedProvider; import io.airbyte.config.persistence.DatabaseConfigPersistence; -import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import io.airbyte.db.instance.DatabaseMigrator; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; @@ -54,7 +52,6 @@ import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; -import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.InvalidInputExceptionMapper; import io.airbyte.server.errors.InvalidJsonExceptionMapper; import io.airbyte.server.errors.InvalidJsonInputExceptionMapper; @@ -85,12 +82,7 @@ public class ServerApp implements ServerRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class); private static final int PORT = 8001; - /** - * We can't support automatic migration for kube before this version because we had a bug in kube - * which would cause airbyte db to erase state upon termination, as a result the automatic migration - * wouldn't run - */ - private static final AirbyteVersion KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION = new AirbyteVersion("0.26.5-alpha"); + private final String airbyteVersion; private final Set> customComponentClasses; private final Set customComponents; @@ -223,84 +215,30 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex final SynchronousSchedulerClient bucketSpecCacheSchedulerClient = new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket()); final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient); - final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient); - - Optional airbyteDatabaseVersion = jobPersistence.getVersion(); - if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) { - final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES; - final boolean versionSupportsAutoMigrate = - new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0; - if (!isKubernetes || versionSupportsAutoMigrate) { - runAutomaticMigration(configRepository, jobPersistence, specFetcher, airbyteVersion, airbyteDatabaseVersion.get()); - // After migration, upgrade the DB version - airbyteDatabaseVersion = jobPersistence.getVersion(); - } else { - LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.getVersion()); - } + + Optional dbVersion = jobPersistence.getVersion(); + if (dbVersion.isPresent() && !new AirbyteVersion(dbVersion.get()).compatibleWithMinRequiredVersion()) { + return new VersionMismatchServer(airbyteVersion, dbVersion.get(), PORT); } runFlywayMigration(configs, configDatabase, jobDatabase); - if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) { - LOGGER.info("Starting server..."); - - return apiFactory.create( - schedulerJobClient, - cachingSchedulerClient, - temporalService, - configRepository, - jobPersistence, - configDatabase, - jobDatabase, - configs); - } else { - LOGGER.info("Start serving version mismatch errors. Automatic migration either failed or didn't run"); - return new VersionMismatchServer(airbyteVersion, airbyteDatabaseVersion.orElseThrow(), PORT); - } + LOGGER.info("Starting server..."); + return apiFactory.create( + schedulerJobClient, + cachingSchedulerClient, + temporalService, + configRepository, + jobPersistence, + configDatabase, + jobDatabase, + configs); } public static void main(final String[] args) throws Exception { getServer(new ServerFactory.Api()).start(); } - /** - * Ideally when automatic migration runs, we should make sure that we acquire a lock on database and - * no other operation is allowed - */ - private static void runAutomaticMigration(final ConfigRepository configRepository, - final JobPersistence jobPersistence, - final SpecFetcher specFetcher, - final String airbyteVersion, - final String airbyteDatabaseVersion) { - LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion); - try (final RunMigration runMigration = new RunMigration( - jobPersistence, - configRepository, - airbyteVersion, - YamlSeedConfigPersistence.get(), - specFetcher)) { - runMigration.run(); - } catch (final Exception e) { - LOGGER.error("Automatic Migration failed ", e); - } - } - - public static boolean isDatabaseVersionBehindAppVersion(final String airbyteVersion, final String airbyteDatabaseVersion) { - final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion); - if (bothVersionsCompatible) { - return false; - } - - final AirbyteVersion serverVersion = new AirbyteVersion(airbyteVersion); - final AirbyteVersion databaseVersion = new AirbyteVersion(airbyteDatabaseVersion); - - if (databaseVersion.getMajorVersion().compareTo(serverVersion.getMajorVersion()) < 0) { - return true; - } - - return databaseVersion.getMinorVersion().compareTo(serverVersion.getMinorVersion()) < 0; - } - private static void runFlywayMigration(final Configs configs, final Database configDatabase, final Database jobDatabase) { final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, ServerApp.class.getSimpleName()); final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, ServerApp.class.getSimpleName());