From 3fd8f2bf5288fa26cfca1415d6fca78c2a0299e9 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 15 Sep 2021 01:31:17 -0700 Subject: [PATCH 1/4] Debug migration acceptance test --- .../main/java/io/airbyte/migrate/Migrate.java | 14 +++-------- .../persistence/DefaultJobPersistence.java | 2 ++ .../MigrationAcceptanceTest.java | 25 ++++--------------- 3 files changed, 10 insertions(+), 31 deletions(-) diff --git a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java index a564a5233a936..c64deb30e57de 100644 --- a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java +++ b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java @@ -36,7 +36,6 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.commons.yaml.Yamls; import io.airbyte.validation.json.JsonSchemaValidator; -import io.airbyte.validation.json.JsonValidationException; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -109,8 +108,9 @@ public void run(MigrateConfig migrateConfig) throws IOException { // run migration // write output of each migration to disk. final Migration migration = migrations.get(i); - LOGGER.info("Migrating from version: {} to version {}.", migrations.get(i - 1).getVersion(), migration.getVersion()); + LOGGER.info("Migrating from {} to {}", migrations.get(i - 1).getVersion(), migration.getVersion()); final Path outputPath = runMigration(migration, inputPath); + LOGGER.info("Migration completed from {} to {}", migrations.get(i - 1).getVersion(), migration.getVersion()); IOs.writeFile(outputPath.resolve(VERSION_FILE_NAME), migration.getVersion()); inputPath = outputPath; } @@ -131,15 +131,7 @@ private Path runMigration(Migration migration, Path migrationInputRoot) throws I final Map> inputDataStreams = inputData.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - entry -> MoreStreams.toStream(entry.getValue()) - .peek(r -> { - try { - jsonSchemaValidator.ensure(migration.getInputSchema().get(entry.getKey()), r); - } catch (JsonValidationException e) { - throw new IllegalArgumentException( - String.format("Input data schema does not match declared input schema %s.", entry.getKey().getName()), e); - } - }))); + entry -> MoreStreams.toStream(entry.getValue()))); final Map outputStreams = createOutputStreams(migration, tmpOutputDir); // make the java compiler happy (it can't resolve that RecordConsumer is, in fact, a diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 22c489b708cc9..dd35a42c03bd0 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -636,6 +636,8 @@ private Stream exportTable(final String schema, final String tableName .map(Field::getName) .collect(Collectors.toSet()); final JsonNode row = Jsons.deserialize(record.formatJSON(DB_JSON_FORMAT)); + System.out.println("Json row: " + row); + LOGGER.info("Json row: {}", row); // for json fields, deserialize them so they are treated as objects instead of strings. this is to // get around that formatJson doesn't handle deserializing them for us. jsonFieldNames.forEach(jsonFieldName -> ((ObjectNode) row).replace(jsonFieldName, Jsons.deserialize(row.get(jsonFieldName).asText()))); diff --git a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java index e6b4bf7b39093..c08d4e2ff1c6f 100644 --- a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java +++ b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java @@ -51,14 +51,12 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import java.io.File; -import java.io.FileInputStream; import java.net.URISyntaxException; import java.nio.file.Path; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.function.Consumer; import org.junit.jupiter.api.Test; @@ -79,15 +77,7 @@ public class MigrationAcceptanceTest { @Test public void testAutomaticMigration() throws Exception { - // default to version in env file but can override it. - final String targetVersion; - if (System.getenv("MIGRATION_TEST_VERSION") != null) { - targetVersion = System.getenv("MIGRATION_TEST_VERSION"); - } else { - final Properties prop = new Properties(); - prop.load(new FileInputStream(ENV_FILE)); - targetVersion = prop.getProperty("VERSION"); - } + final String targetVersion = "0.29.16-alpha"; LOGGER.info("Using version: {} as target version", targetVersion); firstRun(); @@ -95,13 +85,7 @@ public void testAutomaticMigration() throws Exception { } private Consumer logConsumerForServer(Set expectedLogs) { - return logLine -> expectedLogs.removeIf(entry -> { - if (logLine.contains("Migrating from version")) { - System.out.println("logLine = " + logLine); - System.out.println("logLine = " + logLine); - } - return logLine.contains(entry); - }); + return logLine -> expectedLogs.removeIf(logLine::contains); } @SuppressWarnings("UnstableApiUsage") @@ -133,7 +117,6 @@ private String targetVersionWithoutPatch(String targetVersion) { return AirbyteVersion.versionWithoutPatch(targetVersion).getVersion(); } - @SuppressWarnings("UnstableApiUsage") private void secondRun(String targetVersion) throws Exception { final Set logsToExpect = new HashSet<>(); logsToExpect.add("Version: " + targetVersion); @@ -145,7 +128,8 @@ private void secondRun(String targetVersion) throws Exception { logsToExpect.add("Migrating from version: 0.22.0-alpha to version 0.23.0-alpha."); logsToExpect.add("Migrations complete. Now on version: " + targetVersionWithoutPatch(targetVersion)); - final AirbyteTestContainer airbyteTestContainer = new AirbyteTestContainer.Builder(new File(Resources.getResource("docker-compose.yaml").toURI())) + final File dockerComposeFile = Path.of(System.getProperty("user.dir")).getParent().resolve("docker-compose.yaml").toFile(); + final AirbyteTestContainer airbyteTestContainer = new AirbyteTestContainer.Builder(dockerComposeFile) .setEnv(ENV_FILE) // override to use test mounts. .setEnvVariable("DATA_DOCKER_MOUNT", "airbyte_data_migration_test") @@ -153,6 +137,7 @@ private void secondRun(String targetVersion) throws Exception { .setEnvVariable("WORKSPACE_DOCKER_MOUNT", "airbyte_workspace_migration_test") .setEnvVariable("LOCAL_ROOT", "/tmp/airbyte_local_migration_test") .setEnvVariable("LOCAL_DOCKER_MOUNT", "/tmp/airbyte_local_migration_test") + .setEnvVariable("VERSION", targetVersion) .setLogListener("server", logConsumerForServer(logsToExpect)) .build(); From c67d2fc3f2b89815b74155e6a2b4cd6b827be5b6 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 15 Sep 2021 01:50:48 -0700 Subject: [PATCH 2/4] Move comment to exception message --- .../io/airbyte/migrate/MigrationCurrentSchemaTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/airbyte-migration/src/test/java/io/airbyte/migrate/MigrationCurrentSchemaTest.java b/airbyte-migration/src/test/java/io/airbyte/migrate/MigrationCurrentSchemaTest.java index f330df5c3b002..685f98de6c10f 100644 --- a/airbyte-migration/src/test/java/io/airbyte/migrate/MigrationCurrentSchemaTest.java +++ b/airbyte-migration/src/test/java/io/airbyte/migrate/MigrationCurrentSchemaTest.java @@ -30,14 +30,11 @@ public class MigrationCurrentSchemaTest { - /** - * The file-based migration is deprecated. We need to ensure that v0.29.0 is the last one. All new - * migrations should be written in Flyway. - */ @Test public void testLastMigration() { final Migration lastMigration = Migrations.MIGRATIONS.get(Migrations.MIGRATIONS.size() - 1); - assertEquals(Migrations.MIGRATION_V_0_29_0.getVersion(), lastMigration.getVersion()); + assertEquals(Migrations.MIGRATION_V_0_29_0.getVersion(), lastMigration.getVersion(), + "The file-based migration is deprecated. Please do not write a new migration this way. Use Flyway instead."); } } From 93c9b6ab8558be5143778a285f1835eaf229ab68 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 15 Sep 2021 01:50:59 -0700 Subject: [PATCH 3/4] Revert "Debug migration acceptance test" This reverts commit 3fd8f2bf5288fa26cfca1415d6fca78c2a0299e9. --- .../main/java/io/airbyte/migrate/Migrate.java | 14 ++++++++--- .../persistence/DefaultJobPersistence.java | 2 -- .../MigrationAcceptanceTest.java | 25 +++++++++++++++---- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java index c64deb30e57de..a564a5233a936 100644 --- a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java +++ b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java @@ -36,6 +36,7 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.commons.yaml.Yamls; import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -108,9 +109,8 @@ public void run(MigrateConfig migrateConfig) throws IOException { // run migration // write output of each migration to disk. final Migration migration = migrations.get(i); - LOGGER.info("Migrating from {} to {}", migrations.get(i - 1).getVersion(), migration.getVersion()); + LOGGER.info("Migrating from version: {} to version {}.", migrations.get(i - 1).getVersion(), migration.getVersion()); final Path outputPath = runMigration(migration, inputPath); - LOGGER.info("Migration completed from {} to {}", migrations.get(i - 1).getVersion(), migration.getVersion()); IOs.writeFile(outputPath.resolve(VERSION_FILE_NAME), migration.getVersion()); inputPath = outputPath; } @@ -131,7 +131,15 @@ private Path runMigration(Migration migration, Path migrationInputRoot) throws I final Map> inputDataStreams = inputData.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - entry -> MoreStreams.toStream(entry.getValue()))); + entry -> MoreStreams.toStream(entry.getValue()) + .peek(r -> { + try { + jsonSchemaValidator.ensure(migration.getInputSchema().get(entry.getKey()), r); + } catch (JsonValidationException e) { + throw new IllegalArgumentException( + String.format("Input data schema does not match declared input schema %s.", entry.getKey().getName()), e); + } + }))); final Map outputStreams = createOutputStreams(migration, tmpOutputDir); // make the java compiler happy (it can't resolve that RecordConsumer is, in fact, a diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index dd35a42c03bd0..22c489b708cc9 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -636,8 +636,6 @@ private Stream exportTable(final String schema, final String tableName .map(Field::getName) .collect(Collectors.toSet()); final JsonNode row = Jsons.deserialize(record.formatJSON(DB_JSON_FORMAT)); - System.out.println("Json row: " + row); - LOGGER.info("Json row: {}", row); // for json fields, deserialize them so they are treated as objects instead of strings. this is to // get around that formatJson doesn't handle deserializing them for us. jsonFieldNames.forEach(jsonFieldName -> ((ObjectNode) row).replace(jsonFieldName, Jsons.deserialize(row.get(jsonFieldName).asText()))); diff --git a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java index c08d4e2ff1c6f..e6b4bf7b39093 100644 --- a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java +++ b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java @@ -51,12 +51,14 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import java.io.File; +import java.io.FileInputStream; import java.net.URISyntaxException; import java.nio.file.Path; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.function.Consumer; import org.junit.jupiter.api.Test; @@ -77,7 +79,15 @@ public class MigrationAcceptanceTest { @Test public void testAutomaticMigration() throws Exception { - final String targetVersion = "0.29.16-alpha"; + // default to version in env file but can override it. + final String targetVersion; + if (System.getenv("MIGRATION_TEST_VERSION") != null) { + targetVersion = System.getenv("MIGRATION_TEST_VERSION"); + } else { + final Properties prop = new Properties(); + prop.load(new FileInputStream(ENV_FILE)); + targetVersion = prop.getProperty("VERSION"); + } LOGGER.info("Using version: {} as target version", targetVersion); firstRun(); @@ -85,7 +95,13 @@ public void testAutomaticMigration() throws Exception { } private Consumer logConsumerForServer(Set expectedLogs) { - return logLine -> expectedLogs.removeIf(logLine::contains); + return logLine -> expectedLogs.removeIf(entry -> { + if (logLine.contains("Migrating from version")) { + System.out.println("logLine = " + logLine); + System.out.println("logLine = " + logLine); + } + return logLine.contains(entry); + }); } @SuppressWarnings("UnstableApiUsage") @@ -117,6 +133,7 @@ private String targetVersionWithoutPatch(String targetVersion) { return AirbyteVersion.versionWithoutPatch(targetVersion).getVersion(); } + @SuppressWarnings("UnstableApiUsage") private void secondRun(String targetVersion) throws Exception { final Set logsToExpect = new HashSet<>(); logsToExpect.add("Version: " + targetVersion); @@ -128,8 +145,7 @@ private void secondRun(String targetVersion) throws Exception { logsToExpect.add("Migrating from version: 0.22.0-alpha to version 0.23.0-alpha."); logsToExpect.add("Migrations complete. Now on version: " + targetVersionWithoutPatch(targetVersion)); - final File dockerComposeFile = Path.of(System.getProperty("user.dir")).getParent().resolve("docker-compose.yaml").toFile(); - final AirbyteTestContainer airbyteTestContainer = new AirbyteTestContainer.Builder(dockerComposeFile) + final AirbyteTestContainer airbyteTestContainer = new AirbyteTestContainer.Builder(new File(Resources.getResource("docker-compose.yaml").toURI())) .setEnv(ENV_FILE) // override to use test mounts. .setEnvVariable("DATA_DOCKER_MOUNT", "airbyte_data_migration_test") @@ -137,7 +153,6 @@ private void secondRun(String targetVersion) throws Exception { .setEnvVariable("WORKSPACE_DOCKER_MOUNT", "airbyte_workspace_migration_test") .setEnvVariable("LOCAL_ROOT", "/tmp/airbyte_local_migration_test") .setEnvVariable("LOCAL_DOCKER_MOUNT", "/tmp/airbyte_local_migration_test") - .setEnvVariable("VERSION", targetVersion) .setLogListener("server", logConsumerForServer(logsToExpect)) .build(); From 7f5fb5e9e61151aa2ba9f929b2158e9ef7a53a4d Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 15 Sep 2021 02:29:28 -0700 Subject: [PATCH 4/4] Disable auto migration acceptance test --- .../MigrationAcceptanceTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java index e6b4bf7b39093..d4db2e47935b5 100644 --- a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java +++ b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java @@ -61,6 +61,7 @@ import java.util.Properties; import java.util.Set; import java.util.function.Consumer; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,11 @@ public class MigrationAcceptanceTest { // assume env file is one directory level up from airbyte-tests. private final static File ENV_FILE = Path.of(System.getProperty("user.dir")).getParent().resolve(".env").toFile(); + /** + * This test is deprecated because it no longer works after the introduce of the Flyway migration. + */ @Test + @Disabled public void testAutomaticMigration() throws Exception { // default to version in env file but can override it. final String targetVersion;