Skip to content

Commit ee150e3

Browse files
gosusnpedgao
andauthored
Update airbyte protocol migration (#20745)
* Extract MigrationContainer from AirbyteMessageMigrator * Add ConfiguredAirbyteCatalogMigrations * Add ConfiguredAirbyteCatalog to AirbyteMessageMigrations * Enable ConfiguredAirbyteCatalog migration * Fix tests * Remove extra this. * Add missing docs * Typo Co-authored-by: Edward Gao <[email protected]>
1 parent c1d7736 commit ee150e3

File tree

32 files changed

+593
-243
lines changed

32 files changed

+593
-243
lines changed

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java

Lines changed: 25 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66

77
import com.google.common.annotations.VisibleForTesting;
88
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
9+
import io.airbyte.commons.protocol.migrations.MigrationContainer;
910
import io.airbyte.commons.version.Version;
11+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1012
import jakarta.annotation.PostConstruct;
1113
import jakarta.inject.Singleton;
12-
import java.util.Collection;
13-
import java.util.Collections;
1414
import java.util.List;
15+
import java.util.Optional;
1516
import java.util.Set;
16-
import java.util.SortedMap;
17-
import java.util.TreeMap;
1817

1918
/**
2019
* AirbyteProtocol Message Migrator
@@ -25,104 +24,59 @@
2524
@Singleton
2625
public class AirbyteMessageMigrator {
2726

28-
private final List<AirbyteMessageMigration<?, ?>> migrationsToRegister;
29-
private final SortedMap<String, AirbyteMessageMigration<?, ?>> migrations = new TreeMap<>();
30-
private String mostRecentMajorVersion = "";
27+
private final MigrationContainer<AirbyteMessageMigration<?, ?>> migrationContainer;
3128

32-
public AirbyteMessageMigrator(List<AirbyteMessageMigration<?, ?>> migrations) {
33-
migrationsToRegister = migrations;
34-
}
35-
36-
public AirbyteMessageMigrator() {
37-
this(Collections.emptyList());
29+
public AirbyteMessageMigrator(final List<AirbyteMessageMigration<?, ?>> migrations) {
30+
migrationContainer = new MigrationContainer<>(migrations);
3831
}
3932

4033
@PostConstruct
4134
public void initialize() {
42-
migrationsToRegister.forEach(this::registerMigration);
35+
migrationContainer.initialize();
4336
}
4437

4538
/**
4639
* Downgrade a message from the most recent version to the target version by chaining all the
4740
* required migrations
4841
*/
49-
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
50-
if (target.getMajorVersion().equals(mostRecentMajorVersion)) {
51-
return (PreviousVersion) message;
52-
}
53-
54-
Object result = message;
55-
Object[] selectedMigrations = selectMigrations(target).toArray();
56-
for (int i = selectedMigrations.length; i > 0; --i) {
57-
result = applyDowngrade((AirbyteMessageMigration<?, ?>) selectedMigrations[i - 1], result);
58-
}
59-
return (PreviousVersion) result;
42+
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message,
43+
final Version target,
44+
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
45+
return migrationContainer.downgrade(message, target, (migration, msg) -> applyDowngrade(migration, msg, configuredAirbyteCatalog));
6046
}
6147

6248
/**
6349
* Upgrade a message from the source version to the most recent version by chaining all the required
6450
* migrations
6551
*/
66-
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
67-
if (source.getMajorVersion().equals(mostRecentMajorVersion)) {
68-
return (CurrentVersion) message;
69-
}
70-
71-
Object result = message;
72-
for (var migration : selectMigrations(source)) {
73-
result = applyUpgrade(migration, result);
74-
}
75-
return (CurrentVersion) result;
52+
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message,
53+
final Version source,
54+
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
55+
return migrationContainer.upgrade(message, source, (migration, msg) -> applyUpgrade(migration, msg, configuredAirbyteCatalog));
7656
}
7757

7858
public Version getMostRecentVersion() {
79-
return new Version(mostRecentMajorVersion, "0", "0");
80-
}
81-
82-
private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) {
83-
final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values();
84-
if (results.isEmpty()) {
85-
throw new RuntimeException("Unsupported migration version " + version.serialize());
86-
}
87-
return results;
59+
return migrationContainer.getMostRecentVersion();
8860
}
8961

9062
// Helper function to work around type casting
91-
private <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
92-
final Object message) {
93-
return migration.downgrade((CurrentVersion) message);
63+
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
64+
final Object message,
65+
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
66+
return migration.downgrade((CurrentVersion) message, configuredAirbyteCatalog);
9467
}
9568

9669
// Helper function to work around type casting
97-
private <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
98-
final Object message) {
99-
return migration.upgrade((PreviousVersion) message);
100-
}
101-
102-
/**
103-
* Store migration in a sorted map key by the major of the lower version of the migration.
104-
*
105-
* The goal is to be able to retrieve the list of migrations to apply to get to/from a given
106-
* version. We are only keying on the lower version because the right side (most recent version of
107-
* the migration range) is always current version.
108-
*/
109-
@VisibleForTesting
110-
void registerMigration(final AirbyteMessageMigration<?, ?> migration) {
111-
final String key = migration.getPreviousVersion().getMajorVersion();
112-
if (!migrations.containsKey(key)) {
113-
migrations.put(key, migration);
114-
if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) {
115-
mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion();
116-
}
117-
} else {
118-
throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName());
119-
}
70+
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
71+
final Object message,
72+
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
73+
return migration.upgrade((PreviousVersion) message, configuredAirbyteCatalog);
12074
}
12175

12276
// Used for inspection of the injection
12377
@VisibleForTesting
12478
Set<String> getMigrationKeys() {
125-
return migrations.keySet();
79+
return migrationContainer.getMigrationKeys();
12680
}
12781

12882
}

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigrator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import io.airbyte.commons.version.Version;
88
import io.airbyte.protocol.models.AirbyteMessage;
9+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
10+
import java.util.Optional;
911

1012
/**
1113
* Wraps message migration from a fixed version to the most recent version
@@ -20,12 +22,12 @@ public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, fi
2022
this.version = version;
2123
}
2224

23-
public OriginalMessageType downgrade(final AirbyteMessage message) {
24-
return migrator.downgrade(message, version);
25+
public OriginalMessageType downgrade(final AirbyteMessage message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
26+
return migrator.downgrade(message, version, configuredAirbyteCatalog);
2527
}
2628

27-
public AirbyteMessage upgrade(final OriginalMessageType message) {
28-
return migrator.upgrade(message, version);
29+
public AirbyteMessage upgrade(final OriginalMessageType message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
30+
return migrator.upgrade(message, version, configuredAirbyteCatalog);
2931
}
3032

3133
public Version getVersion() {

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol;
6+
7+
import io.airbyte.commons.version.Version;
8+
import jakarta.inject.Singleton;
9+
10+
/**
11+
* Factory to build AirbyteMessageVersionedMigrator
12+
*/
13+
@Singleton
14+
public class AirbyteProtocolVersionedMigratorFactory {
15+
16+
private final AirbyteMessageMigrator airbyteMessageMigrator;
17+
private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;
18+
19+
public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airbyteMessageMigrator,
20+
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator) {
21+
this.airbyteMessageMigrator = airbyteMessageMigrator;
22+
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
23+
}
24+
25+
public <T> AirbyteMessageVersionedMigrator<T> getAirbyteMessageMigrator(final Version version) {
26+
return new AirbyteMessageVersionedMigrator<>(airbyteMessageMigrator, version);
27+
}
28+
29+
public final VersionedProtocolSerializer getProtocolSerializer(final Version version) {
30+
return new VersionedProtocolSerializer(configuredAirbyteCatalogMigrator, version);
31+
}
32+
33+
public Version getMostRecentVersion() {
34+
return airbyteMessageMigrator.getMostRecentVersion();
35+
}
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol;
6+
7+
import com.google.common.annotations.VisibleForTesting;
8+
import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigration;
9+
import io.airbyte.commons.protocol.migrations.MigrationContainer;
10+
import io.airbyte.commons.version.Version;
11+
import jakarta.annotation.PostConstruct;
12+
import jakarta.inject.Singleton;
13+
import java.util.List;
14+
import java.util.Set;
15+
16+
@Singleton
17+
public class ConfiguredAirbyteCatalogMigrator {
18+
19+
private final MigrationContainer<ConfiguredAirbyteCatalogMigration<?, ?>> migrationContainer;
20+
21+
public ConfiguredAirbyteCatalogMigrator(final List<ConfiguredAirbyteCatalogMigration<?, ?>> migrations) {
22+
migrationContainer = new MigrationContainer<>(migrations);
23+
}
24+
25+
@PostConstruct
26+
public void initialize() {
27+
migrationContainer.initialize();
28+
}
29+
30+
/**
31+
* Downgrade a message from the most recent version to the target version by chaining all the
32+
* required migrations
33+
*/
34+
public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
35+
return migrationContainer.downgrade(message, target, ConfiguredAirbyteCatalogMigrator::applyDowngrade);
36+
}
37+
38+
/**
39+
* Upgrade a message from the source version to the most recent version by chaining all the required
40+
* migrations
41+
*/
42+
public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
43+
return migrationContainer.upgrade(message, source, ConfiguredAirbyteCatalogMigrator::applyUpgrade);
44+
}
45+
46+
public Version getMostRecentVersion() {
47+
return migrationContainer.getMostRecentVersion();
48+
}
49+
50+
// Helper function to work around type casting
51+
private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
52+
final Object message) {
53+
return migration.downgrade((CurrentVersion) message);
54+
}
55+
56+
// Helper function to work around type casting
57+
private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
58+
final Object message) {
59+
return migration.upgrade((PreviousVersion) message);
60+
}
61+
62+
// Used for inspection of the injection
63+
@VisibleForTesting
64+
Set<String> getMigrationKeys() {
65+
return migrationContainer.getMigrationKeys();
66+
}
67+
68+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol;
6+
7+
import io.airbyte.commons.json.Jsons;
8+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
9+
10+
public class DefaultProtocolSerializer implements ProtocolSerializer {
11+
12+
@Override
13+
public String serialize(ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
14+
return Jsons.serialize(configuredAirbyteCatalog);
15+
}
16+
17+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol;
6+
7+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
8+
9+
public interface ProtocolSerializer {
10+
11+
String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog);
12+
13+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol;
6+
7+
import io.airbyte.commons.json.Jsons;
8+
import io.airbyte.commons.version.Version;
9+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
10+
11+
/**
12+
* Serialize a ConfiguredAirbyteCatalog to the specified version
13+
* <p>
14+
* This Serializer expects a ConfiguredAirbyteCatalog from the Current version of the platform,
15+
* converts it to the target protocol version before serializing it.
16+
*/
17+
public class VersionedProtocolSerializer implements ProtocolSerializer {
18+
19+
private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;
20+
private final Version protocolVersion;
21+
22+
public VersionedProtocolSerializer(final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator, final Version protocolVersion) {
23+
this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
24+
this.protocolVersion = protocolVersion;
25+
}
26+
27+
@Override
28+
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
29+
return Jsons.serialize(configuredAirbyteCatalogMigrator.downgrade(configuredAirbyteCatalog, protocolVersion));
30+
}
31+
32+
}

0 commit comments

Comments
 (0)