Skip to content

Commit d0f2181

Browse files
Interface changes to support separating secrets from the config (#6065)
* Interface changes to support separating secrets from the config * Cleanup from PR comments and whitespace
1 parent 4a0d364 commit d0f2181

File tree

6 files changed

+34
-27
lines changed

6 files changed

+34
-27
lines changed

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,12 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)
188188
return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
189189
}
190190

191-
public void writeDestinationConnection(final DestinationConnection destinationConnection) throws JsonValidationException, IOException {
191+
public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification)
192+
throws JsonValidationException, IOException {
193+
// actual validation is only for sanity checking
194+
final JsonSchemaValidator validator = new JsonSchemaValidator();
195+
validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration());
196+
192197
persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection);
193198
}
194199

airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,13 @@ public void testSource() throws IOException, JsonValidationException {
149149
@Test
150150
public void testDestination() throws IOException, JsonValidationException {
151151
configRepository.writeStandardDestinationDefinition(DEST_DEF);
152-
configRepository.writeDestinationConnection(DEST);
152+
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);
153153

154154
final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
155155
assertEquals(WORKSPACE_ID, retrievedWorkspace);
156156

157157
// check that caching is working
158-
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()));
158+
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec);
159159
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
160160
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
161161
}
@@ -165,7 +165,7 @@ public void testConnection() throws IOException, JsonValidationException {
165165
configRepository.writeStandardSource(SOURCE_DEF);
166166
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
167167
configRepository.writeStandardDestinationDefinition(DEST_DEF);
168-
configRepository.writeDestinationConnection(DEST);
168+
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);
169169

170170
// set up connection
171171
configRepository.writeStandardSync(CONNECTION);
@@ -181,7 +181,7 @@ public void testConnection() throws IOException, JsonValidationException {
181181
// check that caching is working
182182
final UUID newWorkspace = UUID.randomUUID();
183183
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec);
184-
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace));
184+
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace), emptyConnectorSpec);
185185
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
186186
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
187187
}
@@ -205,7 +205,7 @@ public void testConnectionAndJobs() throws IOException, JsonValidationException
205205
configRepository.writeStandardSource(SOURCE_DEF);
206206
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
207207
configRepository.writeStandardDestinationDefinition(DEST_DEF);
208-
configRepository.writeDestinationConnection(DEST);
208+
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);
209209
configRepository.writeStandardSync(CONNECTION);
210210

211211
// test jobs

airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@
4949
import io.airbyte.config.persistence.ConfigPersistence;
5050
import io.airbyte.config.persistence.ConfigRepository;
5151
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
52-
import io.airbyte.protocol.models.ConnectorSpecification;
5352
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
5453
import io.airbyte.scheduler.persistence.JobPersistence;
5554
import io.airbyte.scheduler.persistence.WorkspaceHelper;
5655
import io.airbyte.server.converters.SpecFetcher;
5756
import io.airbyte.server.errors.IdNotFoundKnownException;
57+
import io.airbyte.server.handlers.DestinationHandler;
5858
import io.airbyte.server.handlers.SourceHandler;
5959
import io.airbyte.validation.json.JsonSchemaValidator;
6060
import io.airbyte.validation.json.JsonValidationException;
@@ -414,19 +414,17 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
414414
return sourceConnection;
415415
},
416416
(sourceConnection) -> {
417-
final ConnectorSpecification spec;
418417
// make sure connector definition exists
419418
try {
420419
final StandardSourceDefinition sourceDefinition =
421420
configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
422421
if (sourceDefinition == null) {
423422
return;
424423
}
425-
spec = SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition);
424+
configRepository.writeSourceConnection(sourceConnection, SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition));
426425
} catch (ConfigNotFoundException e) {
427426
return;
428427
}
429-
configRepository.writeSourceConnection(sourceConnection, spec);
430428
}));
431429
case STANDARD_DESTINATION_DEFINITION -> importDestinationDefinitionIntoWorkspace(configs);
432430
case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace(
@@ -442,13 +440,15 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
442440
(destinationConnection) -> {
443441
// make sure connector definition exists
444442
try {
445-
if (configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()) == null) {
443+
StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(
444+
destinationConnection.getDestinationDefinitionId());
445+
if (destinationDefinition == null) {
446446
return;
447447
}
448+
configRepository.writeDestinationConnection(destinationConnection, DestinationHandler.getSpec(specFetcher, destinationDefinition));
448449
} catch (ConfigNotFoundException e) {
449450
return;
450451
}
451-
configRepository.writeDestinationConnection(destinationConnection);
452452
}));
453453
case STANDARD_SYNC -> standardSyncs = configs;
454454
case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace(

airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,14 @@ private void validateDestination(final ConnectorSpecification spec, final JsonNo
205205
validator.ensure(spec.getConnectionSpecification(), configuration);
206206
}
207207

208-
private ConnectorSpecification getSpec(UUID destinationDefinitionId)
208+
public ConnectorSpecification getSpec(UUID destinationDefinitionId)
209209
throws JsonValidationException, IOException, ConfigNotFoundException {
210-
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
211-
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
212-
return specFetcher.execute(imageName);
210+
return getSpec(specFetcher, configRepository.getStandardDestinationDefinition(destinationDefinitionId));
211+
}
212+
213+
public static ConnectorSpecification getSpec(SpecFetcher specFetcher, StandardDestinationDefinition destinationDef)
214+
throws JsonValidationException, IOException, ConfigNotFoundException {
215+
return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()));
213216
}
214217

215218
private void persistDestinationConnection(final String name,
@@ -218,16 +221,15 @@ private void persistDestinationConnection(final String name,
218221
final UUID destinationId,
219222
final JsonNode configurationJson,
220223
final boolean tombstone)
221-
throws JsonValidationException, IOException {
224+
throws JsonValidationException, IOException, ConfigNotFoundException {
222225
final DestinationConnection destinationConnection = new DestinationConnection()
223226
.withName(name)
224227
.withDestinationDefinitionId(destinationDefinitionId)
225228
.withWorkspaceId(workspaceId)
226229
.withDestinationId(destinationId)
227230
.withConfiguration(configurationJson)
228231
.withTombstone(tombstone);
229-
230-
configRepository.writeDestinationConnection(destinationConnection);
232+
configRepository.writeDestinationConnection(destinationConnection, getSpec(destinationDefinitionId));
231233
}
232234

233235
private DestinationRead buildDestinationRead(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException {

airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ public void testImportIntoWorkspaceWithConflicts() throws JsonValidationExceptio
190190
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))),
191191
eq(emptyConnectorSpec));
192192
verify(configRepository).writeDestinationConnection(
193-
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))));
193+
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))),
194+
eq(emptyConnectorSpec));
194195
verify(configRepository)
195196
.writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId()))));
196197
verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId()))));
@@ -241,7 +242,7 @@ public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationExcep
241242
verify(configRepository).writeSourceConnection(
242243
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId),
243244
emptyConnectorSpec);
244-
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId));
245+
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec);
245246
verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId));
246247
verify(configRepository).writeStandardSync(connection);
247248
}

airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ class DestinationHandlerTest {
7171
private ConfigRepository configRepository;
7272
private StandardDestinationDefinition standardDestinationDefinition;
7373
private DestinationDefinitionSpecificationRead destinationDefinitionSpecificationRead;
74-
private DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody;
7574
private DestinationConnection destinationConnection;
7675
private DestinationHandler destinationHandler;
7776
private ConnectionsHandler connectionsHandler;
@@ -104,8 +103,8 @@ void setUp() throws IOException {
104103
imageName =
105104
DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag());
106105

107-
destinationDefinitionIdRequestBody =
108-
new DestinationDefinitionIdRequestBody().destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId());
106+
DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId(
107+
standardDestinationDefinition.getDestinationDefinitionId());
109108

110109
connectorSpecification = ConnectorSpecificationHelpers.generateConnectorSpecification();
111110

@@ -154,7 +153,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep
154153
assertEquals(expectedDestinationRead, actualDestinationRead);
155154

156155
verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), destinationConnection.getConfiguration());
157-
verify(configRepository).writeDestinationConnection(destinationConnection);
156+
verify(configRepository).writeDestinationConnection(destinationConnection, connectorSpecification);
158157
verify(secretsProcessor)
159158
.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification());
160159
}
@@ -181,7 +180,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep
181180

182181
destinationHandler.deleteDestination(destinationId);
183182

184-
verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
183+
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
185184
verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody);
186185
verify(connectionsHandler).deleteConnection(connectionRead);
187186
}
@@ -225,7 +224,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep
225224
assertEquals(expectedDestinationRead, actualDestinationRead);
226225

227226
verify(secretsProcessor).maskSecrets(newConfiguration, destinationDefinitionSpecificationRead.getConnectionSpecification());
228-
verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
227+
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
229228
verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), newConfiguration);
230229
}
231230

0 commit comments

Comments
 (0)