Skip to content

Commit dec1795

Browse files
committed
feat: deterministic workflow Id (#13948)
1 parent 4174406 commit dec1795

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ public TemporalResponse<ConnectorJobOutput> submitGetSpec(final UUID jobId,
413413
.withDockerImage(config.getDockerImage())
414414
.withIsCustomConnector(config.getIsCustomConnector());
415415
return execute(jobRunConfig,
416-
() -> getWorkflowStub(SpecWorkflow.class, TemporalJobType.GET_SPEC).run(jobRunConfig, launcherConfig));
416+
() -> getWorkflowStub(SpecWorkflow.class, TemporalJobType.GET_SPEC, jobId).run(jobRunConfig, launcherConfig));
417417

418418
}
419419

@@ -450,7 +450,7 @@ public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID job
450450
.withActorContext(context);
451451

452452
return execute(jobRunConfig,
453-
() -> getWorkflowStubWithTaskQueue(CheckConnectionWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input));
453+
() -> getWorkflowStubWithTaskQueue(CheckConnectionWorkflow.class, taskQueue, jobId).run(jobRunConfig, launcherConfig, input));
454454
}
455455

456456
/**
@@ -484,7 +484,7 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
484484
.withResourceRequirements(config.getResourceRequirements()).withActorContext(context).withManual(true);
485485

486486
return execute(jobRunConfig,
487-
() -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input));
487+
() -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue, jobId).run(jobRunConfig, launcherConfig, input));
488488
}
489489

490490
/**
@@ -542,12 +542,12 @@ <T> TemporalResponse<T> execute(final JobRunConfig jobRunConfig, final Supplier<
542542
return new TemporalResponse<>(operationOutput, metadata);
543543
}
544544

545-
private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobType jobType) {
546-
return workflowClientWrapped.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptions(jobType));
545+
private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobType jobType, final UUID jobId) {
546+
return workflowClientWrapped.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptions(jobType, jobId));
547547
}
548548

549-
private <T> T getWorkflowStubWithTaskQueue(final Class<T> workflowClass, final String taskQueue) {
550-
return workflowClientWrapped.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptionsWithTaskQueue(taskQueue));
549+
private <T> T getWorkflowStubWithTaskQueue(final Class<T> workflowClass, final String taskQueue, final UUID jobId) {
550+
return workflowClientWrapped.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptionsWithTaskQueue(taskQueue, jobId));
551551
}
552552

553553
/**

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType
6666
* @param jobType job type
6767
* @return workflow options
6868
*/
69-
public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType) {
70-
return buildWorkflowOptionsWithTaskQueue(jobType.name());
69+
public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType, final UUID jobd) {
70+
return buildWorkflowOptionsWithTaskQueue(jobType.name(), jobd);
7171
}
7272

7373
/**
@@ -76,13 +76,14 @@ public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType
7676
* @param taskQueue task queue
7777
* @return workflow options
7878
*/
79-
public static WorkflowOptions buildWorkflowOptionsWithTaskQueue(final String taskQueue) {
79+
public static WorkflowOptions buildWorkflowOptionsWithTaskQueue(final String taskQueue, final UUID jobID) {
8080
return WorkflowOptions.newBuilder()
8181
.setTaskQueue(taskQueue)
8282
.setWorkflowTaskTimeout(Duration.ofSeconds(27)) // TODO parker - temporarily increasing this to a recognizable number to see if it changes
8383
// error I'm seeing
8484
// todo (cgardens) we do not leverage Temporal retries.
8585
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
86+
.setWorkflowId(jobID.toString())
8687
.build();
8788
}
8889

airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -230,21 +230,22 @@ class TestJobSubmission {
230230
@Test
231231
void testSubmitGetSpec() {
232232
final SpecWorkflow specWorkflow = mock(SpecWorkflow.class);
233-
when(workflowClient.newWorkflowStub(SpecWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.GET_SPEC)))
233+
when(workflowClient.newWorkflowStub(SpecWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.GET_SPEC, JOB_UUID)))
234234
.thenReturn(specWorkflow);
235235
final JobGetSpecConfig getSpecConfig = new JobGetSpecConfig().withDockerImage(IMAGE_NAME1);
236236

237237
temporalClient.submitGetSpec(JOB_UUID, ATTEMPT_ID, WORKSPACE_ID, getSpecConfig);
238238
specWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG);
239-
verify(workflowClient).newWorkflowStub(SpecWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.GET_SPEC));
239+
verify(workflowClient).newWorkflowStub(SpecWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.GET_SPEC, JOB_UUID));
240240
}
241241

242242
@Test
243243
void testSubmitCheckConnection() {
244244
final CheckConnectionWorkflow checkConnectionWorkflow = mock(CheckConnectionWorkflow.class);
245245
when(
246-
workflowClient.newWorkflowStub(CheckConnectionWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION)))
247-
.thenReturn(checkConnectionWorkflow);
246+
workflowClient.newWorkflowStub(CheckConnectionWorkflow.class,
247+
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION, JOB_UUID)))
248+
.thenReturn(checkConnectionWorkflow);
248249
final JobCheckConnectionConfig checkConnectionConfig = new JobCheckConnectionConfig()
249250
.withDockerImage(IMAGE_NAME1)
250251
.withConnectionConfiguration(Jsons.emptyObject());
@@ -254,14 +255,15 @@ void testSubmitCheckConnection() {
254255
temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, WORKSPACE_ID, CHECK_TASK_QUEUE, checkConnectionConfig, new ActorContext());
255256
checkConnectionWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
256257
verify(workflowClient).newWorkflowStub(CheckConnectionWorkflow.class,
257-
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION));
258+
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION, JOB_UUID));
258259
}
259260

260261
@Test
261262
void testSubmitDiscoverSchema() {
262263
final DiscoverCatalogWorkflow discoverCatalogWorkflow = mock(DiscoverCatalogWorkflow.class);
263-
when(workflowClient.newWorkflowStub(DiscoverCatalogWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA)))
264-
.thenReturn(discoverCatalogWorkflow);
264+
when(workflowClient.newWorkflowStub(DiscoverCatalogWorkflow.class,
265+
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA, JOB_UUID)))
266+
.thenReturn(discoverCatalogWorkflow);
265267
final JobDiscoverCatalogConfig checkConnectionConfig = new JobDiscoverCatalogConfig()
266268
.withDockerImage(IMAGE_NAME1)
267269
.withConnectionConfiguration(Jsons.emptyObject());
@@ -272,7 +274,7 @@ void testSubmitDiscoverSchema() {
272274
WorkloadPriority.DEFAULT);
273275
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
274276
verify(workflowClient).newWorkflowStub(DiscoverCatalogWorkflow.class,
275-
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA));
277+
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA, JOB_UUID));
276278
}
277279

278280
}

0 commit comments

Comments
 (0)