Skip to content

Commit ebb9126

Browse files
authored
Remove ConfigPersistence usage from SecretsMigrator (#18747)
1 parent 7b9a097 commit ebb9126

File tree

5 files changed

+126
-238
lines changed

5 files changed

+126
-238
lines changed

airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
import io.airbyte.config.init.ApplyDefinitionsHelper;
1818
import io.airbyte.config.init.DefinitionsProvider;
1919
import io.airbyte.config.init.LocalDefinitionsProvider;
20+
import io.airbyte.config.persistence.ConfigNotFoundException;
2021
import io.airbyte.config.persistence.ConfigPersistence;
2122
import io.airbyte.config.persistence.ConfigRepository;
2223
import io.airbyte.config.persistence.DatabaseConfigPersistence;
24+
import io.airbyte.config.persistence.SecretsRepositoryReader;
25+
import io.airbyte.config.persistence.SecretsRepositoryWriter;
2326
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
2427
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
28+
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
2529
import io.airbyte.db.Database;
2630
import io.airbyte.db.factory.DSLContextFactory;
2731
import io.airbyte.db.factory.DataSourceFactory;
@@ -68,7 +72,7 @@ public class BootloaderApp {
6872
private final Runnable postLoadExecution;
6973
private final FeatureFlags featureFlags;
7074
private final SecretMigrator secretMigrator;
71-
private ConfigPersistence configPersistence;
75+
private ConfigRepository configRepository;
7276
private DefinitionsProvider localDefinitionsProvider;
7377
private Database configDatabase;
7478
private Database jobDatabase;
@@ -128,9 +132,6 @@ public BootloaderApp(final Configs configs,
128132

129133
postLoadExecution = () -> {
130134
try {
131-
final ConfigRepository configRepository =
132-
new ConfigRepository(configPersistence, configDatabase);
133-
134135
final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, localDefinitionsProvider);
135136
applyDefinitionsHelper.apply();
136137

@@ -141,7 +142,7 @@ public BootloaderApp(final Configs configs,
141142
}
142143
}
143144
LOGGER.info("Loaded seed data..");
144-
} catch (final IOException | JsonValidationException e) {
145+
} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
145146
throw new RuntimeException(e);
146147
}
147148
};
@@ -173,9 +174,6 @@ public void load() throws Exception {
173174
runFlywayMigration(configs, configDbMigrator, jobDbMigrator);
174175
LOGGER.info("Ran Flyway migrations.");
175176

176-
final ConfigRepository configRepository =
177-
new ConfigRepository(configPersistence, configDatabase);
178-
179177
createWorkspaceIfNoneExists(configRepository);
180178
LOGGER.info("Default workspace created.");
181179

@@ -219,7 +217,7 @@ private static JobPersistence getJobPersistence(final Database jobDatabase) thro
219217
private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) {
220218
try {
221219
configDatabase = getConfigDatabase(configsDslContext);
222-
configPersistence = getConfigPersistence(configDatabase);
220+
configRepository = new ConfigRepository(getConfigPersistence(configDatabase), configDatabase);
223221
localDefinitionsProvider = getLocalDefinitionsProvider();
224222
jobDatabase = getJobDatabase(jobsDslContext);
225223
jobPersistence = getJobPersistence(jobDatabase);
@@ -244,10 +242,17 @@ public static void main(final String[] args) throws Exception {
244242
// TODO Will be converted to an injected singleton during DI migration
245243
final Database configDatabase = getConfigDatabase(configsDslContext);
246244
final ConfigPersistence configPersistence = getConfigPersistence(configDatabase);
245+
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
247246
final Database jobDatabase = getJobDatabase(jobsDslContext);
248247
final JobPersistence jobPersistence = getJobPersistence(jobDatabase);
248+
249+
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configsDslContext, configs);
250+
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configsDslContext, configs);
251+
final SecretsRepositoryReader secretsRepositoryReader = new SecretsRepositoryReader(configRepository, secretsHydrator);
252+
final SecretsRepositoryWriter secretsRepositoryWriter = new SecretsRepositoryWriter(configRepository, secretPersistence, Optional.empty());
253+
249254
final SecretMigrator secretMigrator =
250-
new SecretMigrator(configPersistence, jobPersistence, SecretPersistence.getLongLived(configsDslContext, configs));
255+
new SecretMigrator(secretsRepositoryReader, secretsRepositoryWriter, configRepository, jobPersistence, secretPersistence);
251256
final Flyway configsFlyway = FlywayFactory.create(configsDataSource, BootloaderApp.class.getSimpleName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
252257
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
253258
final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, BootloaderApp.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER,

airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java

Lines changed: 56 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,27 @@
44

55
package io.airbyte.bootloader;
66

7-
import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD;
8-
97
import com.fasterxml.jackson.databind.JsonNode;
108
import com.google.common.annotations.VisibleForTesting;
11-
import io.airbyte.commons.json.JsonPaths;
129
import io.airbyte.commons.json.Jsons;
13-
import io.airbyte.config.ConfigSchema;
1410
import io.airbyte.config.DestinationConnection;
1511
import io.airbyte.config.SourceConnection;
1612
import io.airbyte.config.StandardDestinationDefinition;
1713
import io.airbyte.config.StandardSourceDefinition;
18-
import io.airbyte.config.persistence.ConfigPersistence;
19-
import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
14+
import io.airbyte.config.persistence.ConfigNotFoundException;
15+
import io.airbyte.config.persistence.ConfigRepository;
16+
import io.airbyte.config.persistence.SecretsRepositoryReader;
17+
import io.airbyte.config.persistence.SecretsRepositoryWriter;
2018
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
21-
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
2219
import io.airbyte.persistence.job.JobPersistence;
20+
import io.airbyte.protocol.models.ConnectorSpecification;
2321
import io.airbyte.validation.json.JsonValidationException;
2422
import java.io.IOException;
23+
import java.util.ArrayList;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.Optional;
2827
import java.util.UUID;
29-
import java.util.concurrent.atomic.AtomicReference;
30-
import java.util.function.Supplier;
3128
import java.util.stream.Collectors;
3229
import lombok.AllArgsConstructor;
3330
import lombok.Value;
@@ -37,7 +34,9 @@
3734
@Slf4j
3835
public class SecretMigrator {
3936

40-
private final ConfigPersistence configPersistence;
37+
private final SecretsRepositoryReader secretsReader;
38+
private final SecretsRepositoryWriter secretsWriter;
39+
private final ConfigRepository configRepository;
4140
private final JobPersistence jobPersistence;
4241
private final Optional<SecretPersistence> secretPersistence;
4342

@@ -55,34 +54,39 @@ static class ConnectorConfiguration {
5554
* Then for all the secret that are stored in a plain text format, it will save the plain text in
5655
* the secret manager and store the coordinate in the config DB.
5756
*/
58-
public void migrateSecrets() throws JsonValidationException, IOException {
57+
public void migrateSecrets() throws JsonValidationException, IOException, ConfigNotFoundException {
5958
if (secretPersistence.isEmpty()) {
6059
log.info("No secret persistence is provided, the migration won't be run ");
6160

6261
return;
6362
}
64-
final List<StandardSourceDefinition> standardSourceDefinitions =
65-
configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
63+
final List<StandardSourceDefinition> standardSourceDefinitions = configRepository.listStandardSourceDefinitions(true);
6664

67-
final Map<UUID, JsonNode> definitionIdToSourceSpecs = standardSourceDefinitions
68-
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId,
69-
def -> def.getSpec().getConnectionSpecification()));
65+
final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs = standardSourceDefinitions
66+
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, StandardSourceDefinition::getSpec));
7067

71-
final List<SourceConnection> sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
68+
final List<SourceConnection> sourcesWithoutSecrets = configRepository.listSourceConnection();
69+
final List<SourceConnection> sourcesWithSecrets = new ArrayList<>();
70+
for (final SourceConnection source : sourcesWithoutSecrets) {
71+
final SourceConnection sourceWithSecrets = secretsReader.getSourceConnectionWithSecrets(source.getSourceId());
72+
sourcesWithSecrets.add(sourceWithSecrets);
73+
}
7274

73-
migrateSources(sources, definitionIdToSourceSpecs);
75+
migrateSources(sourcesWithSecrets, definitionIdToSourceSpecs);
7476

75-
final List<StandardDestinationDefinition> standardDestinationDefinitions =
76-
configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION,
77-
StandardDestinationDefinition.class);
77+
final List<StandardDestinationDefinition> standardDestinationDefinitions = configRepository.listStandardDestinationDefinitions(true);
7878

79-
final Map<UUID, JsonNode> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
80-
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId,
81-
def -> def.getSpec().getConnectionSpecification()));
79+
final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
80+
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, StandardDestinationDefinition::getSpec));
8281

83-
final List<DestinationConnection> destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
82+
final List<DestinationConnection> destinationsWithoutSecrets = configRepository.listDestinationConnection();
83+
final List<DestinationConnection> destinationsWithSecrets = new ArrayList<>();
84+
for (final DestinationConnection destination : destinationsWithoutSecrets) {
85+
final DestinationConnection destinationWithoutSecrets = secretsReader.getDestinationConnectionWithSecrets(destination.getDestinationId());
86+
destinationsWithSecrets.add(destinationWithoutSecrets);
87+
}
8488

85-
migrateDestinations(destinations, definitionIdToDestinationSpecs);
89+
migrateDestinations(destinationsWithSecrets, definitionIdToDestinationSpecs);
8690

8791
jobPersistence.setSecretMigrationDone();
8892
}
@@ -91,120 +95,46 @@ public void migrateSecrets() throws JsonValidationException, IOException {
9195
* This is migrating the secrets for the source actors
9296
*/
9397
@VisibleForTesting
94-
void migrateSources(final List<SourceConnection> sources, final Map<UUID, JsonNode> definitionIdToSourceSpecs)
98+
void migrateSources(final List<SourceConnection> sources, final Map<UUID, ConnectorSpecification> definitionIdToSourceSpecs)
9599
throws JsonValidationException, IOException {
96100
log.info("Migrating Sources");
97-
final List<SourceConnection> sourceConnections = sources.stream()
98-
.map(source -> {
99-
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
100-
source.getWorkspaceId(),
101-
source.getConfiguration(),
102-
definitionIdToSourceSpecs.get(source.getSourceDefinitionId())),
103-
() -> UUID.randomUUID());
104-
source.setConfiguration(migratedConfig);
105-
return source;
106-
})
107-
.toList();
108-
109-
for (final SourceConnection source : sourceConnections) {
110-
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
101+
for (final SourceConnection source : sources) {
102+
final Optional<ConnectorSpecification> specOptional = Optional.ofNullable(definitionIdToSourceSpecs.get(source.getSourceDefinitionId()));
103+
104+
if (specOptional.isPresent()) {
105+
secretsWriter.writeSourceConnection(source, specOptional.get());
106+
} else {
107+
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the
108+
// connector.
109+
final SourceConnection sourceWithConfigRemoved = Jsons.clone(source);
110+
sourceWithConfigRemoved.setConfiguration(Jsons.emptyObject());
111+
secretsWriter.writeSourceConnection(sourceWithConfigRemoved, new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
112+
}
111113
}
112114
}
113115

114116
/**
115117
* This is migrating the secrets for the destination actors
116118
*/
117119
@VisibleForTesting
118-
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, JsonNode> definitionIdToDestinationSpecs)
120+
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, ConnectorSpecification> definitionIdToDestinationSpecs)
119121
throws JsonValidationException, IOException {
120122
log.info("Migration Destinations");
123+
for (final DestinationConnection destination : destinations) {
124+
final Optional<ConnectorSpecification> specOptional =
125+
Optional.ofNullable(definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId()));
121126

122-
final List<DestinationConnection> destinationConnections = destinations.stream().map(destination -> {
123-
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
124-
destination.getWorkspaceId(),
125-
destination.getConfiguration(),
126-
definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())),
127-
() -> UUID.randomUUID());
128-
destination.setConfiguration(migratedConfig);
129-
return destination;
130-
})
131-
.toList();
132-
for (final DestinationConnection destination : destinationConnections) {
133-
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination);
134-
}
135-
}
136-
137-
/**
138-
* This is a generic method to migrate an actor configuration It will extract the secret path form
139-
* the provided spec and then replace them by coordinates in the actor configuration
140-
*/
141-
@VisibleForTesting
142-
JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier<UUID> uuidProvider) {
143-
if (connectorConfiguration.getSpec() == null) {
144-
throw new IllegalStateException("No connector definition to match the connector");
145-
}
146-
147-
final AtomicReference<JsonNode> connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration());
148-
final List<String> uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec())
149-
.stream()
150-
.flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream())
151-
.toList();
152-
153-
final UUID workspaceId = connectorConfiguration.getWorkspace();
154-
uniqSecretPaths.forEach(secretPath -> {
155-
final Optional<JsonNode> secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath);
156-
if (secretValue.isEmpty()) {
157-
throw new IllegalStateException("Missing secret for the path: " + secretPath);
158-
}
159-
160-
// Only migrate plain text.
161-
if (secretValue.get().isTextual()) {
162-
final JsonNode stringSecretValue = secretValue.get();
163-
164-
final SecretCoordinate coordinate =
165-
new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1);
166-
secretPersistence.get().write(coordinate, stringSecretValue.textValue());
167-
connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath,
168-
Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate()))));
127+
if (specOptional.isPresent()) {
128+
secretsWriter.writeDestinationConnection(destination, specOptional.get());
169129
} else {
170-
log.error("Not migrating already migrated secrets");
130+
// if the spec can't be found, don't risk writing secrets to db. wipe out the configuration for the
131+
// connector.
132+
final DestinationConnection destinationWithConfigRemoved = Jsons.clone(destination);
133+
destinationWithConfigRemoved.setConfiguration(Jsons.emptyObject());
134+
secretsWriter.writeDestinationConnection(destinationWithConfigRemoved,
135+
new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
171136
}
172-
173-
});
174-
175-
return connectorConfigurationJson.get();
176-
}
177-
178-
/**
179-
* Wrapper to help to mock static methods
180-
*/
181-
@VisibleForTesting
182-
JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) {
183-
return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement);
184-
}
185-
186-
/**
187-
* Wrapper to help to mock static methods
188-
*/
189-
@VisibleForTesting
190-
List<String> getSecretPath(final JsonNode specs) {
191-
return SecretsHelpers.getSortedSecretPaths(specs);
192-
}
193-
194-
/**
195-
* Wrapper to help to mock static methods
196-
*/
197-
@VisibleForTesting
198-
List<String> getAllExplodedPath(final JsonNode node, final String path) {
199-
return JsonPaths.getPaths(node, path);
200-
}
201-
202-
/**
203-
* Wrapper to help to mock static methods
204-
*/
205-
@VisibleForTesting
206-
Optional<JsonNode> getValueForPath(final JsonNode node, final String path) {
207-
return JsonPaths.getSingleValue(node, path);
137+
}
208138
}
209139

210140
}

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
import io.airbyte.config.persistence.ConfigPersistence;
3131
import io.airbyte.config.persistence.ConfigRepository;
3232
import io.airbyte.config.persistence.DatabaseConfigPersistence;
33+
import io.airbyte.config.persistence.SecretsRepositoryReader;
34+
import io.airbyte.config.persistence.SecretsRepositoryWriter;
3335
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
36+
import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence;
37+
import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator;
3438
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
3539
import io.airbyte.db.factory.DSLContextFactory;
3640
import io.airbyte.db.factory.DataSourceFactory;
@@ -185,10 +189,17 @@ void testBootloaderAppRunSecretMigration() throws Exception {
185189
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);
186190

187191
val configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor);
192+
val configRepository = new ConfigRepository(configPersistence, configDatabase);
188193
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
189194

195+
val secretsPersistence = SecretPersistence.getLongLived(configsDslContext, mockedConfigs);
196+
final LocalTestingSecretPersistence localTestingSecretPersistence = new LocalTestingSecretPersistence(configDatabase);
197+
198+
val secretsReader = new SecretsRepositoryReader(configRepository, new RealSecretsHydrator(localTestingSecretPersistence));
199+
val secretsWriter = new SecretsRepositoryWriter(configRepository, secretsPersistence, Optional.empty());
200+
190201
val spiedSecretMigrator =
191-
spy(new SecretMigrator(configPersistence, jobsPersistence, SecretPersistence.getLongLived(configsDslContext, mockedConfigs)));
202+
spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, secretsPersistence));
192203

193204
// Although we are able to inject mocked configs into the Bootloader, a particular migration in the
194205
// configs database requires the env var to be set. Flyway prevents injection, so we dynamically set
@@ -202,7 +213,6 @@ void testBootloaderAppRunSecretMigration() throws Exception {
202213
initBootloader.load();
203214

204215
final DefinitionsProvider localDefinitions = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
205-
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
206216
final ConfigPersistence localConfigPersistence = new DefinitionProviderToConfigPersistenceAdapter(localDefinitions);
207217
configRepository.loadDataNoSecrets(localConfigPersistence);
208218

0 commit comments

Comments
 (0)