Skip to content

Remove ConfigPersistence usage from SecretsMigrator #18747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
import io.airbyte.config.init.ApplyDefinitionsHelper;
import io.airbyte.config.init.DefinitionsProvider;
import io.airbyte.config.init.LocalDefinitionsProvider;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
Expand Down Expand Up @@ -68,7 +72,7 @@ public class BootloaderApp {
private final Runnable postLoadExecution;
private final FeatureFlags featureFlags;
private final SecretMigrator secretMigrator;
private ConfigPersistence configPersistence;
private ConfigRepository configRepository;
private DefinitionsProvider localDefinitionsProvider;
private Database configDatabase;
private Database jobDatabase;
Expand Down Expand Up @@ -128,9 +132,6 @@ public BootloaderApp(final Configs configs,

postLoadExecution = () -> {
try {
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, localDefinitionsProvider);
applyDefinitionsHelper.apply();

Expand All @@ -141,7 +142,7 @@ public BootloaderApp(final Configs configs,
}
}
LOGGER.info("Loaded seed data..");
} catch (final IOException | JsonValidationException e) {
} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
throw new RuntimeException(e);
}
};
Expand Down Expand Up @@ -173,9 +174,6 @@ public void load() throws Exception {
runFlywayMigration(configs, configDbMigrator, jobDbMigrator);
LOGGER.info("Ran Flyway migrations.");

final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

createWorkspaceIfNoneExists(configRepository);
LOGGER.info("Default workspace created.");

Expand Down Expand Up @@ -219,7 +217,7 @@ private static JobPersistence getJobPersistence(final Database jobDatabase) thro
private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) {
try {
configDatabase = getConfigDatabase(configsDslContext);
configPersistence = getConfigPersistence(configDatabase);
configRepository = new ConfigRepository(getConfigPersistence(configDatabase), configDatabase);
localDefinitionsProvider = getLocalDefinitionsProvider();
jobDatabase = getJobDatabase(jobsDslContext);
jobPersistence = getJobPersistence(jobDatabase);
Expand All @@ -244,10 +242,17 @@ public static void main(final String[] args) throws Exception {
// TODO Will be converted to an injected singleton during DI migration
final Database configDatabase = getConfigDatabase(configsDslContext);
final ConfigPersistence configPersistence = getConfigPersistence(configDatabase);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
final Database jobDatabase = getJobDatabase(jobsDslContext);
final JobPersistence jobPersistence = getJobPersistence(jobDatabase);

final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configsDslContext, configs);
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configsDslContext, configs);
final SecretsRepositoryReader secretsRepositoryReader = new SecretsRepositoryReader(configRepository, secretsHydrator);
final SecretsRepositoryWriter secretsRepositoryWriter = new SecretsRepositoryWriter(configRepository, secretPersistence, Optional.empty());

final SecretMigrator secretMigrator =
new SecretMigrator(configPersistence, jobPersistence, SecretPersistence.getLongLived(configsDslContext, configs));
new SecretMigrator(secretsRepositoryReader, secretsRepositoryWriter, configRepository, jobPersistence, secretPersistence);
final Flyway configsFlyway = FlywayFactory.create(configsDataSource, BootloaderApp.class.getSimpleName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, BootloaderApp.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,27 @@

package io.airbyte.bootloader;

import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.JsonPaths;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Value;
Expand All @@ -37,7 +34,9 @@
@Slf4j
public class SecretMigrator {

private final ConfigPersistence configPersistence;
private final SecretsRepositoryReader secretsReader;
private final SecretsRepositoryWriter secretsWriter;
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final Optional<SecretPersistence> secretPersistence;

Expand All @@ -55,34 +54,39 @@ static class ConnectorConfiguration {
* Then for all the secret that are stored in a plain text format, it will save the plain text in
* the secret manager and store the coordinate in the config DB.
*/
public void migrateSecrets() throws JsonValidationException, IOException {
public void migrateSecrets() throws JsonValidationException, IOException, ConfigNotFoundException {
if (secretPersistence.isEmpty()) {
log.info("No secret persistence is provided, the migration won't be run ");

return;
}
final List<StandardSourceDefinition> standardSourceDefinitions =
configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
final List<StandardSourceDefinition> standardSourceDefinitions = configRepository.listStandardSourceDefinitions(true);

final Map<UUID, JsonNode> definitionIdToSourceSpecs = standardSourceDefinitions
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId,
def -> def.getSpec().getConnectionSpecification()));
final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs = standardSourceDefinitions
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, StandardSourceDefinition::getSpec));

final List<SourceConnection> sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
final List<SourceConnection> sourcesWithoutSecrets = configRepository.listSourceConnection();
final List<SourceConnection> sourcesWithSecrets = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This could be a map

final List<SourceConnection> sourcesWithSecrets = sourcesWithoutSecrets.map(source -> secretsReader.getSourceConnectionWithSecrets(source.getSourceId()))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it throws a checked exception, which is why i didn't put it in the lambda.

for (final SourceConnection source : sourcesWithoutSecrets) {
final SourceConnection sourceWithSecrets = secretsReader.getSourceConnectionWithSecrets(source.getSourceId());
sourcesWithSecrets.add(sourceWithSecrets);
}

migrateSources(sources, definitionIdToSourceSpecs);
migrateSources(sourcesWithSecrets, definitionIdToSourceSpecs);

final List<StandardDestinationDefinition> standardDestinationDefinitions =
configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION,
StandardDestinationDefinition.class);
final List<StandardDestinationDefinition> standardDestinationDefinitions = configRepository.listStandardDestinationDefinitions(true);

final Map<UUID, JsonNode> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId,
def -> def.getSpec().getConnectionSpecification()));
final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, StandardDestinationDefinition::getSpec));

final List<DestinationConnection> destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
final List<DestinationConnection> destinationsWithoutSecrets = configRepository.listDestinationConnection();
final List<DestinationConnection> destinationsWithSecrets = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about moving that to a map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same same

for (final DestinationConnection destination : destinationsWithoutSecrets) {
final DestinationConnection destinationWithoutSecrets = secretsReader.getDestinationConnectionWithSecrets(destination.getDestinationId());
destinationsWithSecrets.add(destinationWithoutSecrets);
}

migrateDestinations(destinations, definitionIdToDestinationSpecs);
migrateDestinations(destinationsWithSecrets, definitionIdToDestinationSpecs);

jobPersistence.setSecretMigrationDone();
}
Expand All @@ -91,120 +95,46 @@ public void migrateSecrets() throws JsonValidationException, IOException {
* This is migrating the secrets for the source actors
*/
@VisibleForTesting
void migrateSources(final List<SourceConnection> sources, final Map<UUID, JsonNode> definitionIdToSourceSpecs)
void migrateSources(final List<SourceConnection> sources, final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs)
throws JsonValidationException, IOException {
log.info("Migrating Sources");
final List<SourceConnection> sourceConnections = sources.stream()
.map(source -> {
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
source.getWorkspaceId(),
source.getConfiguration(),
definitionIdToSourceSpecs.get(source.getSourceDefinitionId())),
() -> UUID.randomUUID());
source.setConfiguration(migratedConfig);
return source;
})
.toList();

for (final SourceConnection source : sourceConnections) {
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
for (final SourceConnection source : sources) {
final Optional<ConnectorSpecification> specOptional = Optional.ofNullable(definitionIdToSourceSpecs.get(source.getSourceDefinitionId()));

if (specOptional.isPresent()) {
secretsWriter.writeSourceConnection(source, specOptional.get());
} else {
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the
// connector.
final SourceConnection sourceWithConfigRemoved = Jsons.clone(source);
sourceWithConfigRemoved.setConfiguration(Jsons.emptyObject());
secretsWriter.writeSourceConnection(sourceWithConfigRemoved, new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
}
}
}

/**
* This is migrating the secrets for the destination actors
*/
@VisibleForTesting
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, JsonNode> definitionIdToDestinationSpecs)
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs)
throws JsonValidationException, IOException {
log.info("Migration Destinations");
for (final DestinationConnection destination : destinations) {
final Optional<ConnectorSpecification> specOptional =
Optional.ofNullable(definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId()));

final List<DestinationConnection> destinationConnections = destinations.stream().map(destination -> {
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
destination.getWorkspaceId(),
destination.getConfiguration(),
definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())),
() -> UUID.randomUUID());
destination.setConfiguration(migratedConfig);
return destination;
})
.toList();
for (final DestinationConnection destination : destinationConnections) {
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination);
}
}

/**
* This is a generic method to migrate an actor configuration It will extract the secret path form
* the provided spec and then replace them by coordinates in the actor configuration
*/
@VisibleForTesting
JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier<UUID> uuidProvider) {
if (connectorConfiguration.getSpec() == null) {
throw new IllegalStateException("No connector definition to match the connector");
}

final AtomicReference<JsonNode> connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration());
final List<String> uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec())
.stream()
.flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream())
.toList();

final UUID workspaceId = connectorConfiguration.getWorkspace();
uniqSecretPaths.forEach(secretPath -> {
final Optional<JsonNode> secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath);
if (secretValue.isEmpty()) {
throw new IllegalStateException("Missing secret for the path: " + secretPath);
}

// Only migrate plain text.
if (secretValue.get().isTextual()) {
final JsonNode stringSecretValue = secretValue.get();

final SecretCoordinate coordinate =
new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1);
secretPersistence.get().write(coordinate, stringSecretValue.textValue());
connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath,
Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate()))));
if (specOptional.isPresent()) {
secretsWriter.writeDestinationConnection(destination, specOptional.get());
} else {
log.error("Not migrating already migrated secrets");
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the
// connector.
final DestinationConnection destinationWithConfigRemoved = Jsons.clone(destination);
destinationWithConfigRemoved.setConfiguration(Jsons.emptyObject());
secretsWriter.writeDestinationConnection(destinationWithConfigRemoved,
new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
}

});

return connectorConfigurationJson.get();
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) {
return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement);
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
List<String> getSecretPath(final JsonNode specs) {
return SecretsHelpers.getSortedSecretPaths(specs);
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
List<String> getAllExplodedPath(final JsonNode node, final String path) {
return JsonPaths.getPaths(node, path);
}

/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
Optional<JsonNode> getValueForPath(final JsonNode node, final String path) {
return JsonPaths.getSingleValue(node, path);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence;
import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
Expand Down Expand Up @@ -185,10 +189,17 @@ void testBootloaderAppRunSecretMigration() throws Exception {
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);

val configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor);
val configRepository = new ConfigRepository(configPersistence, configDatabase);
val jobsPersistence = new DefaultJobPersistence(jobDatabase);

val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs);
final LocalTestingSecretPersistence localTestingSecretPersistence = new LocalTestingSecretPersistence(configDatabase);

val secretsReader = new SecretsRepositoryReader(configRepository, new RealSecretsHydrator(localTestingSecretPersistence));
val secretsWriter = new SecretsRepositoryWriter(configRepository, secretsPersistence, Optional.empty());

val spiedSecretMigrator =
spy(new SecretMigrator(configPersistence, jobsPersistence, SecretPersistence.getLongLived(configsDslContext, mockedConfigs)));
spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, secretsPersistence));

// Although we are able to inject mocked configs into the Bootloader, a particular migration in the
// configs database requires the env var to be set. Flyway prevents injection, so we dynamically set
Expand All @@ -202,7 +213,6 @@ void testBootloaderAppRunSecretMigration() throws Exception {
initBootloader.load();

final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
final ConfigPersistence localConfigPersistence = new DefinitionProviderToConfigPersistenceAdapter(localDefinitions);
configRepository.loadDataNoSecrets(localConfigPersistence);

Expand Down
Loading