Skip to content

Exclude connectors with unsupported protocol version from seed updates #19328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public BootloaderApp(final Configs configs,

postLoadExecution = () -> {
try {
final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get());
final ApplyDefinitionsHelper applyDefinitionsHelper =
new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get(), jobPersistence);
applyDefinitionsHelper.apply();

if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ProtocolVersionChecker(final JobPersistence jobPersistence,
*/
public Optional<AirbyteProtocolVersionRange> validate(final boolean supportAutoUpgrade) throws IOException {
final Optional<AirbyteVersion> currentAirbyteVersion = getCurrentAirbyteVersion();
final AirbyteProtocolVersionRange currentRange = getCurrentProtocolVersionRange();
final Optional<AirbyteProtocolVersionRange> currentRange = jobPersistence.getCurrentProtocolVersionRange();
final AirbyteProtocolVersionRange targetRange = getTargetProtocolVersionRange();

// Checking if there is a pre-existing version of airbyte.
Expand All @@ -73,13 +73,13 @@ public Optional<AirbyteProtocolVersionRange> validate(final boolean supportAutoU
return Optional.of(targetRange);
}

if (currentRange.equals(targetRange)) {
if (currentRange.isEmpty() || currentRange.get().equals(targetRange)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; if (currentRangs.orElse(targetRange).equals(targetRange))

log.info("Using AirbyteProtocolVersion range [{}:{}]", targetRange.min().serialize(), targetRange.max().serialize());
return Optional.of(targetRange);
}

log.info("Detected an AirbyteProtocolVersion range change from [{}:{}] to [{}:{}]",
currentRange.min().serialize(), currentRange.max().serialize(),
currentRange.get().min().serialize(), currentRange.get().max().serialize(),
targetRange.min().serialize(), targetRange.max().serialize());

final Map<ActorType, Set<UUID>> conflicts = getConflictingActorDefinitions(targetRange);
Expand Down Expand Up @@ -123,22 +123,6 @@ protected Optional<AirbyteVersion> getCurrentAirbyteVersion() throws IOException
return jobPersistence.getVersion().map(AirbyteVersion::new);
}

protected AirbyteProtocolVersionRange getCurrentProtocolVersionRange() throws IOException {
Optional<Version> min = jobPersistence.getAirbyteProtocolVersionMin();
Optional<Version> max = jobPersistence.getAirbyteProtocolVersionMax();

if (min.isPresent() != max.isPresent()) {
// Flagging this because this would be highly suspicious but not bad enough that we should fail
// hard.
// If the new config is fine, the system should self-heal.
log.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
}

return new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION),
max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION));
}

protected AirbyteProtocolVersionRange getTargetProtocolVersionRange() {
return new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,6 @@ void testFirstInstallCheck(final boolean supportAutoUpgrade) throws IOException
assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0)), protocolVersionChecker.validate(supportAutoUpgrade));
}

@Test
void testGetCurrentRange() throws IOException {
setCurrentProtocolRangeRange(V0_0_0, V1_0_0);

assertEquals(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0), protocolVersionChecker.getCurrentProtocolVersionRange());
}

@Test
void testGetTargetRange() throws IOException {
setTargetProtocolRangeRange(V1_0_0, V2_0_0);
Expand Down Expand Up @@ -317,6 +310,7 @@ void testValidateSucceedsWhenNoProtocolRangeChangeWithoutDefinitionsProvider(fin
}

private void setCurrentProtocolRangeRange(final Version min, final Version max) throws IOException {
when(jobPersistence.getCurrentProtocolVersionRange()).thenReturn(Optional.of(new AirbyteProtocolVersionRange(min, max)));
when(jobPersistence.getAirbyteProtocolVersionMin()).thenReturn(Optional.of(min));
when(jobPersistence.getAirbyteProtocolVersionMax()).thenReturn(Optional.of(max));
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-config/init/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {

implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-json-validation')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,43 @@

package io.airbyte.config.init;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

/**
* Helper class used to apply actor definitions from a DefinitionsProvider to the database. This is
* here to enable easy reuse of definition application logic in bootloader and cron.
*/
@Slf4j
public class ApplyDefinitionsHelper {

private final ConfigRepository configRepository;
private final DefinitionsProvider definitionsProvider;
private final JobPersistence jobPersistence;

// Remove once cloud has been migrated
@Deprecated(forRemoval = true)
public ApplyDefinitionsHelper(final ConfigRepository configRepository, final DefinitionsProvider definitionsProvider) {
this.configRepository = configRepository;
this.definitionsProvider = definitionsProvider;
this.jobPersistence = null;
}

public ApplyDefinitionsHelper(final ConfigRepository configRepository,
final DefinitionsProvider definitionsProvider,
final JobPersistence jobPersistence) {
this.configRepository = configRepository;
this.definitionsProvider = definitionsProvider;
this.jobPersistence = jobPersistence;
}

public void apply() throws JsonValidationException, IOException {
Expand All @@ -35,23 +53,70 @@ public void apply() throws JsonValidationException, IOException {
* @param updateAll - Whether we should overwrite all stored definitions
*/
public void apply(final boolean updateAll) throws JsonValidationException, IOException {
final Optional<AirbyteProtocolVersionRange> currentProtocolRange = getCurrentProtocolRange();

if (updateAll) {
final List<StandardSourceDefinition> latestSourceDefinitions = definitionsProvider.getSourceDefinitions();
for (final StandardSourceDefinition def : latestSourceDefinitions) {
for (final StandardSourceDefinition def : filterStandardSourceDefinitions(currentProtocolRange, latestSourceDefinitions)) {
configRepository.writeStandardSourceDefinition(def);
}

final List<StandardDestinationDefinition> latestDestinationDefinitions = definitionsProvider.getDestinationDefinitions();
for (final StandardDestinationDefinition def : latestDestinationDefinitions) {
for (final StandardDestinationDefinition def : filterStandardDestinationDefinitions(currentProtocolRange, latestDestinationDefinitions)) {
configRepository.writeStandardDestinationDefinition(def);
}
} else {
// todo (pedroslopez): Logic to apply definitions should be moved outside of the
// DatabaseConfigPersistence class and behavior standardized
configRepository.seedActorDefinitions(
definitionsProvider.getSourceDefinitions(),
definitionsProvider.getDestinationDefinitions());
filterStandardSourceDefinitions(currentProtocolRange, definitionsProvider.getSourceDefinitions()),
filterStandardDestinationDefinitions(currentProtocolRange, definitionsProvider.getDestinationDefinitions()));
}
}

private List<StandardDestinationDefinition> filterStandardDestinationDefinitions(final Optional<AirbyteProtocolVersionRange> protocolVersionRange,
final List<StandardDestinationDefinition> destDefs) {
if (protocolVersionRange.isEmpty()) {
return destDefs;
}

return destDefs.stream().filter(def -> {
final boolean isSupported = isProtocolVersionSupported(protocolVersionRange.get(), def.getSpec().getProtocolVersion());
if (!isSupported) {
log.warn("Destination {} {} has an incompatible protocol version ({})... ignoring.",
def.getDestinationDefinitionId(), def.getName(), def.getSpec().getProtocolVersion());
}
return isSupported;
}).toList();
}

private List<StandardSourceDefinition> filterStandardSourceDefinitions(final Optional<AirbyteProtocolVersionRange> protocolVersionRange,
final List<StandardSourceDefinition> sourceDefs) {
if (protocolVersionRange.isEmpty()) {
return sourceDefs;
}

return sourceDefs.stream().filter(def -> {
final boolean isSupported = isProtocolVersionSupported(protocolVersionRange.get(), def.getSpec().getProtocolVersion());
if (!isSupported) {
log.warn("Source {} {} has an incompatible protocol version ({})... ignoring.",
def.getSourceDefinitionId(), def.getName(), def.getSpec().getProtocolVersion());
}
return isSupported;
}).toList();
}

private boolean isProtocolVersionSupported(final AirbyteProtocolVersionRange protocolVersionRange, final String protocolVersion) {
return protocolVersionRange.isSupported(AirbyteProtocolVersion.getWithDefault(protocolVersion));
}

private Optional<AirbyteProtocolVersionRange> getCurrentProtocolRange() throws IOException {
if (jobPersistence == null) {
// TODO Remove this once cloud has been migrated and job persistence is always defined
return Optional.empty();
}

return jobPersistence.getCurrentProtocolVersionRange();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ApplyDefinitionsHelperTest {

Expand All @@ -29,41 +36,51 @@ class ApplyDefinitionsHelperTest {
private static final String DOCUMENTATION_URL = "https://wwww.example.com";
private static final String DOCKER_REPOSITORY = "airbyte/connector";
private static final String DOCKER_TAG = "0.1.0";
private static final String PROTOCOL_VERSION_1 = "1.0.0";
private static final String PROTOCOL_VERSION_2 = "2.0.0";
public static final StandardSourceDefinition SOURCE_DEF1 = new StandardSourceDefinition()
.withSourceDefinitionId(SOURCE_DEF_ID1)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME1)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_1));
public static final StandardSourceDefinition SOURCE_DEF2 = new StandardSourceDefinition()
.withSourceDefinitionId(SOURCE_DEF_ID1)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME2)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_2));

public static final StandardDestinationDefinition DEST_DEF1 = new StandardDestinationDefinition()
.withDestinationDefinitionId(DEST_DEF_ID2)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME1)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_2));

public static final StandardDestinationDefinition DEST_DEF2 = new StandardDestinationDefinition()
.withDestinationDefinitionId(DEST_DEF_ID2)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME2)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_1));

private ConfigRepository configRepository;
private DefinitionsProvider definitionsProvider;
private JobPersistence jobPersistence;
private ApplyDefinitionsHelper applyDefinitionsHelper;

@BeforeEach
void setup() throws JsonValidationException, IOException {
configRepository = mock(ConfigRepository.class);
definitionsProvider = mock(DefinitionsProvider.class);
jobPersistence = mock(JobPersistence.class);

applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, definitionsProvider);
applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, definitionsProvider, jobPersistence);

// default calls to empty.
when(configRepository.listStandardDestinationDefinitions(true)).thenReturn(Collections.emptyList());
Expand Down Expand Up @@ -132,4 +149,24 @@ void testApplyOSS() throws JsonValidationException, IOException {
verifyNoMoreInteractions(definitionsProvider);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testDefinitionsFiltering(final boolean updateAll) throws JsonValidationException, IOException {
when(jobPersistence.getCurrentProtocolVersionRange())
.thenReturn(Optional.of(new AirbyteProtocolVersionRange(new Version("2.0.0"), new Version("3.0.0"))));

when(definitionsProvider.getSourceDefinitions()).thenReturn(List.of(SOURCE_DEF1, SOURCE_DEF2));
when(definitionsProvider.getDestinationDefinitions()).thenReturn(List.of(DEST_DEF1, DEST_DEF2));

applyDefinitionsHelper.apply(updateAll);

if (updateAll) {
verify(configRepository).writeStandardSourceDefinition(SOURCE_DEF2);
verify(configRepository).writeStandardDestinationDefinition(DEST_DEF1);
verifyNoMoreInteractions(configRepository);
} else {
verify(configRepository).seedActorDefinitions(List.of(SOURCE_DEF2), List.of(DEST_DEF1));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
Expand Down Expand Up @@ -838,6 +839,27 @@ public void setAirbyteProtocolVersionMin(final Version version) throws IOExcepti
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME, version.serialize());
}

@Override
public Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException {
final Optional<Version> min = getAirbyteProtocolVersionMin();
final Optional<Version> max = getAirbyteProtocolVersionMax();

if (min.isPresent() != max.isPresent()) {
// Flagging this because this would be highly suspicious but not bad enough that we should fail
// hard.
// If the new config is fine, the system should self-heal.
LOGGER.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
}

if (min.isEmpty() && max.isEmpty()) {
return Optional.empty();
}

return Optional.of(new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION),
max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)));
}

private Stream<String> getMetadata(final String keyName) throws IOException {
return jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
Expand Down Expand Up @@ -262,6 +263,11 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
void setAirbyteProtocolVersionMin(Version version) throws IOException;

/**
* Get the current Airbyte Protocol Version range if defined
*/
Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException;

/**
* Returns a deployment UUID.
*/
Expand Down
Loading