Skip to content

Commit 42d6866

Browse files
committed
feat: support secret refs for jobs (#15714)
1 parent 07b1096 commit 42d6866

File tree

10 files changed

+156
-59
lines changed

10 files changed

+156
-59
lines changed

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobInputHandler.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,15 @@
5252
import io.airbyte.config.helpers.StateMessageHelper;
5353
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
5454
import io.airbyte.config.persistence.ConfigInjector;
55+
import io.airbyte.config.secrets.ConfigWithSecretReferences;
56+
import io.airbyte.config.secrets.InlinedConfigWithSecretRefsKt;
5557
import io.airbyte.data.services.ConnectionService;
5658
import io.airbyte.data.services.DestinationService;
5759
import io.airbyte.data.services.ScopedConfigurationService;
5860
import io.airbyte.data.services.SourceService;
5961
import io.airbyte.data.services.shared.NetworkSecurityTokenKey;
62+
import io.airbyte.domain.models.SecretReferenceScopeType;
63+
import io.airbyte.domain.services.secrets.SecretReferenceService;
6064
import io.airbyte.featureflag.Connection;
6165
import io.airbyte.featureflag.Context;
6266
import io.airbyte.featureflag.FeatureFlagClient;
@@ -100,6 +104,7 @@ public class JobInputHandler {
100104
private final DestinationService destinationService;
101105
private final ApiPojoConverters apiPojoConverters;
102106
private final ScopedConfigurationService scopedConfigurationService;
107+
private final SecretReferenceService secretReferenceService;
103108

104109
private static final Logger LOGGER = LoggerFactory.getLogger(JobInputHandler.class);
105110

@@ -116,7 +121,8 @@ public JobInputHandler(final JobPersistence jobPersistence,
116121
final SourceService sourceService,
117122
final DestinationService destinationService,
118123
final ApiPojoConverters apiPojoConverters,
119-
final ScopedConfigurationService scopedConfigurationService) {
124+
final ScopedConfigurationService scopedConfigurationService,
125+
final SecretReferenceService secretReferenceService) {
120126
this.jobPersistence = jobPersistence;
121127
this.featureFlagClient = featureFlagClient;
122128
this.oAuthConfigSupplier = oAuthConfigSupplier;
@@ -130,6 +136,7 @@ public JobInputHandler(final JobPersistence jobPersistence,
130136
this.destinationService = destinationService;
131137
this.apiPojoConverters = apiPojoConverters;
132138
this.scopedConfigurationService = scopedConfigurationService;
139+
this.secretReferenceService = secretReferenceService;
133140
}
134141

135142
/**
@@ -160,12 +167,9 @@ public Object getJobInput(final SyncInput input) {
160167
sourceService.getStandardSourceDefinition(source.getSourceDefinitionId()),
161168
source.getWorkspaceId(),
162169
source.getSourceId());
163-
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
164-
source.getSourceDefinitionId(),
165-
source.getSourceId(),
166-
source.getWorkspaceId(),
167-
source.getConfiguration());
168-
attemptSyncConfig.setSourceConfiguration(configInjector.injectConfig(sourceConfiguration, source.getSourceDefinitionId()));
170+
final ConfigWithSecretReferences sourceConfiguration = getSourceConfiguration(source);
171+
final JsonNode sourceConfigWithInlinedRefs = InlinedConfigWithSecretRefsKt.toInlined(sourceConfiguration);
172+
attemptSyncConfig.setSourceConfiguration(sourceConfigWithInlinedRefs);
169173
} else if (JobConfig.ConfigType.RESET_CONNECTION.equals(jobConfigType)) {
170174
final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection();
171175
final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration();
@@ -181,12 +185,9 @@ public Object getJobInput(final SyncInput input) {
181185
destinationService.getStandardDestinationDefinition(destination.getDestinationDefinitionId()),
182186
destination.getWorkspaceId(),
183187
destination.getDestinationId());
184-
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
185-
destination.getDestinationDefinitionId(),
186-
destination.getDestinationId(),
187-
destination.getWorkspaceId(),
188-
destination.getConfiguration());
189-
attemptSyncConfig.setDestinationConfiguration(configInjector.injectConfig(destinationConfiguration, destination.getDestinationDefinitionId()));
188+
final ConfigWithSecretReferences destinationConfiguration = getDestinationConfiguration(destination);
189+
final JsonNode destinationConfigWithInlinedRefs = InlinedConfigWithSecretRefsKt.toInlined(destinationConfiguration);
190+
attemptSyncConfig.setDestinationConfiguration(destinationConfigWithInlinedRefs);
190191

191192
final IntegrationLauncherConfig sourceLauncherConfig = getSourceIntegrationLauncherConfig(
192193
jobId,
@@ -266,16 +267,19 @@ public Object getCheckJobInput(final CheckInput input) {
266267
final ActorDefinitionVersion sourceVersion =
267268
actorDefinitionVersionHelper.getSourceVersion(sourceDefinition, source.getWorkspaceId(), source.getSourceId());
268269

269-
final JsonNode sourceConfiguration = getSourceConfiguration(source);
270-
final JsonNode destinationConfiguration = getDestinationConfiguration(destination);
270+
final ConfigWithSecretReferences sourceConfiguration = getSourceConfiguration(source);
271+
final JsonNode sourceConfigWithInlinedRefs = InlinedConfigWithSecretRefsKt.toInlined(sourceConfiguration);
272+
273+
final ConfigWithSecretReferences destinationConfiguration = getDestinationConfiguration(destination);
274+
final JsonNode destinationConfigWithInlinedRefs = InlinedConfigWithSecretRefsKt.toInlined(destinationConfiguration);
271275

272276
final IntegrationLauncherConfig sourceLauncherConfig = getSourceIntegrationLauncherConfig(
273277
jobId,
274278
attemptNumber,
275279
connectionId,
276280
jobSyncConfig,
277281
sourceVersion,
278-
sourceConfiguration);
282+
sourceConfigWithInlinedRefs);
279283

280284
final IntegrationLauncherConfig destinationLauncherConfig =
281285
getDestinationIntegrationLauncherConfig(
@@ -284,7 +288,7 @@ public Object getCheckJobInput(final CheckInput input) {
284288
connectionId,
285289
jobSyncConfig,
286290
destinationVersion,
287-
destinationConfiguration,
291+
destinationConfigWithInlinedRefs,
288292
Collections.emptyMap());
289293

290294
final ResourceRequirements sourceCheckResourceRequirements =
@@ -295,7 +299,7 @@ public Object getCheckJobInput(final CheckInput input) {
295299
final StandardCheckConnectionInput sourceCheckConnectionInput = new StandardCheckConnectionInput()
296300
.withActorType(ActorType.SOURCE)
297301
.withActorId(source.getSourceId())
298-
.withConnectionConfiguration(sourceConfiguration)
302+
.withConnectionConfiguration(sourceConfigWithInlinedRefs)
299303
.withResourceRequirements(sourceCheckResourceRequirements)
300304
.withActorContext(sourceContext)
301305
.withNetworkSecurityTokens(getNetworkSecurityTokens(jobSyncConfig.getWorkspaceId()));
@@ -308,7 +312,7 @@ public Object getCheckJobInput(final CheckInput input) {
308312
final StandardCheckConnectionInput destinationCheckConnectionInput = new StandardCheckConnectionInput()
309313
.withActorType(ActorType.DESTINATION)
310314
.withActorId(destination.getDestinationId())
311-
.withConnectionConfiguration(destinationConfiguration)
315+
.withConnectionConfiguration(destinationConfigWithInlinedRefs)
312316
.withResourceRequirements(destinationCheckResourceRequirements)
313317
.withActorContext(destinationContext)
314318
.withNetworkSecurityTokens(getNetworkSecurityTokens(jobSyncConfig.getWorkspaceId()));
@@ -441,20 +445,22 @@ private IntegrationLauncherConfig getDestinationIntegrationLauncherConfig(final
441445
.withAdditionalEnvironmentVariables(additionalEnviornmentVariables);
442446
}
443447

444-
private JsonNode getSourceConfiguration(final SourceConnection source) throws IOException {
445-
return configInjector.injectConfig(oAuthConfigSupplier.injectSourceOAuthParameters(
448+
private ConfigWithSecretReferences getSourceConfiguration(final SourceConnection source) throws IOException {
449+
final JsonNode injectedConfig = configInjector.injectConfig(oAuthConfigSupplier.injectSourceOAuthParameters(
446450
source.getSourceDefinitionId(),
447451
source.getSourceId(),
448452
source.getWorkspaceId(),
449453
source.getConfiguration()), source.getSourceDefinitionId());
454+
return secretReferenceService.getConfigWithSecretReferences(SecretReferenceScopeType.ACTOR, source.getSourceId(), injectedConfig);
450455
}
451456

452-
private JsonNode getDestinationConfiguration(final DestinationConnection destination) throws IOException {
453-
return configInjector.injectConfig(oAuthConfigSupplier.injectDestinationOAuthParameters(
457+
private ConfigWithSecretReferences getDestinationConfiguration(final DestinationConnection destination) throws IOException {
458+
final JsonNode injectedConfig = configInjector.injectConfig(oAuthConfigSupplier.injectDestinationOAuthParameters(
454459
destination.getDestinationDefinitionId(),
455460
destination.getDestinationId(),
456461
destination.getWorkspaceId(),
457462
destination.getConfiguration()), destination.getDestinationDefinitionId());
463+
return secretReferenceService.getConfigWithSecretReferences(SecretReferenceScopeType.ACTOR, destination.getDestinationId(), injectedConfig);
458464
}
459465

460466
private @NotNull List<String> getNetworkSecurityTokens(final UUID workspaceId) {

airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClient.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
import io.airbyte.config.WorkloadPriority;
3333
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
3434
import io.airbyte.config.persistence.ConfigInjector;
35+
import io.airbyte.config.secrets.ConfigWithSecretReferences;
36+
import io.airbyte.config.secrets.InlinedConfigWithSecretRefsKt;
37+
import io.airbyte.domain.models.SecretReferenceScopeType;
38+
import io.airbyte.domain.services.secrets.SecretReferenceService;
3539
import io.airbyte.persistence.job.errorreporter.ConnectorJobReportingContext;
3640
import io.airbyte.persistence.job.errorreporter.JobErrorReporter;
3741
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
@@ -61,6 +65,7 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl
6165
private final JobTracker jobTracker;
6266
private final JobErrorReporter jobErrorReporter;
6367
private final OAuthConfigSupplier oAuthConfigSupplier;
68+
private final SecretReferenceService secretReferenceService;
6469

6570
private final ConfigInjector configInjector;
6671
private final ContextBuilder contextBuilder;
@@ -71,13 +76,15 @@ public DefaultSynchronousSchedulerClient(final TemporalClient temporalClient,
7176
final JobErrorReporter jobErrorReporter,
7277
final OAuthConfigSupplier oAuthConfigSupplier,
7378
final ConfigInjector configInjector,
74-
final ContextBuilder contextBuilder) {
79+
final ContextBuilder contextBuilder,
80+
final SecretReferenceService secretReferenceService) {
7581
this.temporalClient = temporalClient;
7682
this.jobTracker = jobTracker;
7783
this.jobErrorReporter = jobErrorReporter;
7884
this.oAuthConfigSupplier = oAuthConfigSupplier;
7985
this.configInjector = configInjector;
8086
this.contextBuilder = contextBuilder;
87+
this.secretReferenceService = secretReferenceService;
8188
}
8289

8390
@Override
@@ -88,22 +95,29 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne
8895
final ResourceRequirements resourceRequirements)
8996
throws IOException {
9097
final String dockerImage = ActorDefinitionVersionHelper.getDockerImageName(sourceVersion);
91-
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
98+
final JsonNode configWithOauthParams = oAuthConfigSupplier.injectSourceOAuthParameters(
9299
source.getSourceDefinitionId(),
93100
source.getSourceId(),
94101
source.getWorkspaceId(),
95102
source.getConfiguration());
103+
final JsonNode injectedConfig = configInjector.injectConfig(configWithOauthParams, source.getSourceDefinitionId());
104+
105+
final ConfigWithSecretReferences sourceConfig =
106+
source.getSourceId() == null ? InlinedConfigWithSecretRefsKt.buildConfigWithSecretRefsJava(injectedConfig)
107+
: secretReferenceService.getConfigWithSecretReferences(
108+
SecretReferenceScopeType.ACTOR,
109+
source.getSourceId(),
110+
injectedConfig);
111+
96112
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
97113
.withActorType(ActorType.SOURCE)
98114
.withActorId(source.getSourceId())
99-
.withConnectionConfiguration(configInjector.injectConfig(sourceConfiguration, source.getSourceDefinitionId()))
115+
.withConnectionConfiguration(sourceConfig)
100116
.withDockerImage(dockerImage)
101117
.withProtocolVersion(new Version(sourceVersion.getProtocolVersion()))
102118
.withIsCustomConnector(isCustomConnector)
103119
.withResourceRequirements(resourceRequirements);
104120

105-
// TODO(pedro): Make sure we pass the (inlined) ConfigWithSecretRefs to the job
106-
107121
final UUID jobId = UUID.randomUUID();
108122
final ConnectorJobReportingContext jobReportingContext =
109123
new ConnectorJobReportingContext(jobId, dockerImage, sourceVersion.getReleaseStage(), sourceVersion.getInternalSupportLevel());
@@ -129,15 +143,24 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
129143
final ResourceRequirements resourceRequirements)
130144
throws IOException {
131145
final String dockerImage = ActorDefinitionVersionHelper.getDockerImageName(destinationVersion);
132-
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
146+
final JsonNode configWithOauthParams = oAuthConfigSupplier.injectDestinationOAuthParameters(
133147
destination.getDestinationDefinitionId(),
134148
destination.getDestinationId(),
135149
destination.getWorkspaceId(),
136150
destination.getConfiguration());
151+
final JsonNode injectedConfig = configInjector.injectConfig(configWithOauthParams, destination.getDestinationDefinitionId());
152+
153+
final ConfigWithSecretReferences destinationConfig =
154+
destination.getDestinationId() == null ? InlinedConfigWithSecretRefsKt.buildConfigWithSecretRefsJava(injectedConfig)
155+
: secretReferenceService.getConfigWithSecretReferences(
156+
SecretReferenceScopeType.ACTOR,
157+
destination.getDestinationId(),
158+
injectedConfig);
159+
137160
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
138161
.withActorType(ActorType.DESTINATION)
139162
.withActorId(destination.getDestinationId())
140-
.withConnectionConfiguration(configInjector.injectConfig(destinationConfiguration, destination.getDestinationDefinitionId()))
163+
.withConnectionConfiguration(destinationConfig)
141164
.withDockerImage(dockerImage)
142165
.withProtocolVersion(new Version(destinationVersion.getProtocolVersion()))
143166
.withIsCustomConnector(isCustomConnector)
@@ -168,13 +191,20 @@ public SynchronousResponse<UUID> createDiscoverSchemaJob(final SourceConnection
168191
final WorkloadPriority priority)
169192
throws IOException {
170193
final String dockerImage = ActorDefinitionVersionHelper.getDockerImageName(sourceVersion);
171-
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
194+
final JsonNode configWithOauthParams = oAuthConfigSupplier.injectSourceOAuthParameters(
172195
source.getSourceDefinitionId(),
173196
source.getSourceId(),
174197
source.getWorkspaceId(),
175198
source.getConfiguration());
199+
final JsonNode injectedConfig = configInjector.injectConfig(configWithOauthParams, source.getSourceDefinitionId());
200+
201+
final ConfigWithSecretReferences sourceConfig = secretReferenceService.getConfigWithSecretReferences(
202+
SecretReferenceScopeType.ACTOR,
203+
source.getSourceId(),
204+
injectedConfig);
205+
176206
final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig()
177-
.withConnectionConfiguration(configInjector.injectConfig(sourceConfiguration, source.getSourceDefinitionId()))
207+
.withConnectionConfiguration(sourceConfig)
178208
.withDockerImage(dockerImage)
179209
.withProtocolVersion(new Version(sourceVersion.getProtocolVersion()))
180210
.withSourceId(source.getSourceId().toString())

0 commit comments

Comments
 (0)