Skip to content

Commit f267746

Browse files
authored
Use MessageMigration for Source Connection Check. (#17656)
* More AirbyteVersion references fix * Propagate protocol version from sourceDef to SchedulerClient * Propagate protocol version to LauncherConfig * Add VersionedMigratorFactory * Update VersionedAirbyteStreamFactory * Fix Version Json serialization/deserialization * Plug message migration in CheckConnection for Sources
1 parent d9d2261 commit f267746

File tree

22 files changed

+222
-52
lines changed

22 files changed

+222
-52
lines changed

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageSerDeProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import com.google.common.annotations.VisibleForTesting;
88
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
99
import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer;
10-
import io.airbyte.commons.version.AirbyteVersion;
10+
import io.airbyte.commons.version.Version;
1111
import jakarta.annotation.PostConstruct;
1212
import jakarta.inject.Singleton;
1313
import java.util.Collections;
@@ -51,14 +51,14 @@ public void initialize() {
5151
/**
5252
* Returns the Deserializer for the version if known else empty
5353
*/
54-
public Optional<AirbyteMessageDeserializer<?>> getDeserializer(final AirbyteVersion version) {
54+
public Optional<AirbyteMessageDeserializer<?>> getDeserializer(final Version version) {
5555
return Optional.ofNullable(deserializers.get(version.getMajorVersion()));
5656
}
5757

5858
/**
5959
* Returns the Serializer for the version if known else empty
6060
*/
61-
public Optional<AirbyteMessageSerializer<?>> getSerializer(final AirbyteVersion version) {
61+
public Optional<AirbyteMessageSerializer<?>> getSerializer(final Version version) {
6262
return Optional.ofNullable(serializers.get(version.getMajorVersion()));
6363
}
6464

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol;
6+
7+
import io.airbyte.commons.version.Version;
8+
import jakarta.inject.Singleton;
9+
10+
/**
11+
* Factory to build AirbyteMessageVersionedMigrator
12+
*/
13+
@Singleton
14+
public class AirbyteMessageVersionedMigratorFactory {
15+
16+
private final AirbyteMessageMigrator migrator;
17+
18+
public AirbyteMessageVersionedMigratorFactory(final AirbyteMessageMigrator migrator) {
19+
this.migrator = migrator;
20+
}
21+
22+
public <T> AirbyteMessageVersionedMigrator<T> getVersionedMigrator(final Version version) {
23+
return new AirbyteMessageVersionedMigrator<>(this.migrator, version);
24+
}
25+
26+
}

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV0.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
import io.airbyte.commons.json.Jsons;
88
import io.airbyte.commons.version.Version;
99
import io.airbyte.protocol.models.AirbyteMessage;
10+
import jakarta.inject.Singleton;
1011

1112
/**
1213
* Demo migration to illustrate the template. This should be deleted once we added the v0 to v1
1314
* migration.
1415
*/
15-
// NOTE, to actually wire this migration, uncomment the annotation
16-
// @Singleton
16+
@Singleton
1717
public class AirbyteMessageMigrationV0
1818
implements AirbyteMessageMigration<AirbyteMessage, io.airbyte.protocol.models.v0.AirbyteMessage> {
1919

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageDeserializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
package io.airbyte.commons.protocol.serde;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8-
import io.airbyte.commons.version.AirbyteVersion;
8+
import io.airbyte.commons.version.Version;
99

1010
public interface AirbyteMessageDeserializer<T> {
1111

1212
T deserialize(final JsonNode json);
1313

14-
AirbyteVersion getTargetVersion();
14+
Version getTargetVersion();
1515

1616
}

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/serde/AirbyteMessageSerializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
package io.airbyte.commons.protocol.serde;
66

7-
import io.airbyte.commons.version.AirbyteVersion;
7+
import io.airbyte.commons.version.Version;
88

99
public interface AirbyteMessageSerializer<T> {
1010

1111
String serialize(final T message);
1212

13-
AirbyteVersion getTargetVersion();
13+
Version getTargetVersion();
1414

1515
}

airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageMigratorMicronautTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
1010
import jakarta.inject.Inject;
1111
import java.util.HashSet;
12+
import java.util.List;
1213
import org.junit.jupiter.api.Test;
1314

1415
@MicronautTest
@@ -21,7 +22,7 @@ class AirbyteMessageMigratorMicronautTest {
2122
void testMigrationInjection() {
2223
// This should contain the list of all the supported majors of the airbyte protocol except the most
2324
// recent one since the migrations themselves are keyed on the lower version.
24-
assertEquals(new HashSet<>(), messageMigrator.getMigrationKeys());
25+
assertEquals(new HashSet<>(List.of("0")), messageMigrator.getMigrationKeys());
2526
}
2627

2728
}

airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/AirbyteMessageSerDeProviderTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
1313
import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer;
14-
import io.airbyte.commons.version.AirbyteVersion;
14+
import io.airbyte.commons.version.Version;
1515
import java.util.Optional;
1616
import org.junit.jupiter.api.BeforeEach;
1717
import org.junit.jupiter.api.Test;
@@ -29,55 +29,55 @@ class AirbyteMessageSerDeProviderTest {
2929
void beforeEach() {
3030
serDeProvider = new AirbyteMessageSerDeProvider();
3131

32-
deserV0 = buildDeserializer(new AirbyteVersion("0.1.0"));
33-
deserV1 = buildDeserializer(new AirbyteVersion("1.1.0"));
32+
deserV0 = buildDeserializer(new Version("0.1.0"));
33+
deserV1 = buildDeserializer(new Version("1.1.0"));
3434
serDeProvider.registerDeserializer(deserV0);
3535
serDeProvider.registerDeserializer(deserV1);
3636

37-
serV0 = buildSerializer(new AirbyteVersion("0.2.0"));
38-
serV1 = buildSerializer(new AirbyteVersion("1.0.0"));
37+
serV0 = buildSerializer(new Version("0.2.0"));
38+
serV1 = buildSerializer(new Version("1.0.0"));
3939
serDeProvider.registerSerializer(serV0);
4040
serDeProvider.registerSerializer(serV1);
4141
}
4242

4343
@Test
4444
void testGetDeserializer() {
45-
assertEquals(Optional.of(deserV0), serDeProvider.getDeserializer(new AirbyteVersion("0.1.0")));
46-
assertEquals(Optional.of(deserV0), serDeProvider.getDeserializer(new AirbyteVersion("0.2.0")));
47-
assertEquals(Optional.of(deserV1), serDeProvider.getDeserializer(new AirbyteVersion("1.1.0")));
48-
assertEquals(Optional.empty(), serDeProvider.getDeserializer(new AirbyteVersion("2.0.0")));
45+
assertEquals(Optional.of(deserV0), serDeProvider.getDeserializer(new Version("0.1.0")));
46+
assertEquals(Optional.of(deserV0), serDeProvider.getDeserializer(new Version("0.2.0")));
47+
assertEquals(Optional.of(deserV1), serDeProvider.getDeserializer(new Version("1.1.0")));
48+
assertEquals(Optional.empty(), serDeProvider.getDeserializer(new Version("2.0.0")));
4949
}
5050

5151
@Test
5252
void testGetSerializer() {
53-
assertEquals(Optional.of(serV0), serDeProvider.getSerializer(new AirbyteVersion("0.1.0")));
54-
assertEquals(Optional.of(serV1), serDeProvider.getSerializer(new AirbyteVersion("1.0.0")));
55-
assertEquals(Optional.empty(), serDeProvider.getSerializer(new AirbyteVersion("3.2.0")));
53+
assertEquals(Optional.of(serV0), serDeProvider.getSerializer(new Version("0.1.0")));
54+
assertEquals(Optional.of(serV1), serDeProvider.getSerializer(new Version("1.0.0")));
55+
assertEquals(Optional.empty(), serDeProvider.getSerializer(new Version("3.2.0")));
5656
}
5757

5858
@Test
5959
void testRegisterDeserializerShouldFailOnVersionCollision() {
60-
AirbyteMessageDeserializer<?> deser = buildDeserializer(new AirbyteVersion("0.2.0"));
60+
AirbyteMessageDeserializer<?> deser = buildDeserializer(new Version("0.2.0"));
6161
assertThrows(RuntimeException.class, () -> {
6262
serDeProvider.registerDeserializer(deser);
6363
});
6464
}
6565

6666
@Test
6767
void testRegisterSerializerShouldFailOnVersionCollision() {
68-
AirbyteMessageSerializer<?> ser = buildSerializer(new AirbyteVersion("0.5.0"));
68+
AirbyteMessageSerializer<?> ser = buildSerializer(new Version("0.5.0"));
6969
assertThrows(RuntimeException.class, () -> {
7070
serDeProvider.registerSerializer(ser);
7171
});
7272
}
7373

74-
private <T> AirbyteMessageDeserializer<T> buildDeserializer(AirbyteVersion version) {
74+
private <T> AirbyteMessageDeserializer<T> buildDeserializer(Version version) {
7575
final AirbyteMessageDeserializer<T> deser = mock(AirbyteMessageDeserializer.class);
7676
when(deser.getTargetVersion()).thenReturn(version);
7777
return deser;
7878
}
7979

80-
private <T> AirbyteMessageSerializer<T> buildSerializer(AirbyteVersion version) {
80+
private <T> AirbyteMessageSerializer<T> buildSerializer(Version version) {
8181
final AirbyteMessageSerializer<T> ser = mock(AirbyteMessageSerializer.class);
8282
when(ser.getTargetVersion()).thenReturn(version);
8383
return ser;

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory {
3232

3333
private final MdcScope.Builder containerLogMdcBuilder;
3434
private final AirbyteProtocolPredicate protocolValidator;
35-
private final Logger logger;
35+
protected final Logger logger;
3636

3737
public DefaultAirbyteStreamFactory() {
3838
this(MdcScope.DEFAULT_BUILDER);

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.commons.json.Jsons;
99
import io.airbyte.commons.logging.MdcScope;
10+
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
1011
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigrator;
12+
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory;
1113
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
14+
import io.airbyte.commons.version.Version;
1215
import io.airbyte.protocol.models.AirbyteMessage;
1316
import java.util.stream.Stream;
1417
import org.slf4j.Logger;
@@ -26,19 +29,23 @@ public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactor
2629

2730
private final AirbyteMessageDeserializer<T> deserializer;
2831
private final AirbyteMessageVersionedMigrator<T> migrator;
32+
private final Version protocolVersion;
2933

30-
public VersionedAirbyteStreamFactory(final AirbyteMessageDeserializer<T> deserializer,
31-
final AirbyteMessageVersionedMigrator<T> migrator) {
32-
this(deserializer, migrator, MdcScope.DEFAULT_BUILDER);
34+
public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider,
35+
final AirbyteMessageVersionedMigratorFactory migratorFactory,
36+
final Version protocolVersion) {
37+
this(serDeProvider, migratorFactory, protocolVersion, MdcScope.DEFAULT_BUILDER);
3338
}
3439

35-
public VersionedAirbyteStreamFactory(final AirbyteMessageDeserializer<T> deserializer,
36-
final AirbyteMessageVersionedMigrator<T> migrator,
40+
public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider,
41+
final AirbyteMessageVersionedMigratorFactory migratorFactory,
42+
final Version protocolVersion,
3743
final MdcScope.Builder containerLogMdcBuilder) {
3844
// TODO AirbyteProtocolPredicate needs to be updated to be protocol version aware
3945
super(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder);
40-
this.deserializer = deserializer;
41-
this.migrator = migrator;
46+
this.deserializer = (AirbyteMessageDeserializer<T>) serDeProvider.getDeserializer(protocolVersion).orElseThrow();
47+
this.migrator = migratorFactory.getVersionedMigrator(protocolVersion);
48+
this.protocolVersion = protocolVersion;
4249
}
4350

4451
@Override
@@ -47,7 +54,7 @@ protected Stream<AirbyteMessage> toAirbyteMessage(final JsonNode json) {
4754
final io.airbyte.protocol.models.v0.AirbyteMessage message = migrator.upgrade(deserializer.deserialize(json));
4855
return Stream.of(convert(message));
4956
} catch (RuntimeException e) {
50-
LOGGER.warn("Failed to upgrade a message from version {}: {}", migrator.getVersion(), Jsons.serialize(json));
57+
logger.warn("Failed to upgrade a message from version {}: {}", protocolVersion, Jsons.serialize(json), e);
5158
return Stream.empty();
5259
}
5360
}

airbyte-commons-worker/src/main/resources/workers_models/IntegrationLauncherConfig.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,6 @@ properties:
1616
type: integer
1717
dockerImage:
1818
type: string
19+
protocolVersion:
20+
type: object
21+
existingJavaType: io.airbyte.commons.version.Version

airbyte-commons/src/main/java/io/airbyte/commons/version/Version.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44

55
package io.airbyte.commons.version;
66

7+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
8+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
79
import com.google.common.base.Preconditions;
810
import java.util.Objects;
911

1012
/**
1113
* A semVer Version class that allows "dev" as a version.
1214
*/
1315
@SuppressWarnings({"PMD.AvoidFieldNameMatchingTypeName", "PMD.ConstructorCallsOverridableMethod"})
16+
@JsonDeserialize(using = VersionDeserializer.class)
17+
@JsonSerialize(using = VersionSerializer.class)
1418
public class Version {
1519

1620
public static final String DEV_VERSION_PREFIX = "dev";
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.version;
6+
7+
import com.fasterxml.jackson.core.JacksonException;
8+
import com.fasterxml.jackson.core.JsonParser;
9+
import com.fasterxml.jackson.databind.DeserializationContext;
10+
import com.fasterxml.jackson.databind.JsonNode;
11+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
12+
import java.io.IOException;
13+
14+
public class VersionDeserializer extends StdDeserializer<Version> {
15+
16+
public VersionDeserializer() {
17+
this(null);
18+
}
19+
20+
public VersionDeserializer(Class<?> vc) {
21+
super(vc);
22+
}
23+
24+
@Override
25+
public Version deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
26+
final JsonNode node = p.getCodec().readTree(p);
27+
final String v = node.get("version").asText();
28+
return new Version(v);
29+
}
30+
31+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.version;
6+
7+
import com.fasterxml.jackson.core.JsonGenerator;
8+
import com.fasterxml.jackson.databind.SerializerProvider;
9+
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
10+
import java.io.IOException;
11+
12+
public class VersionSerializer extends StdSerializer<Version> {
13+
14+
public VersionSerializer() {
15+
this(null);
16+
}
17+
18+
public VersionSerializer(Class<Version> t) {
19+
super(t);
20+
}
21+
22+
@Override
23+
public void serialize(Version value, JsonGenerator gen, SerializerProvider provider) throws IOException {
24+
gen.writeStartObject();
25+
gen.writeStringField("version", value.version);
26+
gen.writeEndObject();
27+
}
28+
29+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.version;
6+
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
9+
import io.airbyte.commons.json.Jsons;
10+
import org.junit.jupiter.api.Test;
11+
12+
class VersionTest {
13+
14+
@Test
15+
void testJsonSerializationDeserialization() {
16+
final String jsonString = """
17+
{"version": "1.2.3"}
18+
""";
19+
final Version expectedVersion = new Version("1.2.3");
20+
21+
final Version deserializedVersion = Jsons.deserialize(jsonString, Version.class);
22+
assertEquals(expectedVersion, deserializedVersion);
23+
24+
final Version deserializedVersionLoop = Jsons.deserialize(Jsons.serialize(deserializedVersion), Version.class);
25+
assertEquals(expectedVersion, deserializedVersionLoop);
26+
}
27+
28+
}

airbyte-config/config-models/src/main/resources/types/JobCheckConnectionConfig.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,6 @@ properties:
1515
existingJavaType: com.fasterxml.jackson.databind.JsonNode
1616
dockerImage:
1717
type: string
18+
protocolVersion:
19+
type: object
20+
existingJavaType: io.airbyte.commons.version.Version

0 commit comments

Comments
 (0)