8
8
import io .airbyte .commons .features .FeatureFlags ;
9
9
import io .airbyte .commons .lang .CloseableShutdownHook ;
10
10
import io .airbyte .commons .resources .MoreResources ;
11
+ import io .airbyte .commons .version .AirbyteProtocolVersionRange ;
11
12
import io .airbyte .commons .version .AirbyteVersion ;
12
- import io .airbyte .commons .version .Version ;
13
13
import io .airbyte .config .Configs ;
14
14
import io .airbyte .config .EnvConfigs ;
15
15
import io .airbyte .config .Geography ;
@@ -70,7 +70,7 @@ public class BootloaderApp {
70
70
private final FeatureFlags featureFlags ;
71
71
private final SecretMigrator secretMigrator ;
72
72
private ConfigRepository configRepository ;
73
- private DefinitionsProvider localDefinitionsProvider ;
73
+ private Optional < DefinitionsProvider > definitionsProvider ;
74
74
private Database configDatabase ;
75
75
private Database jobDatabase ;
76
76
private JobPersistence jobPersistence ;
@@ -79,6 +79,13 @@ public class BootloaderApp {
79
79
private final DSLContext configsDslContext ;
80
80
private final DSLContext jobsDslContext ;
81
81
82
+ // This controls how we check the protocol version compatibility
83
+ // True means that the connectors will be forcefully upgraded regardless of whether they are used in
84
+ // an active sync or not.
85
+ // This should be moved to a Configs, however, this behavior is currently forced through hooks that
86
+ // are passed as the postLoadExecution.
87
+ private final boolean autoUpgradeConnectors ;
88
+
82
89
/**
83
90
* This method is exposed for Airbyte Cloud consumption. This lets us override the seed loading
84
91
* logic and customise Cloud connector versions. Please check with the Platform team before making
@@ -97,7 +104,9 @@ public BootloaderApp(final Configs configs,
97
104
final DSLContext configsDslContext ,
98
105
final DSLContext jobsDslContext ,
99
106
final Flyway configsFlyway ,
100
- final Flyway jobsFlyway ) {
107
+ final Flyway jobsFlyway ,
108
+ final Optional <DefinitionsProvider > definitionsProvider ,
109
+ final boolean autoUpgradeConnectors ) {
101
110
this .configs = configs ;
102
111
this .postLoadExecution = postLoadExecution ;
103
112
this .featureFlags = featureFlags ;
@@ -106,30 +115,65 @@ public BootloaderApp(final Configs configs,
106
115
this .configsFlyway = configsFlyway ;
107
116
this .jobsDslContext = jobsDslContext ;
108
117
this .jobsFlyway = jobsFlyway ;
118
+ this .definitionsProvider = definitionsProvider ;
119
+ this .autoUpgradeConnectors = autoUpgradeConnectors ;
109
120
110
121
initPersistences (configsDslContext , jobsDslContext );
111
122
}
112
123
124
+ // Temporary duplication of constructor, to remove once Cloud has been migrated to the one above.
125
+ @ Deprecated (forRemoval = true )
113
126
public BootloaderApp (final Configs configs ,
127
+ final Runnable postLoadExecution ,
114
128
final FeatureFlags featureFlags ,
115
129
final SecretMigrator secretMigrator ,
116
130
final DSLContext configsDslContext ,
117
131
final DSLContext jobsDslContext ,
118
132
final Flyway configsFlyway ,
119
133
final Flyway jobsFlyway ) {
120
134
this .configs = configs ;
135
+ this .postLoadExecution = postLoadExecution ;
121
136
this .featureFlags = featureFlags ;
122
137
this .secretMigrator = secretMigrator ;
123
138
this .configsDslContext = configsDslContext ;
124
139
this .configsFlyway = configsFlyway ;
125
140
this .jobsDslContext = jobsDslContext ;
126
141
this .jobsFlyway = jobsFlyway ;
142
+ this .autoUpgradeConnectors = false ;
143
+
144
+ try {
145
+ this .definitionsProvider = Optional .of (getLocalDefinitionsProvider ());
146
+ } catch (final IOException e ) {
147
+ LOGGER .error ("Unable to initialize persistence." , e );
148
+ }
149
+
150
+ initPersistences (configsDslContext , jobsDslContext );
151
+ }
152
+
153
+ public BootloaderApp (final Configs configs ,
154
+ final FeatureFlags featureFlags ,
155
+ final SecretMigrator secretMigrator ,
156
+ final DSLContext configsDslContext ,
157
+ final DSLContext jobsDslContext ,
158
+ final Flyway configsFlyway ,
159
+ final Flyway jobsFlyway ,
160
+ final DefinitionsProvider definitionsProvider ,
161
+ final boolean autoUpgradeConnectors ) {
162
+ this .configs = configs ;
163
+ this .featureFlags = featureFlags ;
164
+ this .secretMigrator = secretMigrator ;
165
+ this .configsDslContext = configsDslContext ;
166
+ this .configsFlyway = configsFlyway ;
167
+ this .jobsDslContext = jobsDslContext ;
168
+ this .jobsFlyway = jobsFlyway ;
169
+ this .definitionsProvider = Optional .of (definitionsProvider );
170
+ this .autoUpgradeConnectors = autoUpgradeConnectors ;
127
171
128
172
initPersistences (configsDslContext , jobsDslContext );
129
173
130
174
postLoadExecution = () -> {
131
175
try {
132
- final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper (configRepository , localDefinitionsProvider );
176
+ final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper (configRepository , this . definitionsProvider . get () );
133
177
applyDefinitionsHelper .apply ();
134
178
135
179
if (featureFlags .forceSecretMigration () || !jobPersistence .isSecretMigrated ()) {
@@ -159,10 +203,9 @@ public void load() throws Exception {
159
203
final AirbyteVersion currAirbyteVersion = configs .getAirbyteVersion ();
160
204
assertNonBreakingMigration (jobPersistence , currAirbyteVersion );
161
205
162
- final Version airbyteProtocolVersionMax = configs .getAirbyteProtocolVersionMax ();
163
- final Version airbyteProtocolVersionMin = configs .getAirbyteProtocolVersionMin ();
164
- // TODO ProtocolVersion validation should happen here
165
- trackProtocolVersion (airbyteProtocolVersionMin , airbyteProtocolVersionMax );
206
+ final ProtocolVersionChecker protocolVersionChecker =
207
+ new ProtocolVersionChecker (jobPersistence , configs , configRepository , definitionsProvider );
208
+ assertNonBreakingProtocolVersionConstraints (protocolVersionChecker , jobPersistence , autoUpgradeConnectors );
166
209
167
210
// TODO Will be converted to an injected singleton during DI migration
168
211
final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator (configDatabase , configsFlyway );
@@ -191,7 +234,7 @@ private static Database getConfigDatabase(final DSLContext dslContext) throws IO
191
234
return new Database (dslContext );
192
235
}
193
236
194
- private static DefinitionsProvider getLocalDefinitionsProvider () throws IOException {
237
+ static DefinitionsProvider getLocalDefinitionsProvider () throws IOException {
195
238
return new LocalDefinitionsProvider (LocalDefinitionsProvider .DEFAULT_SEED_DEFINITION_RESOURCE_CLASS );
196
239
}
197
240
@@ -207,7 +250,6 @@ private void initPersistences(final DSLContext configsDslContext, final DSLConte
207
250
try {
208
251
configDatabase = getConfigDatabase (configsDslContext );
209
252
configRepository = new ConfigRepository (configDatabase );
210
- localDefinitionsProvider = getLocalDefinitionsProvider ();
211
253
jobDatabase = getJobDatabase (jobsDslContext );
212
254
jobPersistence = getJobPersistence (jobDatabase );
213
255
} catch (final IOException e ) {
@@ -249,7 +291,11 @@ public static void main(final String[] args) throws Exception {
249
291
// Ensure that the database resources are closed on application shutdown
250
292
CloseableShutdownHook .registerRuntimeShutdownHook (configsDataSource , jobsDataSource , configsDslContext , jobsDslContext );
251
293
252
- final var bootloader = new BootloaderApp (configs , featureFlags , secretMigrator , configsDslContext , jobsDslContext , configsFlyway , jobsFlyway );
294
+ final DefinitionsProvider definitionsProvider = getLocalDefinitionsProvider ();
295
+
296
+ final var bootloader =
297
+ new BootloaderApp (configs , featureFlags , secretMigrator , configsDslContext , jobsDslContext , configsFlyway , jobsFlyway , definitionsProvider ,
298
+ false );
253
299
bootloader .load ();
254
300
}
255
301
}
@@ -307,10 +353,24 @@ private static void assertNonBreakingMigration(final JobPersistence jobPersisten
307
353
}
308
354
}
309
355
310
- private void trackProtocolVersion (final Version airbyteProtocolVersionMin , final Version airbyteProtocolVersionMax ) throws IOException {
311
- jobPersistence .setAirbyteProtocolVersionMin (airbyteProtocolVersionMin );
312
- jobPersistence .setAirbyteProtocolVersionMax (airbyteProtocolVersionMax );
313
- LOGGER .info ("AirbyteProtocol version support range [{}:{}]" , airbyteProtocolVersionMin .serialize (), airbyteProtocolVersionMax .serialize ());
356
+ private static void assertNonBreakingProtocolVersionConstraints (final ProtocolVersionChecker protocolVersionChecker ,
357
+ final JobPersistence jobPersistence ,
358
+ final boolean autoUpgradeConnectors )
359
+ throws Exception {
360
+ final Optional <AirbyteProtocolVersionRange > newProtocolRange = protocolVersionChecker .validate (autoUpgradeConnectors );
361
+ if (newProtocolRange .isEmpty ()) {
362
+ throw new RuntimeException (
363
+ "Aborting bootloader to avoid breaking existing connection after an upgrade. " +
364
+ "Please address airbyte protocol version support issues in the connectors before retrying." );
365
+ }
366
+ trackProtocolVersion (jobPersistence , newProtocolRange .get ());
367
+ }
368
+
369
+ private static void trackProtocolVersion (final JobPersistence jobPersistence , final AirbyteProtocolVersionRange protocolVersionRange )
370
+ throws IOException {
371
+ jobPersistence .setAirbyteProtocolVersionMin (protocolVersionRange .min ());
372
+ jobPersistence .setAirbyteProtocolVersionMax (protocolVersionRange .max ());
373
+ LOGGER .info ("AirbyteProtocol version support range [{}:{}]" , protocolVersionRange .min ().serialize (), protocolVersionRange .max ().serialize ());
314
374
}
315
375
316
376
static boolean isLegalUpgrade (final AirbyteVersion airbyteDatabaseVersion , final AirbyteVersion airbyteVersion ) {
0 commit comments