-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Remove ConfigPersistence usage from SecretsMigrator #18747
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
cgardens
merged 2 commits into
master
from
cgardens/remove_config_persistence_from_bootloader
Nov 1, 2022
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,30 +4,27 @@ | |
|
||
package io.airbyte.bootloader; | ||
|
||
import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import io.airbyte.commons.json.JsonPaths; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.ConfigSchema; | ||
import io.airbyte.config.DestinationConnection; | ||
import io.airbyte.config.SourceConnection; | ||
import io.airbyte.config.StandardDestinationDefinition; | ||
import io.airbyte.config.StandardSourceDefinition; | ||
import io.airbyte.config.persistence.ConfigPersistence; | ||
import io.airbyte.config.persistence.split_secrets.SecretCoordinate; | ||
import io.airbyte.config.persistence.ConfigNotFoundException; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import io.airbyte.config.persistence.SecretsRepositoryReader; | ||
import io.airbyte.config.persistence.SecretsRepositoryWriter; | ||
import io.airbyte.config.persistence.split_secrets.SecretPersistence; | ||
import io.airbyte.config.persistence.split_secrets.SecretsHelpers; | ||
import io.airbyte.persistence.job.JobPersistence; | ||
import io.airbyte.protocol.models.ConnectorSpecification; | ||
import io.airbyte.validation.json.JsonValidationException; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Value; | ||
|
@@ -37,7 +34,9 @@ | |
@Slf4j | ||
public class SecretMigrator { | ||
|
||
private final ConfigPersistence configPersistence; | ||
private final SecretsRepositoryReader secretsReader; | ||
private final SecretsRepositoryWriter secretsWriter; | ||
private final ConfigRepository configRepository; | ||
private final JobPersistence jobPersistence; | ||
private final Optional<SecretPersistence> secretPersistence; | ||
|
||
|
@@ -55,34 +54,39 @@ static class ConnectorConfiguration { | |
* Then for all the secret that are stored in a plain text format, it will save the plain text in | ||
* the secret manager and store the coordinate in the config DB. | ||
*/ | ||
public void migrateSecrets() throws JsonValidationException, IOException { | ||
public void migrateSecrets() throws JsonValidationException, IOException, ConfigNotFoundException { | ||
if (secretPersistence.isEmpty()) { | ||
log.info("No secret persistence is provided, the migration won't be run "); | ||
|
||
return; | ||
} | ||
final List<StandardSourceDefinition> standardSourceDefinitions = | ||
configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); | ||
final List<StandardSourceDefinition> standardSourceDefinitions = configRepository.listStandardSourceDefinitions(true); | ||
|
||
final Map<UUID, JsonNode> definitionIdToSourceSpecs = standardSourceDefinitions | ||
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, | ||
def -> def.getSpec().getConnectionSpecification())); | ||
final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs = standardSourceDefinitions | ||
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, StandardSourceDefinition::getSpec)); | ||
|
||
final List<SourceConnection> sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); | ||
final List<SourceConnection> sourcesWithoutSecrets = configRepository.listSourceConnection(); | ||
final List<SourceConnection> sourcesWithSecrets = new ArrayList<>(); | ||
for (final SourceConnection source : sourcesWithoutSecrets) { | ||
final SourceConnection sourceWithSecrets = secretsReader.getSourceConnectionWithSecrets(source.getSourceId()); | ||
sourcesWithSecrets.add(sourceWithSecrets); | ||
} | ||
|
||
migrateSources(sources, definitionIdToSourceSpecs); | ||
migrateSources(sourcesWithSecrets, definitionIdToSourceSpecs); | ||
|
||
final List<StandardDestinationDefinition> standardDestinationDefinitions = | ||
configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, | ||
StandardDestinationDefinition.class); | ||
final List<StandardDestinationDefinition> standardDestinationDefinitions = configRepository.listStandardDestinationDefinitions(true); | ||
|
||
final Map<UUID, JsonNode> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream() | ||
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, | ||
def -> def.getSpec().getConnectionSpecification())); | ||
final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream() | ||
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, StandardDestinationDefinition::getSpec)); | ||
|
||
final List<DestinationConnection> destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); | ||
final List<DestinationConnection> destinationsWithoutSecrets = configRepository.listDestinationConnection(); | ||
final List<DestinationConnection> destinationsWithSecrets = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment about moving that to a map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same same |
||
for (final DestinationConnection destination : destinationsWithoutSecrets) { | ||
final DestinationConnection destinationWithoutSecrets = secretsReader.getDestinationConnectionWithSecrets(destination.getDestinationId()); | ||
destinationsWithSecrets.add(destinationWithoutSecrets); | ||
} | ||
|
||
migrateDestinations(destinations, definitionIdToDestinationSpecs); | ||
migrateDestinations(destinationsWithSecrets, definitionIdToDestinationSpecs); | ||
|
||
jobPersistence.setSecretMigrationDone(); | ||
} | ||
|
@@ -91,120 +95,46 @@ public void migrateSecrets() throws JsonValidationException, IOException { | |
* This is migrating the secrets for the source actors | ||
*/ | ||
@VisibleForTesting | ||
void migrateSources(final List<SourceConnection> sources, final Map<UUID, JsonNode> definitionIdToSourceSpecs) | ||
void migrateSources(final List<SourceConnection> sources, final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs) | ||
throws JsonValidationException, IOException { | ||
log.info("Migrating Sources"); | ||
final List<SourceConnection> sourceConnections = sources.stream() | ||
.map(source -> { | ||
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration( | ||
source.getWorkspaceId(), | ||
source.getConfiguration(), | ||
definitionIdToSourceSpecs.get(source.getSourceDefinitionId())), | ||
() -> UUID.randomUUID()); | ||
source.setConfiguration(migratedConfig); | ||
return source; | ||
}) | ||
.toList(); | ||
|
||
for (final SourceConnection source : sourceConnections) { | ||
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source); | ||
for (final SourceConnection source : sources) { | ||
final Optional<ConnectorSpecification> specOptional = Optional.ofNullable(definitionIdToSourceSpecs.get(source.getSourceDefinitionId())); | ||
|
||
if (specOptional.isPresent()) { | ||
secretsWriter.writeSourceConnection(source, specOptional.get()); | ||
} else { | ||
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the | ||
// connector. | ||
final SourceConnection sourceWithConfigRemoved = Jsons.clone(source); | ||
sourceWithConfigRemoved.setConfiguration(Jsons.emptyObject()); | ||
secretsWriter.writeSourceConnection(sourceWithConfigRemoved, new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject())); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* This is migrating the secrets for the destination actors | ||
*/ | ||
@VisibleForTesting | ||
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, JsonNode> definitionIdToDestinationSpecs) | ||
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs) | ||
throws JsonValidationException, IOException { | ||
log.info("Migration Destinations"); | ||
for (final DestinationConnection destination : destinations) { | ||
final Optional<ConnectorSpecification> specOptional = | ||
Optional.ofNullable(definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())); | ||
|
||
final List<DestinationConnection> destinationConnections = destinations.stream().map(destination -> { | ||
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration( | ||
destination.getWorkspaceId(), | ||
destination.getConfiguration(), | ||
definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())), | ||
() -> UUID.randomUUID()); | ||
destination.setConfiguration(migratedConfig); | ||
return destination; | ||
}) | ||
.toList(); | ||
for (final DestinationConnection destination : destinationConnections) { | ||
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination); | ||
} | ||
} | ||
|
||
/** | ||
* This is a generic method to migrate an actor configuration It will extract the secret path form | ||
* the provided spec and then replace them by coordinates in the actor configuration | ||
*/ | ||
@VisibleForTesting | ||
JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier<UUID> uuidProvider) { | ||
if (connectorConfiguration.getSpec() == null) { | ||
throw new IllegalStateException("No connector definition to match the connector"); | ||
} | ||
|
||
final AtomicReference<JsonNode> connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration()); | ||
final List<String> uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec()) | ||
.stream() | ||
.flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream()) | ||
.toList(); | ||
|
||
final UUID workspaceId = connectorConfiguration.getWorkspace(); | ||
uniqSecretPaths.forEach(secretPath -> { | ||
final Optional<JsonNode> secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath); | ||
if (secretValue.isEmpty()) { | ||
throw new IllegalStateException("Missing secret for the path: " + secretPath); | ||
} | ||
|
||
// Only migrate plain text. | ||
if (secretValue.get().isTextual()) { | ||
final JsonNode stringSecretValue = secretValue.get(); | ||
|
||
final SecretCoordinate coordinate = | ||
new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1); | ||
secretPersistence.get().write(coordinate, stringSecretValue.textValue()); | ||
connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath, | ||
Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate())))); | ||
if (specOptional.isPresent()) { | ||
secretsWriter.writeDestinationConnection(destination, specOptional.get()); | ||
} else { | ||
log.error("Not migrating already migrated secrets"); | ||
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the | ||
// connector. | ||
final DestinationConnection destinationWithConfigRemoved = Jsons.clone(destination); | ||
destinationWithConfigRemoved.setConfiguration(Jsons.emptyObject()); | ||
secretsWriter.writeDestinationConnection(destinationWithConfigRemoved, | ||
new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject())); | ||
} | ||
|
||
}); | ||
|
||
return connectorConfigurationJson.get(); | ||
} | ||
|
||
/** | ||
* Wrapper to help to mock static methods | ||
*/ | ||
@VisibleForTesting | ||
JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) { | ||
return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement); | ||
} | ||
|
||
/** | ||
* Wrapper to help to mock static methods | ||
*/ | ||
@VisibleForTesting | ||
List<String> getSecretPath(final JsonNode specs) { | ||
return SecretsHelpers.getSortedSecretPaths(specs); | ||
} | ||
|
||
/** | ||
* Wrapper to help to mock static methods | ||
*/ | ||
@VisibleForTesting | ||
List<String> getAllExplodedPath(final JsonNode node, final String path) { | ||
return JsonPaths.getPaths(node, path); | ||
} | ||
|
||
/** | ||
* Wrapper to help to mock static methods | ||
*/ | ||
@VisibleForTesting | ||
Optional<JsonNode> getValueForPath(final JsonNode node, final String path) { | ||
return JsonPaths.getSingleValue(node, path); | ||
} | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This could be a map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it throws a checked exception, which is why i didn't put it in the lambda.