Skip to content

Commit b53d826

Browse files
authored
Separate connector upgrade from import (#5965)
* Remove connector update in dump importer * Remove seed persistence * Update connector definition with loadData method * Add override annotation * Pass in seed persistence * Remove import * Restore parameter order * Throw exception in FileSystemConfigPersistence#loadData
1 parent 332687a commit b53d826

File tree

8 files changed

+34
-106
lines changed

8 files changed

+34
-106
lines changed

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java

+2
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,6 @@ public interface ConfigPersistence {
4646

4747
Map<String, Stream<JsonNode>> dumpConfigs() throws IOException;
4848

49+
void loadData(ConfigPersistence seedPersistence) throws IOException;
50+
4951
}

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

+4
Original file line numberDiff line numberDiff line change
@@ -248,4 +248,8 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
248248
return persistence.dumpConfigs();
249249
}
250250

251+
public void loadData(ConfigPersistence seedPersistence) throws IOException {
252+
persistence.loadData(seedPersistence);
253+
}
254+
251255
}

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public DatabaseConfigPersistence(Database database) {
7676
/**
7777
* Load or update the configs from the seed.
7878
*/
79-
public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistence) throws IOException {
79+
@Override
80+
public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
8081
database.transaction(ctx -> {
8182
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
8283
if (isInitialized) {
@@ -86,7 +87,6 @@ public DatabaseConfigPersistence loadData(ConfigPersistence seedConfigPersistenc
8687
}
8788
return null;
8889
});
89-
return this;
9090
}
9191

9292
public ValidatingConfigPersistence withValidation() {

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.airbyte.config.AirbyteConfig;
3232
import io.airbyte.validation.json.JsonValidationException;
3333
import java.io.IOException;
34+
import java.io.UnsupportedEncodingException;
3435
import java.nio.file.Files;
3536
import java.nio.file.Path;
3637
import java.util.Collections;
@@ -198,6 +199,11 @@ public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dry
198199
LOGGER.info("Deleted {}", oldConfigsDir);
199200
}
200201

202+
@Override
203+
public void loadData(ConfigPersistence seedPersistence) throws IOException {
204+
throw new UnsupportedEncodingException("This method is not supported in this implementation");
205+
}
206+
201207
private <T> T getConfigInternal(AirbyteConfig configType, String configId, Class<T> clazz)
202208
throws ConfigNotFoundException, IOException {
203209
// validate file with schema

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java

+5
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
8888
return decoratedPersistence.dumpConfigs();
8989
}
9090

91+
@Override
92+
public void loadData(ConfigPersistence seedPersistence) throws IOException {
93+
decoratedPersistence.loadData(seedPersistence);
94+
}
95+
9196
private <T> void validateJson(T config, AirbyteConfig configType) throws JsonValidationException {
9297
JsonNode schema = JsonSchemaValidator.getSchema(configType.getConfigSchemaFile());
9398
schemaValidator.ensure(schema, Jsons.jsonNode(config));

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java

+5
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,9 @@ public Map<String, Stream<JsonNode>> dumpConfigs() {
130130
e -> e.getValue().values().stream()));
131131
}
132132

133+
@Override
134+
public void loadData(ConfigPersistence seedPersistence) throws IOException {
135+
throw new UnsupportedOperationException("The seed config persistence is read only.");
136+
}
137+
133138
}

airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java

+7-99
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,11 @@
6262
import java.nio.file.Files;
6363
import java.nio.file.Path;
6464
import java.util.Collection;
65-
import java.util.Collections;
6665
import java.util.Comparator;
6766
import java.util.HashMap;
68-
import java.util.HashSet;
6967
import java.util.LinkedHashMap;
7068
import java.util.List;
7169
import java.util.Map;
72-
import java.util.Map.Entry;
7370
import java.util.Optional;
7471
import java.util.Set;
7572
import java.util.Spliterator;
@@ -131,7 +128,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
131128
// 2. dry run
132129
try {
133130
checkImport(targetVersion, sourceRoot);
134-
importConfigsFromArchive(sourceRoot, seedPersistence, true);
131+
importConfigsFromArchive(sourceRoot, true);
135132
} catch (Exception e) {
136133
LOGGER.error("Dry run failed.", e);
137134
throw e;
@@ -140,8 +137,9 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
140137
// 3. Import Postgres content
141138
importDatabaseFromArchive(sourceRoot, targetVersion);
142139

143-
// 4. Import Configs
144-
importConfigsFromArchive(sourceRoot, seedPersistence, false);
140+
// 4. Import Configs and update connector definitions
141+
importConfigsFromArchive(sourceRoot, false);
142+
configRepository.loadData(seedPersistence);
145143

146144
// 5. Set DB version
147145
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
@@ -158,7 +156,7 @@ public void importDataWithSeed(String targetVersion, File archive, ConfigPersist
158156
configRepository.listStandardWorkspaces(true).forEach(workspace -> TrackingClientSingleton.get().identify(workspace.getWorkspaceId()));
159157
}
160158

161-
private void checkImport(String targetVersion, Path tempFolder) throws IOException, JsonValidationException {
159+
private void checkImport(String targetVersion, Path tempFolder) throws IOException {
162160
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
163161
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
164162
.replace("\n", "").strip();
@@ -179,21 +177,10 @@ private List<String> listDirectories(Path sourceRoot) throws IOException {
179177
}
180178
}
181179

182-
private <T> void importConfigsFromArchive(final Path sourceRoot, ConfigPersistence seedPersistence, final boolean dryRun)
183-
throws IOException, JsonValidationException {
184-
final Set<String> sourceDefinitionsInUse = new HashSet<>();
185-
final Set<String> destinationDefinitionsInUse = new HashSet<>();
186-
final boolean[] sourceProcessed = {false};
187-
final boolean[] destinationProcessed = {false};
180+
private void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException {
188181
final List<String> directories = listDirectories(sourceRoot);
189-
// We sort the directories because we want to process SOURCE_CONNECTION before
190-
// STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION
191-
// so that we can identify which definitions should not be upgraded to the latest version
192-
Collections.sort(directories);
193182
final Map<AirbyteConfig, Stream<?>> data = new LinkedHashMap<>();
194183

195-
final Map<ConfigSchema, Map<String, ?>> seeds = getSeeds(seedPersistence);
196-
197184
for (final String directory : directories) {
198185
final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);
199186

@@ -202,90 +189,11 @@ private <T> void importConfigsFromArchive(final Path sourceRoot, ConfigPersisten
202189
}
203190

204191
final ConfigSchema configSchema = configSchemaOptional.get();
205-
Stream<?> configs = readConfigsFromArchive(sourceRoot, configSchema);
206-
207-
// If there is no source or destination connection, mark them as processed respectively.
208-
if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION && !data.containsKey(ConfigSchema.SOURCE_CONNECTION)) {
209-
sourceProcessed[0] = true;
210-
} else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION && !data.containsKey(ConfigSchema.DESTINATION_CONNECTION)) {
211-
destinationProcessed[0] = true;
212-
}
213-
214-
configs = streamWithAdditionalOperation(
215-
sourceDefinitionsInUse,
216-
destinationDefinitionsInUse,
217-
sourceProcessed,
218-
destinationProcessed,
219-
configSchema,
220-
configs,
221-
seeds);
222-
data.put(configSchema, configs);
192+
data.put(configSchema, readConfigsFromArchive(sourceRoot, configSchema));
223193
}
224194
configRepository.replaceAllConfigs(data, dryRun);
225195
}
226196

227-
/**
228-
* Convert config dumps from {@link ConfigPersistence#dumpConfigs} to the desired format.
229-
*/
230-
@SuppressWarnings("unchecked")
231-
private static Map<ConfigSchema, Map<String, ?>> getSeeds(ConfigPersistence configSeedPersistence) throws IOException {
232-
Map<ConfigSchema, Map<String, ?>> allData = new HashMap<>(2);
233-
for (Map.Entry<String, Stream<JsonNode>> configStream : configSeedPersistence.dumpConfigs().entrySet()) {
234-
ConfigSchema configSchema = ConfigSchema.valueOf(configStream.getKey());
235-
Map<String, ?> configSeeds = configStream.getValue()
236-
.map(node -> Jsons.object(node, configSchema.getClassName()))
237-
.collect(Collectors.toMap(
238-
configSchema::getId,
239-
object -> object));
240-
allData.put(configSchema, configSeeds);
241-
}
242-
return allData;
243-
}
244-
245-
private Stream<?> streamWithAdditionalOperation(Set<String> sourceDefinitionsInUse,
246-
Set<String> destinationDefinitionsInUse,
247-
boolean[] sourceProcessed,
248-
boolean[] destinationProcessed,
249-
ConfigSchema configSchema,
250-
Stream<?> configs,
251-
Map<ConfigSchema, Map<String, ?>> latestSeeds) {
252-
if (configSchema == ConfigSchema.SOURCE_CONNECTION) {
253-
sourceProcessed[0] = true;
254-
configs = configs.peek(config -> sourceDefinitionsInUse.add(((SourceConnection) config).getSourceDefinitionId().toString()));
255-
} else if (configSchema == ConfigSchema.DESTINATION_CONNECTION) {
256-
destinationProcessed[0] = true;
257-
configs = configs.peek(config -> destinationDefinitionsInUse.add(((DestinationConnection) config).getDestinationDefinitionId().toString()));
258-
} else if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
259-
Map<String, ?> sourceDefinitionSeeds = latestSeeds.get(configSchema);
260-
configs = getDefinitionStream(sourceDefinitionsInUse, sourceProcessed[0], configSchema, configs, sourceDefinitionSeeds);
261-
} else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
262-
Map<String, ?> destinationDefinitionSeeds = latestSeeds.get(configSchema);
263-
configs = getDefinitionStream(destinationDefinitionsInUse, destinationProcessed[0], configSchema, configs, destinationDefinitionSeeds);
264-
}
265-
return configs;
266-
}
267-
268-
/**
269-
* This method combines the latest definitions with existing ones. If a connector is being used by
270-
* user, it will continue to be at the same version, otherwise it will be migrated to the latest
271-
* version
272-
*/
273-
private Stream<?> getDefinitionStream(Set<String> definitionsInUse,
274-
boolean definitionsPopulated,
275-
ConfigSchema configSchema,
276-
Stream<?> currentDefinitions,
277-
Map<String, ?> latestDefinitions) {
278-
if (!definitionsPopulated) {
279-
throw new RuntimeException("Trying to process " + configSchema + " without populating the definitions in use");
280-
}
281-
282-
return Streams.concat(
283-
// Keep all the definitions in use
284-
currentDefinitions.filter(c -> definitionsInUse.contains(configSchema.getId(c))),
285-
// Upgrade all the definitions not in use
286-
latestDefinitions.entrySet().stream().filter(c -> !definitionsInUse.contains(c.getKey())).map(Entry::getValue));
287-
}
288-
289197
private <T> Stream<T> readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType)
290198
throws IOException {
291199

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import io.airbyte.config.EnvConfigs;
3434
import io.airbyte.config.StandardWorkspace;
3535
import io.airbyte.config.helpers.LogClientSingleton;
36-
import io.airbyte.config.persistence.ConfigPersistence;
3736
import io.airbyte.config.persistence.ConfigRepository;
3837
import io.airbyte.config.persistence.ConfigSeedProvider;
3938
import io.airbyte.config.persistence.DatabaseConfigPersistence;
@@ -179,10 +178,9 @@ public static ServerRunnable getServer(ServerFactory apiFactory) throws Exceptio
179178
configs.getConfigDatabasePassword(),
180179
configs.getConfigDatabaseUrl())
181180
.getAndInitialize();
182-
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase)
183-
.loadData(ConfigSeedProvider.get(configs))
184-
.withValidation();
185-
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
181+
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
182+
configPersistence.loadData(ConfigSeedProvider.get(configs));
183+
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation());
186184

187185
LOGGER.info("Creating Scheduler persistence...");
188186
final Database jobDatabase = new JobsDatabaseInstance(

0 commit comments

Comments
 (0)