-
Notifications
You must be signed in to change notification settings - Fork 4.5k
🎉 introduce automatic migration at the startup of server for docker environment #3980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
9d7d21c
403964b
2879348
1f72c01
47d4cea
8ad8c45
6a618a0
153955b
bfbf077
1951f01
e5ea8f9
e95f76e
217de6d
8d7a028
b18b68e
d125c56
4ffa193
4063cad
f24b720
97ed4f7
0da61dd
0ea3d4d
5df5906
b7bc8a3
444a437
d469814
8d46a8d
d1af88e
a37ae68
94a9460
4fb77cb
ba15561
d33dba6
24128c1
e7b63e4
fa53018
b40b3d9
d2967a0
cd39b92
3df18e4
86ae0cb
4108203
90aa45d
6b48f43
9856eac
aa5736a
f766e75
83a0008
26694ec
9757571
31d3282
d177356
ff22b6a
fa4e0a8
0717e80
9c7c7d3
5f2894a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
package io.airbyte.migrate; | ||
|
||
import io.airbyte.commons.io.Archives; | ||
import io.airbyte.commons.version.AirbyteVersion; | ||
import java.io.IOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
|
@@ -41,11 +42,22 @@ public class MigrationRunner { | |
private static final Logger LOGGER = LoggerFactory.getLogger(MigrationRunner.class); | ||
|
||
public static void run(String[] args) throws IOException { | ||
|
||
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_migrate"); | ||
|
||
MigrateConfig migrateConfig = parse(args); | ||
run(migrateConfig); | ||
} | ||
|
||
public static void run(MigrateConfig migrateConfig) throws IOException { | ||
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_migrate"); | ||
AirbyteVersion airbyteVersion = new AirbyteVersion(migrateConfig.getTargetVersion()); | ||
if (!airbyteVersion.getPatchVersion().equals("0")) { | ||
String targetVersionWithoutPatch = "" + airbyteVersion.getMajorVersion() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: a String.format would make things easier to read. |
||
+ "." | ||
+ airbyteVersion.getMinorVersion() | ||
+ ".0-" | ||
+ airbyteVersion.getVersion().replace("\n", "").strip().split("-")[1]; | ||
migrateConfig = new MigrateConfig(migrateConfig.getInputPath(), migrateConfig.getOutputPath(), | ||
targetVersionWithoutPatch); | ||
} | ||
if (migrateConfig.getInputPath().toString().endsWith(".gz")) { | ||
LOGGER.info("Unpacking tarball"); | ||
final Path uncompressedInputPath = Files.createDirectories(workspaceRoot.resolve("uncompressed")); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,9 @@ required: | |
- catalog | ||
additionalProperties: false | ||
properties: | ||
prefix: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this file changing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We didn't introduce this change back when we wrote this migration. Thats why its required |
||
description: Prefix that will be prepended to the name of each stream when it is written to the destination. | ||
type: string | ||
sourceId: | ||
type: string | ||
format: uuid | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -439,10 +439,10 @@ public Optional<String> getVersion() throws IOException { | |
@Override | ||
public void setVersion(String airbyteVersion) throws IOException { | ||
database.query(ctx -> ctx.execute(String.format( | ||
"INSERT INTO %s VALUES('%s', '%s'), ('%s_init_db', '%s');", | ||
"INSERT INTO %s VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (key) DO UPDATE SET value = '%s'", | ||
AIRBYTE_METADATA_TABLE, | ||
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME, airbyteVersion, | ||
current_timestamp(), airbyteVersion))); | ||
current_timestamp(), airbyteVersion, airbyteVersion))); | ||
} | ||
|
||
private static String current_timestamp() { | ||
|
@@ -454,6 +454,22 @@ public Map<DatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOException | |
return exportDatabase(DEFAULT_SCHEMA); | ||
} | ||
|
||
@Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. putting this comment here because the line of code i want isn't in the diff. should the import do an atomic replace like we do for the config persistence? |
||
public Map<String, Stream<JsonNode>> exportEverythingInDefaultSchema() throws IOException { | ||
return exportEverything(DEFAULT_SCHEMA); | ||
} | ||
|
||
private Map<String, Stream<JsonNode>> exportEverything(final String schema) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems very similar to |
||
final List<String> tables = listTables(schema); | ||
final Map<String, Stream<JsonNode>> result = new HashMap<>(); | ||
|
||
for (final String table : tables) { | ||
result.put(table.toUpperCase(), exportTable(schema, table)); | ||
} | ||
|
||
return result; | ||
} | ||
|
||
private Map<DatabaseSchema, Stream<JsonNode>> exportDatabase(final String schema) throws IOException { | ||
final List<String> tables = listTables(schema); | ||
final Map<DatabaseSchema, Stream<JsonNode>> result = new HashMap<>(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -171,6 +171,8 @@ public interface JobPersistence { | |
*/ | ||
Map<DatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOException; | ||
|
||
Map<String, Stream<JsonNode>> exportEverythingInDefaultSchema() throws IOException; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this name scoped by "default schema". I think the contract we are looking for here is that the Keep in mind we may want to allow using different databases in the future and hide them behind this interface. I mention that case only because it may make it clearer that mentioning the PG specific schema shouldn't be the caller's concern. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also i know in our conversations i kept saying exportEverything but probably the more common name fot this method would just be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually what is the difference between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main difference between |
||
|
||
/** | ||
* Import all SQL tables from streams of JsonNode objects. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ dependencies { | |
implementation project(':airbyte-scheduler:models') | ||
implementation project(':airbyte-scheduler:persistence') | ||
implementation project(':airbyte-workers') | ||
implementation project(':airbyte-migration') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sort. |
||
|
||
testImplementation "org.postgresql:postgresql:42.2.18" | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.server; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.io.Archives; | ||
import io.airbyte.commons.lang.CloseableConsumer; | ||
import io.airbyte.commons.lang.Exceptions; | ||
import io.airbyte.commons.yaml.Yamls; | ||
import io.airbyte.scheduler.persistence.JobPersistence; | ||
import java.io.BufferedWriter; | ||
import java.io.File; | ||
import java.io.FileWriter; | ||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import org.apache.commons.io.FileUtils; | ||
|
||
// TODO: Write a test case which compares the output dump with the output of ArchiveHandler export | ||
// for the same data | ||
public class ConfigDumpExport { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: add a doc string here to explain why this exist separate from the ArchiveHandler's export function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this just replace the archive handler's export function? it seems like we should just prefer this implementation to the existing one in the ArchiveHandler. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some users reported they've been storing their own tables/data in the Airbyte Postgres database too. So this dump would also export their custom tables/schema too? is that going to be a problem in such a use case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. woof. i don't think it'll be a problem for now. but probably something we need to be careful about when we add our postgres version? |
||
|
||
private static final String ARCHIVE_FILE_NAME = "airbyte_config_dump"; | ||
private static final String CONFIG_FOLDER_NAME = "airbyte_config"; | ||
private static final String DB_FOLDER_NAME = "airbyte_db"; | ||
private static final String VERSION_FILE_NAME = "VERSION"; | ||
|
||
private final ConfigDumpUtil configDumpUtil; | ||
private final JobPersistence jobPersistence; | ||
private final String version; | ||
|
||
public ConfigDumpExport(Path storageRoot, JobPersistence jobPersistence, String version) { | ||
this.configDumpUtil = new ConfigDumpUtil(storageRoot); | ||
this.jobPersistence = jobPersistence; | ||
this.version = version; | ||
} | ||
|
||
public File dump() { | ||
try { | ||
final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME); | ||
final File dump = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile(); | ||
exportVersionFile(tempFolder); | ||
dumpConfigs(tempFolder); | ||
dumpDatabase(tempFolder); | ||
|
||
Archives.createArchive(tempFolder, dump.toPath()); | ||
return dump; | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public void deleteOrphanDirectories() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per the conversation we had, i think that the atomic replace in config persistence should handle this for us. ideally we shouldn't need to call this. the db should just handle it. |
||
try { | ||
configDumpUtil.orphanDirectories(); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private void exportVersionFile(Path tempFolder) throws IOException { | ||
final File versionFile = Files.createFile(tempFolder.resolve(VERSION_FILE_NAME)).toFile(); | ||
FileUtils.writeStringToFile(versionFile, version, Charset.defaultCharset()); | ||
} | ||
|
||
private void dumpDatabase(Path parentFolder) throws Exception { | ||
final Map<String, Stream<JsonNode>> tables = jobPersistence.exportEverythingInDefaultSchema(); | ||
Files.createDirectories(parentFolder.resolve(DB_FOLDER_NAME)); | ||
for (Map.Entry<String, Stream<JsonNode>> table : tables.entrySet()) { | ||
final Path tablePath = buildTablePath(parentFolder, table.getKey()); | ||
writeTableToArchive(tablePath, table.getValue()); | ||
} | ||
} | ||
|
||
private void writeTableToArchive(final Path tablePath, final Stream<JsonNode> tableStream) | ||
throws Exception { | ||
Files.createDirectories(tablePath.getParent()); | ||
final BufferedWriter recordOutputWriter = new BufferedWriter( | ||
new FileWriter(tablePath.toFile())); | ||
final CloseableConsumer<JsonNode> recordConsumer = Yamls.listWriter(recordOutputWriter); | ||
tableStream.forEach(row -> Exceptions.toRuntime(() -> { | ||
recordConsumer.accept(row); | ||
})); | ||
recordConsumer.close(); | ||
} | ||
|
||
protected static Path buildTablePath(final Path storageRoot, final String tableName) { | ||
return storageRoot | ||
.resolve(DB_FOLDER_NAME) | ||
.resolve(String.format("%s.yaml", tableName.toUpperCase())); | ||
} | ||
|
||
public void dumpConfigs(Path parentFolder) throws IOException { | ||
List<String> directories = configDumpUtil.listDirectories(); | ||
for (String directory : directories) { | ||
List<JsonNode> configList = configDumpUtil.listConfig(directory); | ||
|
||
writeConfigsToArchive(parentFolder, directory, configList); | ||
} | ||
} | ||
|
||
private void writeConfigsToArchive(final Path storageRoot, | ||
final String schemaType, | ||
final List<JsonNode> configList) | ||
throws IOException { | ||
final Path configPath = buildConfigPath(storageRoot, schemaType); | ||
Files.createDirectories(configPath.getParent()); | ||
if (!configList.isEmpty()) { | ||
final List<JsonNode> sortedConfigs = configList.stream() | ||
.sorted(Comparator.comparing(JsonNode::toString)).collect( | ||
Collectors.toList()); | ||
Files.writeString(configPath, Yamls.serialize(sortedConfigs)); | ||
} else { | ||
// Create empty file | ||
Files.createFile(configPath); | ||
} | ||
} | ||
|
||
private static Path buildConfigPath(final Path storageRoot, final String schemaType) { | ||
return storageRoot.resolve(CONFIG_FOLDER_NAME) | ||
.resolve(String.format("%s.yaml", schemaType)); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.server; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.collect.Lists; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.ConfigSchema; | ||
import java.io.File; | ||
import java.io.IOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import org.apache.commons.io.FileUtils; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ConfigDumpUtil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we put this behind the |
||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpUtil.class); | ||
private final Path storageRoot; | ||
|
||
private static final String CONFIG_DIR = "config"; | ||
|
||
public ConfigDumpUtil(Path storageRoot) { | ||
cgardens marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.storageRoot = storageRoot.resolve(CONFIG_DIR); | ||
} | ||
|
||
public List<String> listDirectories() throws IOException { | ||
try (Stream<Path> files = Files.list(storageRoot)) { | ||
List<String> directoryName = files.map(c -> c.getFileName().toString()) | ||
.collect(Collectors.toList()); | ||
return directoryName; | ||
|
||
} | ||
} | ||
|
||
public void orphanDirectories() throws IOException { | ||
Set<String> configSchemas = Arrays.asList(ConfigSchema.values()).stream().map(c -> c.toString()) | ||
.collect( | ||
Collectors.toSet()); | ||
for (String directory : listDirectories()) { | ||
if (!configSchemas.contains(directory)) { | ||
File file = storageRoot.resolve(directory).toFile(); | ||
LOGGER.info("Deleting directory " + file); | ||
if (!FileUtils.deleteQuietly(file)) { | ||
LOGGER.warn("Could not delete directory " + file); | ||
} | ||
} | ||
} | ||
} | ||
|
||
public List<JsonNode> listConfig(String configType) throws IOException { | ||
final Path configTypePath = storageRoot.resolve(configType); | ||
if (!Files.exists(configTypePath)) { | ||
return Collections.emptyList(); | ||
} | ||
try (Stream<Path> files = Files.list(configTypePath)) { | ||
final List<String> ids = files | ||
.filter(p -> !p.endsWith(".json")) | ||
.map(p -> p.getFileName().toString().replace(".json", "")) | ||
.collect(Collectors.toList()); | ||
|
||
final List<JsonNode> configs = Lists.newArrayList(); | ||
for (String id : ids) { | ||
try { | ||
final Path configPath = storageRoot.resolve(configType).resolve(String.format("%s.json", id)); | ||
if (!Files.exists(configPath)) { | ||
throw new RuntimeException("Config NotFound"); | ||
} | ||
|
||
final JsonNode config = Jsons.deserialize(Files.readString(configPath), JsonNode.class); | ||
configs.add(config); | ||
} catch (RuntimeException e) { | ||
throw new IOException(e); | ||
} | ||
} | ||
|
||
return configs; | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yuck. but a good call would it make sense to move this logic into its own method? maybe a static on
AirbyteVersion
?