|
29 | 29 | import io.airbyte.commons.resources.MoreResources;
|
30 | 30 | import io.airbyte.commons.version.AirbyteVersion;
|
31 | 31 | import io.airbyte.config.Configs;
|
32 |
| -import io.airbyte.config.Configs.WorkerEnvironment; |
33 | 32 | import io.airbyte.config.EnvConfigs;
|
34 | 33 | import io.airbyte.config.StandardWorkspace;
|
35 | 34 | import io.airbyte.config.helpers.LogClientSingleton;
|
36 | 35 | import io.airbyte.config.persistence.ConfigRepository;
|
37 | 36 | import io.airbyte.config.persistence.ConfigSeedProvider;
|
38 | 37 | import io.airbyte.config.persistence.DatabaseConfigPersistence;
|
39 |
| -import io.airbyte.config.persistence.YamlSeedConfigPersistence; |
40 | 38 | import io.airbyte.db.Database;
|
41 | 39 | import io.airbyte.db.instance.DatabaseMigrator;
|
42 | 40 | import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
|
|
54 | 52 | import io.airbyte.scheduler.persistence.JobPersistence;
|
55 | 53 | import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
|
56 | 54 | import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
|
57 |
| -import io.airbyte.server.converters.SpecFetcher; |
58 | 55 | import io.airbyte.server.errors.InvalidInputExceptionMapper;
|
59 | 56 | import io.airbyte.server.errors.InvalidJsonExceptionMapper;
|
60 | 57 | import io.airbyte.server.errors.InvalidJsonInputExceptionMapper;
|
@@ -85,12 +82,7 @@ public class ServerApp implements ServerRunnable {
|
85 | 82 |
|
86 | 83 | private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class);
|
87 | 84 | private static final int PORT = 8001;
|
88 |
| - /** |
89 |
| - * We can't support automatic migration for kube before this version because we had a bug in kube |
90 |
| - * which would cause airbyte db to erase state upon termination, as a result the automatic migration |
91 |
| - * wouldn't run |
92 |
| - */ |
93 |
| - private static final AirbyteVersion KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION = new AirbyteVersion("0.26.5-alpha"); |
| 85 | + |
94 | 86 | private final String airbyteVersion;
|
95 | 87 | private final Set<Class<?>> customComponentClasses;
|
96 | 88 | private final Set<Object> customComponents;
|
@@ -223,84 +215,30 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex
|
223 | 215 | final SynchronousSchedulerClient bucketSpecCacheSchedulerClient =
|
224 | 216 | new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket());
|
225 | 217 | final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
|
226 |
| - final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient); |
227 |
| - |
228 |
| - Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion(); |
229 |
| - if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) { |
230 |
| - final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES; |
231 |
| - final boolean versionSupportsAutoMigrate = |
232 |
| - new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0; |
233 |
| - if (!isKubernetes || versionSupportsAutoMigrate) { |
234 |
| - runAutomaticMigration(configRepository, jobPersistence, specFetcher, airbyteVersion, airbyteDatabaseVersion.get()); |
235 |
| - // After migration, upgrade the DB version |
236 |
| - airbyteDatabaseVersion = jobPersistence.getVersion(); |
237 |
| - } else { |
238 |
| - LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.getVersion()); |
239 |
| - } |
| 218 | + |
| 219 | + Optional<String> dbVersion = jobPersistence.getVersion(); |
| 220 | + if (dbVersion.isPresent() && !new AirbyteVersion(dbVersion.get()).compatibleWithMinRequiredVersion()) { |
| 221 | + return new VersionMismatchServer(airbyteVersion, dbVersion.get(), PORT); |
240 | 222 | }
|
241 | 223 |
|
242 | 224 | runFlywayMigration(configs, configDatabase, jobDatabase);
|
243 | 225 |
|
244 |
| - if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) { |
245 |
| - LOGGER.info("Starting server..."); |
246 |
| - |
247 |
| - return apiFactory.create( |
248 |
| - schedulerJobClient, |
249 |
| - cachingSchedulerClient, |
250 |
| - temporalService, |
251 |
| - configRepository, |
252 |
| - jobPersistence, |
253 |
| - configDatabase, |
254 |
| - jobDatabase, |
255 |
| - configs); |
256 |
| - } else { |
257 |
| - LOGGER.info("Start serving version mismatch errors. Automatic migration either failed or didn't run"); |
258 |
| - return new VersionMismatchServer(airbyteVersion, airbyteDatabaseVersion.orElseThrow(), PORT); |
259 |
| - } |
| 226 | + LOGGER.info("Starting server..."); |
| 227 | + return apiFactory.create( |
| 228 | + schedulerJobClient, |
| 229 | + cachingSchedulerClient, |
| 230 | + temporalService, |
| 231 | + configRepository, |
| 232 | + jobPersistence, |
| 233 | + configDatabase, |
| 234 | + jobDatabase, |
| 235 | + configs); |
260 | 236 | }
|
261 | 237 |
|
262 | 238 | public static void main(final String[] args) throws Exception {
|
263 | 239 | getServer(new ServerFactory.Api()).start();
|
264 | 240 | }
|
265 | 241 |
|
266 |
| - /** |
267 |
| - * Ideally when automatic migration runs, we should make sure that we acquire a lock on database and |
268 |
| - * no other operation is allowed |
269 |
| - */ |
270 |
| - private static void runAutomaticMigration(final ConfigRepository configRepository, |
271 |
| - final JobPersistence jobPersistence, |
272 |
| - final SpecFetcher specFetcher, |
273 |
| - final String airbyteVersion, |
274 |
| - final String airbyteDatabaseVersion) { |
275 |
| - LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion); |
276 |
| - try (final RunMigration runMigration = new RunMigration( |
277 |
| - jobPersistence, |
278 |
| - configRepository, |
279 |
| - airbyteVersion, |
280 |
| - YamlSeedConfigPersistence.get(), |
281 |
| - specFetcher)) { |
282 |
| - runMigration.run(); |
283 |
| - } catch (final Exception e) { |
284 |
| - LOGGER.error("Automatic Migration failed ", e); |
285 |
| - } |
286 |
| - } |
287 |
| - |
288 |
| - public static boolean isDatabaseVersionBehindAppVersion(final String airbyteVersion, final String airbyteDatabaseVersion) { |
289 |
| - final boolean bothVersionsCompatible = AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion); |
290 |
| - if (bothVersionsCompatible) { |
291 |
| - return false; |
292 |
| - } |
293 |
| - |
294 |
| - final AirbyteVersion serverVersion = new AirbyteVersion(airbyteVersion); |
295 |
| - final AirbyteVersion databaseVersion = new AirbyteVersion(airbyteDatabaseVersion); |
296 |
| - |
297 |
| - if (databaseVersion.getMajorVersion().compareTo(serverVersion.getMajorVersion()) < 0) { |
298 |
| - return true; |
299 |
| - } |
300 |
| - |
301 |
| - return databaseVersion.getMinorVersion().compareTo(serverVersion.getMinorVersion()) < 0; |
302 |
| - } |
303 |
| - |
304 | 242 | private static void runFlywayMigration(final Configs configs, final Database configDatabase, final Database jobDatabase) {
|
305 | 243 | final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, ServerApp.class.getSimpleName());
|
306 | 244 | final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, ServerApp.class.getSimpleName());
|
|
0 commit comments