Skip to content

Commit 57d556d

Browse files
gosusnpakashkulk
authored andcommitted
Exclude connectors with unsupported protocol version from seed updates (#19328)
* Filter out connectors with unsupported protocol in ApplyDefinitionsHelper * Format * Remove code dupl
1 parent 47315db commit 57d556d

File tree

9 files changed

+164
-36
lines changed

9 files changed

+164
-36
lines changed

airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ public BootloaderApp(final Configs configs,
173173

174174
postLoadExecution = () -> {
175175
try {
176-
final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get());
176+
final ApplyDefinitionsHelper applyDefinitionsHelper =
177+
new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get(), jobPersistence);
177178
applyDefinitionsHelper.apply();
178179

179180
if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) {

airbyte-bootloader/src/main/java/io/airbyte/bootloader/ProtocolVersionChecker.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public ProtocolVersionChecker(final JobPersistence jobPersistence,
6161
*/
6262
public Optional<AirbyteProtocolVersionRange> validate(final boolean supportAutoUpgrade) throws IOException {
6363
final Optional<AirbyteVersion> currentAirbyteVersion = getCurrentAirbyteVersion();
64-
final AirbyteProtocolVersionRange currentRange = getCurrentProtocolVersionRange();
64+
final Optional<AirbyteProtocolVersionRange> currentRange = jobPersistence.getCurrentProtocolVersionRange();
6565
final AirbyteProtocolVersionRange targetRange = getTargetProtocolVersionRange();
6666

6767
// Checking if there is a pre-existing version of airbyte.
@@ -73,13 +73,13 @@ public Optional<AirbyteProtocolVersionRange> validate(final boolean supportAutoU
7373
return Optional.of(targetRange);
7474
}
7575

76-
if (currentRange.equals(targetRange)) {
76+
if (currentRange.isEmpty() || currentRange.get().equals(targetRange)) {
7777
log.info("Using AirbyteProtocolVersion range [{}:{}]", targetRange.min().serialize(), targetRange.max().serialize());
7878
return Optional.of(targetRange);
7979
}
8080

8181
log.info("Detected an AirbyteProtocolVersion range change from [{}:{}] to [{}:{}]",
82-
currentRange.min().serialize(), currentRange.max().serialize(),
82+
currentRange.get().min().serialize(), currentRange.get().max().serialize(),
8383
targetRange.min().serialize(), targetRange.max().serialize());
8484

8585
final Map<ActorType, Set<UUID>> conflicts = getConflictingActorDefinitions(targetRange);
@@ -123,22 +123,6 @@ protected Optional<AirbyteVersion> getCurrentAirbyteVersion() throws IOException
123123
return jobPersistence.getVersion().map(AirbyteVersion::new);
124124
}
125125

126-
protected AirbyteProtocolVersionRange getCurrentProtocolVersionRange() throws IOException {
127-
Optional<Version> min = jobPersistence.getAirbyteProtocolVersionMin();
128-
Optional<Version> max = jobPersistence.getAirbyteProtocolVersionMax();
129-
130-
if (min.isPresent() != max.isPresent()) {
131-
// Flagging this because this would be highly suspicious but not bad enough that we should fail
132-
// hard.
133-
// If the new config is fine, the system should self-heal.
134-
log.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
135-
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
136-
}
137-
138-
return new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION),
139-
max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION));
140-
}
141-
142126
protected AirbyteProtocolVersionRange getTargetProtocolVersionRange() {
143127
return new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
144128
}

airbyte-bootloader/src/test/java/io/airbyte/bootloader/ProtocolVersionCheckerTest.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,6 @@ void testFirstInstallCheck(final boolean supportAutoUpgrade) throws IOException
6262
assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0)), protocolVersionChecker.validate(supportAutoUpgrade));
6363
}
6464

65-
@Test
66-
void testGetCurrentRange() throws IOException {
67-
setCurrentProtocolRangeRange(V0_0_0, V1_0_0);
68-
69-
assertEquals(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0), protocolVersionChecker.getCurrentProtocolVersionRange());
70-
}
71-
7265
@Test
7366
void testGetTargetRange() throws IOException {
7467
setTargetProtocolRangeRange(V1_0_0, V2_0_0);
@@ -317,6 +310,7 @@ void testValidateSucceedsWhenNoProtocolRangeChangeWithoutDefinitionsProvider(fin
317310
}
318311

319312
private void setCurrentProtocolRangeRange(final Version min, final Version max) throws IOException {
313+
when(jobPersistence.getCurrentProtocolVersionRange()).thenReturn(Optional.of(new AirbyteProtocolVersionRange(min, max)));
320314
when(jobPersistence.getAirbyteProtocolVersionMin()).thenReturn(Optional.of(min));
321315
when(jobPersistence.getAirbyteProtocolVersionMax()).thenReturn(Optional.of(max));
322316
}

airbyte-config/init/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ dependencies {
77

88
implementation project(':airbyte-config:config-models')
99
implementation project(':airbyte-config:config-persistence')
10+
implementation project(':airbyte-persistence:job-persistence')
1011
implementation project(':airbyte-protocol:protocol-models')
1112
implementation project(':airbyte-commons-docker')
1213
implementation project(':airbyte-json-validation')

airbyte-config/init/src/main/java/io/airbyte/config/init/ApplyDefinitionsHelper.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,43 @@
44

55
package io.airbyte.config.init;
66

7+
import io.airbyte.commons.version.AirbyteProtocolVersion;
8+
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
79
import io.airbyte.config.StandardDestinationDefinition;
810
import io.airbyte.config.StandardSourceDefinition;
911
import io.airbyte.config.persistence.ConfigRepository;
12+
import io.airbyte.persistence.job.JobPersistence;
1013
import io.airbyte.validation.json.JsonValidationException;
1114
import java.io.IOException;
1215
import java.util.List;
16+
import java.util.Optional;
17+
import lombok.extern.slf4j.Slf4j;
1318

1419
/**
1520
* Helper class used to apply actor definitions from a DefinitionsProvider to the database. This is
1621
* here to enable easy reuse of definition application logic in bootloader and cron.
1722
*/
23+
@Slf4j
1824
public class ApplyDefinitionsHelper {
1925

2026
private final ConfigRepository configRepository;
2127
private final DefinitionsProvider definitionsProvider;
28+
private final JobPersistence jobPersistence;
2229

30+
// Remove once cloud has been migrated
31+
@Deprecated(forRemoval = true)
2332
public ApplyDefinitionsHelper(final ConfigRepository configRepository, final DefinitionsProvider definitionsProvider) {
2433
this.configRepository = configRepository;
2534
this.definitionsProvider = definitionsProvider;
35+
this.jobPersistence = null;
36+
}
37+
38+
public ApplyDefinitionsHelper(final ConfigRepository configRepository,
39+
final DefinitionsProvider definitionsProvider,
40+
final JobPersistence jobPersistence) {
41+
this.configRepository = configRepository;
42+
this.definitionsProvider = definitionsProvider;
43+
this.jobPersistence = jobPersistence;
2644
}
2745

2846
public void apply() throws JsonValidationException, IOException {
@@ -35,23 +53,70 @@ public void apply() throws JsonValidationException, IOException {
3553
* @param updateAll - Whether we should overwrite all stored definitions
3654
*/
3755
public void apply(final boolean updateAll) throws JsonValidationException, IOException {
56+
final Optional<AirbyteProtocolVersionRange> currentProtocolRange = getCurrentProtocolRange();
57+
3858
if (updateAll) {
3959
final List<StandardSourceDefinition> latestSourceDefinitions = definitionsProvider.getSourceDefinitions();
40-
for (final StandardSourceDefinition def : latestSourceDefinitions) {
60+
for (final StandardSourceDefinition def : filterStandardSourceDefinitions(currentProtocolRange, latestSourceDefinitions)) {
4161
configRepository.writeStandardSourceDefinition(def);
4262
}
4363

4464
final List<StandardDestinationDefinition> latestDestinationDefinitions = definitionsProvider.getDestinationDefinitions();
45-
for (final StandardDestinationDefinition def : latestDestinationDefinitions) {
65+
for (final StandardDestinationDefinition def : filterStandardDestinationDefinitions(currentProtocolRange, latestDestinationDefinitions)) {
4666
configRepository.writeStandardDestinationDefinition(def);
4767
}
4868
} else {
4969
// todo (pedroslopez): Logic to apply definitions should be moved outside of the
5070
// DatabaseConfigPersistence class and behavior standardized
5171
configRepository.seedActorDefinitions(
52-
definitionsProvider.getSourceDefinitions(),
53-
definitionsProvider.getDestinationDefinitions());
72+
filterStandardSourceDefinitions(currentProtocolRange, definitionsProvider.getSourceDefinitions()),
73+
filterStandardDestinationDefinitions(currentProtocolRange, definitionsProvider.getDestinationDefinitions()));
5474
}
5575
}
5676

77+
private List<StandardDestinationDefinition> filterStandardDestinationDefinitions(final Optional<AirbyteProtocolVersionRange> protocolVersionRange,
78+
final List<StandardDestinationDefinition> destDefs) {
79+
if (protocolVersionRange.isEmpty()) {
80+
return destDefs;
81+
}
82+
83+
return destDefs.stream().filter(def -> {
84+
final boolean isSupported = isProtocolVersionSupported(protocolVersionRange.get(), def.getSpec().getProtocolVersion());
85+
if (!isSupported) {
86+
log.warn("Destination {} {} has an incompatible protocol version ({})... ignoring.",
87+
def.getDestinationDefinitionId(), def.getName(), def.getSpec().getProtocolVersion());
88+
}
89+
return isSupported;
90+
}).toList();
91+
}
92+
93+
private List<StandardSourceDefinition> filterStandardSourceDefinitions(final Optional<AirbyteProtocolVersionRange> protocolVersionRange,
94+
final List<StandardSourceDefinition> sourceDefs) {
95+
if (protocolVersionRange.isEmpty()) {
96+
return sourceDefs;
97+
}
98+
99+
return sourceDefs.stream().filter(def -> {
100+
final boolean isSupported = isProtocolVersionSupported(protocolVersionRange.get(), def.getSpec().getProtocolVersion());
101+
if (!isSupported) {
102+
log.warn("Source {} {} has an incompatible protocol version ({})... ignoring.",
103+
def.getSourceDefinitionId(), def.getName(), def.getSpec().getProtocolVersion());
104+
}
105+
return isSupported;
106+
}).toList();
107+
}
108+
109+
private boolean isProtocolVersionSupported(final AirbyteProtocolVersionRange protocolVersionRange, final String protocolVersion) {
110+
return protocolVersionRange.isSupported(AirbyteProtocolVersion.getWithDefault(protocolVersion));
111+
}
112+
113+
private Optional<AirbyteProtocolVersionRange> getCurrentProtocolRange() throws IOException {
114+
if (jobPersistence == null) {
115+
// TODO Remove this once cloud has been migrated and job persistence is always defined
116+
return Optional.empty();
117+
}
118+
119+
return jobPersistence.getCurrentProtocolVersionRange();
120+
}
121+
57122
}

airbyte-config/init/src/test/java/io/airbyte/config/init/ApplyDefinitionsHelperTest.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,23 @@
99
import static org.mockito.Mockito.verifyNoMoreInteractions;
1010
import static org.mockito.Mockito.when;
1111

12+
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
13+
import io.airbyte.commons.version.Version;
1214
import io.airbyte.config.StandardDestinationDefinition;
1315
import io.airbyte.config.StandardSourceDefinition;
1416
import io.airbyte.config.persistence.ConfigRepository;
17+
import io.airbyte.persistence.job.JobPersistence;
18+
import io.airbyte.protocol.models.ConnectorSpecification;
1519
import io.airbyte.validation.json.JsonValidationException;
1620
import java.io.IOException;
1721
import java.util.Collections;
1822
import java.util.List;
23+
import java.util.Optional;
1924
import java.util.UUID;
2025
import org.junit.jupiter.api.BeforeEach;
2126
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.ValueSource;
2229

2330
class ApplyDefinitionsHelperTest {
2431

@@ -29,41 +36,51 @@ class ApplyDefinitionsHelperTest {
2936
private static final String DOCUMENTATION_URL = "https://wwww.example.com";
3037
private static final String DOCKER_REPOSITORY = "airbyte/connector";
3138
private static final String DOCKER_TAG = "0.1.0";
39+
private static final String PROTOCOL_VERSION_1 = "1.0.0";
40+
private static final String PROTOCOL_VERSION_2 = "2.0.0";
3241
public static final StandardSourceDefinition SOURCE_DEF1 = new StandardSourceDefinition()
3342
.withSourceDefinitionId(SOURCE_DEF_ID1)
3443
.withDockerRepository(DOCKER_REPOSITORY)
3544
.withDockerImageTag(DOCKER_TAG)
3645
.withName(CONNECT_NAME1)
37-
.withDocumentationUrl(DOCUMENTATION_URL);
46+
.withDocumentationUrl(DOCUMENTATION_URL)
47+
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_1));
3848
public static final StandardSourceDefinition SOURCE_DEF2 = new StandardSourceDefinition()
3949
.withSourceDefinitionId(SOURCE_DEF_ID1)
4050
.withDockerRepository(DOCKER_REPOSITORY)
4151
.withDockerImageTag(DOCKER_TAG)
4252
.withName(CONNECT_NAME2)
43-
.withDocumentationUrl(DOCUMENTATION_URL);
53+
.withDocumentationUrl(DOCUMENTATION_URL)
54+
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_2));
55+
4456
public static final StandardDestinationDefinition DEST_DEF1 = new StandardDestinationDefinition()
4557
.withDestinationDefinitionId(DEST_DEF_ID2)
4658
.withDockerRepository(DOCKER_REPOSITORY)
4759
.withDockerImageTag(DOCKER_TAG)
4860
.withName(CONNECT_NAME1)
49-
.withDocumentationUrl(DOCUMENTATION_URL);
61+
.withDocumentationUrl(DOCUMENTATION_URL)
62+
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_2));
63+
5064
public static final StandardDestinationDefinition DEST_DEF2 = new StandardDestinationDefinition()
5165
.withDestinationDefinitionId(DEST_DEF_ID2)
5266
.withDockerRepository(DOCKER_REPOSITORY)
5367
.withDockerImageTag(DOCKER_TAG)
5468
.withName(CONNECT_NAME2)
55-
.withDocumentationUrl(DOCUMENTATION_URL);
69+
.withDocumentationUrl(DOCUMENTATION_URL)
70+
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_1));
5671

5772
private ConfigRepository configRepository;
5873
private DefinitionsProvider definitionsProvider;
74+
private JobPersistence jobPersistence;
5975
private ApplyDefinitionsHelper applyDefinitionsHelper;
6076

6177
@BeforeEach
6278
void setup() throws JsonValidationException, IOException {
6379
configRepository = mock(ConfigRepository.class);
6480
definitionsProvider = mock(DefinitionsProvider.class);
81+
jobPersistence = mock(JobPersistence.class);
6582

66-
applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, definitionsProvider);
83+
applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, definitionsProvider, jobPersistence);
6784

6885
// default calls to empty.
6986
when(configRepository.listStandardDestinationDefinitions(true)).thenReturn(Collections.emptyList());
@@ -132,4 +149,24 @@ void testApplyOSS() throws JsonValidationException, IOException {
132149
verifyNoMoreInteractions(definitionsProvider);
133150
}
134151

152+
@ParameterizedTest
153+
@ValueSource(booleans = {false, true})
154+
void testDefinitionsFiltering(final boolean updateAll) throws JsonValidationException, IOException {
155+
when(jobPersistence.getCurrentProtocolVersionRange())
156+
.thenReturn(Optional.of(new AirbyteProtocolVersionRange(new Version("2.0.0"), new Version("3.0.0"))));
157+
158+
when(definitionsProvider.getSourceDefinitions()).thenReturn(List.of(SOURCE_DEF1, SOURCE_DEF2));
159+
when(definitionsProvider.getDestinationDefinitions()).thenReturn(List.of(DEST_DEF1, DEST_DEF2));
160+
161+
applyDefinitionsHelper.apply(updateAll);
162+
163+
if (updateAll) {
164+
verify(configRepository).writeStandardSourceDefinition(SOURCE_DEF2);
165+
verify(configRepository).writeStandardDestinationDefinition(DEST_DEF1);
166+
verifyNoMoreInteractions(configRepository);
167+
} else {
168+
verify(configRepository).seedActorDefinitions(List.of(SOURCE_DEF2), List.of(DEST_DEF1));
169+
}
170+
}
171+
135172
}

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.airbyte.commons.text.Names;
2525
import io.airbyte.commons.text.Sqls;
2626
import io.airbyte.commons.version.AirbyteProtocolVersion;
27+
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
2728
import io.airbyte.commons.version.AirbyteVersion;
2829
import io.airbyte.commons.version.Version;
2930
import io.airbyte.config.AttemptFailureSummary;
@@ -838,6 +839,27 @@ public void setAirbyteProtocolVersionMin(final Version version) throws IOExcepti
838839
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME, version.serialize());
839840
}
840841

842+
@Override
843+
public Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException {
844+
final Optional<Version> min = getAirbyteProtocolVersionMin();
845+
final Optional<Version> max = getAirbyteProtocolVersionMax();
846+
847+
if (min.isPresent() != max.isPresent()) {
848+
// Flagging this because this would be highly suspicious but not bad enough that we should fail
849+
// hard.
850+
// If the new config is fine, the system should self-heal.
851+
LOGGER.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
852+
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
853+
}
854+
855+
if (min.isEmpty() && max.isEmpty()) {
856+
return Optional.empty();
857+
}
858+
859+
return Optional.of(new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION),
860+
max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)));
861+
}
862+
841863
private Stream<String> getMetadata(final String keyName) throws IOException {
842864
return jobDatabase.query(ctx -> ctx.select()
843865
.from(AIRBYTE_METADATA_TABLE)

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.persistence.job;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
89
import io.airbyte.commons.version.Version;
910
import io.airbyte.config.AttemptFailureSummary;
1011
import io.airbyte.config.JobConfig;
@@ -262,6 +263,11 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
262263
*/
263264
void setAirbyteProtocolVersionMin(Version version) throws IOException;
264265

266+
/**
267+
* Get the current Airbyte Protocol Version range if defined
268+
*/
269+
Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException;
270+
265271
/**
266272
* Returns a deployment UUID.
267273
*/

0 commit comments

Comments
 (0)