Skip to content

Commit aa5ed6d

Browse files
authored
Go back v0 as default (#22278)
* Downgrade protocol version max back to v0 * update config to specify v0 as max protocol version * revert back v0 ser/de to use the objects from the unversioned namespace * disable v1 migrations * fix the default to allow MigrationContainer to run without a migration * Add on the fly catalog downgrades to v0 when needed * Rollback normalization tag to 0.2.25 * Revert "Rollback normalization tag to 0.2.25" This reverts commit 99044eb. * Fix micronaut migration injection tests * Fix tests, matching back to the previous behavior * Format * Revert integration test back to the old behavior * Fix missed test revert
1 parent 6d65070 commit aa5ed6d

File tree

21 files changed

+183
-79
lines changed

21 files changed

+183
-79
lines changed

airbyte-bootloader/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ airbyte:
2222
target:
2323
range:
2424
min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0}
25-
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0}
25+
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0}
2626
secret:
2727
persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE}
2828
store:

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/MigrationContainer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ public class MigrationContainer<T extends Migration> {
1616

1717
private final List<T> migrationsToRegister;
1818
private final SortedMap<String, T> migrations = new TreeMap<>();
19-
private String mostRecentMajorVersion = "";
19+
20+
// mostRecentMajorVersion defaults to v0 as no migration is required
21+
private String mostRecentMajorVersion = "0";
2022

2123
public MigrationContainer(final List<T> migrations) {
2224
this.migrationsToRegister = migrations;

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/AirbyteMessageMigrationV1.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
2525
import io.airbyte.protocol.models.JsonSchemaReferenceTypes;
2626
import io.airbyte.validation.json.JsonSchemaValidator;
27-
import jakarta.inject.Singleton;
2827
import java.util.Iterator;
2928
import java.util.Map.Entry;
3029
import java.util.Objects;
3130
import java.util.Optional;
3231

33-
@Singleton
32+
// Disable V1 Migration, uncomment to re-enable
33+
// @Singleton
3434
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage> {
3535

3636
private final JsonSchemaValidator validator;

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/CatalogMigrationV1Helper.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,102 @@ private static boolean hasV0DataType(final JsonNode schema) {
118118
return false;
119119
}
120120

121+
/**
122+
* Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected
123+
*
124+
* @param configuredAirbyteCatalog to migrate
125+
*/
126+
public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
127+
if (containsV1DataTypes(configuredAirbyteCatalog)) {
128+
downgradeSchema(configuredAirbyteCatalog);
129+
}
130+
}
131+
132+
/**
133+
* Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected
134+
*
135+
* @param airbyteCatalog to migrate
136+
*/
137+
public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
138+
if (containsV1DataTypes(airbyteCatalog)) {
139+
downgradeSchema(airbyteCatalog);
140+
}
141+
}
142+
143+
/**
144+
* Performs an in-place migration of the schema from v1 to v0
145+
*
146+
* @param configuredAirbyteCatalog to migrate
147+
*/
148+
private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
149+
for (final var stream : configuredAirbyteCatalog.getStreams()) {
150+
SchemaMigrationV1.downgradeSchema(stream.getStream().getJsonSchema());
151+
}
152+
}
153+
154+
/**
155+
* Performs an in-place migration of the schema from v1 to v0
156+
*
157+
* @param airbyteCatalog to migrate
158+
*/
159+
private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) {
160+
for (final var stream : airbyteCatalog.getStreams()) {
161+
SchemaMigrationV1.downgradeSchema(stream.getJsonSchema());
162+
}
163+
}
164+
165+
/**
166+
* Returns true if catalog contains v1 data types
167+
*/
168+
private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
169+
if (configuredAirbyteCatalog == null) {
170+
return false;
171+
}
172+
173+
return configuredAirbyteCatalog
174+
.getStreams()
175+
.stream().findFirst()
176+
.map(ConfiguredAirbyteStream::getStream)
177+
.map(CatalogMigrationV1Helper::streamContainsV1DataTypes)
178+
.orElse(false);
179+
}
180+
181+
/**
182+
* Returns true if catalog contains v1 data types
183+
*/
184+
private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) {
185+
if (airbyteCatalog == null) {
186+
return false;
187+
}
188+
189+
return airbyteCatalog
190+
.getStreams()
191+
.stream().findFirst()
192+
.map(CatalogMigrationV1Helper::streamContainsV1DataTypes)
193+
.orElse(false);
194+
}
195+
196+
private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) {
197+
if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
198+
return false;
199+
}
200+
return hasV1DataType(airbyteStream.getJsonSchema());
201+
}
202+
203+
/**
204+
* Performs of search of a v0 data type node, returns true at the first node found.
205+
*/
206+
private static boolean hasV1DataType(final JsonNode schema) {
207+
if (SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema)) {
208+
return true;
209+
}
210+
211+
for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
212+
if (hasV1DataType(subSchema)) {
213+
return true;
214+
}
215+
}
216+
return false;
217+
}
218+
121219
}

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/ConfiguredAirbyteCatalogMigrationV1.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import io.airbyte.commons.version.Version;
1212
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1313
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
14-
import jakarta.inject.Singleton;
1514

16-
@Singleton
15+
// Disable V1 Migration, uncomment to re-enable
16+
// @Singleton
1717
public class ConfiguredAirbyteCatalogMigrationV1
1818
implements ConfiguredAirbyteCatalogMigration<io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog, ConfiguredAirbyteCatalog> {
1919

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/v1/SchemaMigrationV1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
6565
* Detects any schema that looks like a reference type declaration, e.g.: { "$ref":
6666
* "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] }
6767
*/
68-
private static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
68+
static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
6969
if (!schema.isObject()) {
7070
// Non-object schemas (i.e. true/false) never need to be modified
7171
return false;

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Deserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package io.airbyte.commons.protocol.serde;
66

77
import io.airbyte.commons.version.AirbyteProtocolVersion;
8-
import io.airbyte.protocol.models.v0.AirbyteMessage;
8+
import io.airbyte.protocol.models.AirbyteMessage;
99
import jakarta.inject.Singleton;
1010

1111
@Singleton

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0Serializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package io.airbyte.commons.protocol.serde;
66

77
import io.airbyte.commons.version.AirbyteProtocolVersion;
8-
import io.airbyte.protocol.models.v0.AirbyteMessage;
8+
import io.airbyte.protocol.models.AirbyteMessage;
99
import jakarta.inject.Singleton;
1010

1111
@Singleton

airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/MigratorsMicronautTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class MigratorsMicronautTest {
2222

2323
// This should contain the list of all the supported majors of the airbyte protocol except the most
2424
// recent one since the migrations themselves are keyed on the lower version.
25-
final Set<String> SUPPORTED_VERSIONS = Set.of("0");
25+
final Set<String> SUPPORTED_VERSIONS = Set.of();
2626

2727
@Test
2828
void testAirbyteMessageMigrationInjection() {

airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/serde/AirbyteMessageV0SerDeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import static org.junit.jupiter.api.Assertions.assertEquals;
88

99
import io.airbyte.commons.json.Jsons;
10-
import io.airbyte.protocol.models.v0.AirbyteMessage;
11-
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
12-
import io.airbyte.protocol.models.v0.ConnectorSpecification;
10+
import io.airbyte.protocol.models.AirbyteMessage;
11+
import io.airbyte.protocol.models.AirbyteMessage.Type;
12+
import io.airbyte.protocol.models.ConnectorSpecification;
1313
import java.net.URI;
1414
import java.net.URISyntaxException;
1515
import org.junit.jupiter.api.Test;

airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
1313
import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory;
1414
import io.airbyte.commons.protocol.ConfiguredAirbyteCatalogMigrator;
15-
import io.airbyte.commons.protocol.migrations.v1.AirbyteMessageMigrationV1;
16-
import io.airbyte.commons.protocol.migrations.v1.ConfiguredAirbyteCatalogMigrationV1;
1715
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer;
1816
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer;
1917
import io.airbyte.commons.protocol.serde.AirbyteMessageV1Deserializer;
@@ -45,10 +43,12 @@ void beforeEach() {
4543
List.of(new AirbyteMessageV0Serializer(), new AirbyteMessageV1Serializer())));
4644
serDeProvider.initialize();
4745
final AirbyteMessageMigrator airbyteMessageMigrator = new AirbyteMessageMigrator(
48-
List.of(new AirbyteMessageMigrationV1()));
46+
// TODO once data types v1 is re-enabled, this test should contain the migration
47+
List.of(/* new AirbyteMessageMigrationV1() */));
4948
airbyteMessageMigrator.initialize();
5049
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator = new ConfiguredAirbyteCatalogMigrator(
51-
List.of(new ConfiguredAirbyteCatalogMigrationV1()));
50+
// TODO once data types v1 is re-enabled, this test should contain the migration
51+
List.of(/* new ConfiguredAirbyteCatalogMigrationV1() */));
5252
configuredAirbyteCatalogMigrator.initialize();
5353
migratorFactory = spy(new AirbyteProtocolVersionedMigratorFactory(airbyteMessageMigrator, configuredAirbyteCatalogMigrator));
5454
}

airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public class EnvConfigs implements Configs {
218218
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
219219
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
220220
private static final String DEFAULT_NETWORK = "host";
221-
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("1.0.0");
221+
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("0.3.0");
222222
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MIN = new Version("0.0.0");
223223
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
224224
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
9898
private static ConfiguredAirbyteCatalog parseConfiguredAirbyteCatalog(final String configuredAirbyteCatalogString) {
9999
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = Jsons.deserialize(configuredAirbyteCatalogString, ConfiguredAirbyteCatalog.class);
100100
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
101-
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
101+
// TODO feature flag this for data types rollout
102+
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
103+
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(configuredAirbyteCatalog);
102104
return configuredAirbyteCatalog;
103105
}
104106

@@ -249,7 +251,9 @@ public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Rec
249251
public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) {
250252
final AirbyteCatalog airbyteCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class);
251253
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
252-
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
254+
// TODO feature flag this for data types rollout
255+
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
256+
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(airbyteCatalog);
253257
return airbyteCatalog;
254258
}
255259

airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
161161
configRepository.writeSourceConnectionNoSecrets(source);
162162

163163
final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
164-
final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING_V1));
164+
final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
165165
configRepository.writeActorCatalogFetchEvent(
166166
actorCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);
167167

@@ -201,7 +201,8 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
201201
assertEquals(expectedActorCatalog, Jsons.object(catalogNewConfig.get().getCatalog(), AirbyteCatalog.class));
202202

203203
final int catalogDbEntry2 = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class);
204-
assertEquals(2, catalogDbEntry2);
204+
// TODO this should be 2 once we re-enable datatypes v1
205+
assertEquals(1, catalogDbEntry2);
205206
}
206207

207208
@Test
@@ -484,13 +485,16 @@ void testGetStandardSyncUsingOperation() throws IOException {
484485
}
485486

486487
private List<StandardSync> copyWithV1Types(final List<StandardSync> syncs) {
487-
return syncs.stream()
488-
.map(standardSync -> {
489-
final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync), StandardSync.class);
490-
copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes());
491-
return copiedStandardSync;
492-
})
493-
.toList();
488+
return syncs;
489+
// TODO adjust with data types feature flag testing
490+
// return syncs.stream()
491+
// .map(standardSync -> {
492+
// final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync),
493+
// StandardSync.class);
494+
// copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes());
495+
// return copiedStandardSync;
496+
// })
497+
// .toList();
494498
}
495499

496500
private void assertSyncsMatch(final List<StandardSync> expectedSyncs, final List<StandardSync> actualSyncs) {

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -931,9 +931,13 @@ private static JobConfig parseJobConfigFromString(final String jobConfigString)
931931
final JobConfig jobConfig = Jsons.deserialize(jobConfigString, JobConfig.class);
932932
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
933933
if (jobConfig.getConfigType() == ConfigType.SYNC && jobConfig.getSync() != null) {
934-
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
934+
// TODO feature flag this for data types rollout
935+
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
936+
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
935937
} else if (jobConfig.getConfigType() == ConfigType.RESET_CONNECTION && jobConfig.getResetConnection() != null) {
936-
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
938+
// TODO feature flag this for data types rollout
939+
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
940+
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
937941
}
938942
return jobConfig;
939943
}
@@ -960,9 +964,13 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString)
960964
final JobOutput jobOutput = Jsons.deserialize(jobOutputString, JobOutput.class);
961965
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
962966
if (jobOutput.getOutputType() == OutputType.DISCOVER_CATALOG && jobOutput.getDiscoverCatalog() != null) {
963-
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
967+
// TODO feature flag this for data types rollout
968+
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
969+
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
964970
} else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null) {
965-
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
971+
// TODO feature flag this for data types rollout
972+
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
973+
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
966974
}
967975
return jobOutput;
968976
}

airbyte-server/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ airbyte:
8181
root: ${WORKSPACE_ROOT}
8282
protocol:
8383
min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0}
84-
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0}
84+
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0}
8585

8686
temporal:
8787
cloud:

airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -585,19 +585,6 @@ public List<JsonNode> retrieveSourceRecords(final Database database, final Strin
585585
return database.query(context -> context.fetch(String.format("SELECT * FROM %s;", table)))
586586
.stream()
587587
.map(Record::intoMap)
588-
.map(rec -> {
589-
// The protocol requires converting numbers to strings. source-postgres does that internally,
590-
// but we're querying the DB directly, so we have to do it manually.
591-
final Map<String, Object> stringifiedNumbers = new HashMap<>();
592-
for (final String key : rec.keySet()) {
593-
Object o = rec.get(key);
594-
if (o instanceof Number) {
595-
o = o.toString();
596-
}
597-
stringifiedNumbers.put(key, o);
598-
}
599-
return stringifiedNumbers;
600-
})
601588
.map(Jsons::jsonNode)
602589
.collect(Collectors.toList());
603590
}

0 commit comments

Comments
 (0)