Skip to content

Commit a8a13bb

Browse files
authored
Cancel when running a delete (#13092)
* Cancel when running a delete * Add log
1 parent ae2187c commit a8a13bb

File tree

2 files changed

+69
-2
lines changed

2 files changed

+69
-2
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
122122
}
123123

124124
if (workflowState.isDeleted()) {
125+
if (workflowState.isRunning()) {
126+
log.info("Cancelling the current running job because a connection deletion was requested");
127+
reportCancelled(false);
128+
}
125129
log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
126130
deleteConnectionBeforeTerminatingTheWorkflow();
127131
return;
@@ -700,6 +704,12 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
700704
* Set a job as cancel and continue to the next job if and continue as a reset if needed
701705
*/
702706
private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
707+
reportCancelled(isReset);
708+
resetNewConnectionInput(connectionUpdaterInput);
709+
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
710+
}
711+
712+
private void reportCancelled(final boolean isReset) {
703713
workflowState.setContinueAsReset(isReset);
704714
final Long jobId = workflowInternalState.getJobId();
705715
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
@@ -721,8 +731,6 @@ private void reportCancelledAndContinueWith(final boolean isReset, final Connect
721731
attemptNumber,
722732
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
723733
}
724-
resetNewConnectionInput(connectionUpdaterInput);
725-
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
726734
}
727735

728736
}

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,65 @@ public void cancelRunning() throws InterruptedException {
554554
.jobCancelledWithAttemptNumber(Mockito.argThat(new HasCancellationFailure(JOB_ID, ATTEMPT_ID)));
555555
}
556556

557+
@RepeatedTest(10)
558+
@Timeout(value = 10,
559+
unit = TimeUnit.SECONDS)
560+
@DisplayName("Test that cancelling a running workflow cancels the sync")
561+
public void deleteRunning() throws InterruptedException {
562+
563+
final UUID testId = UUID.randomUUID();
564+
final TestStateListener testStateListener = new TestStateListener();
565+
final WorkflowState workflowState = new WorkflowState(testId, testStateListener);
566+
567+
final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder()
568+
.connectionId(UUID.randomUUID())
569+
.jobId(JOB_ID)
570+
.attemptId(ATTEMPT_ID)
571+
.fromFailure(false)
572+
.attemptNumber(1)
573+
.workflowState(workflowState)
574+
.resetConnection(false)
575+
.fromJobResetFailure(false)
576+
.build();
577+
578+
startWorkflowAndWaitUntilReady(workflow, input);
579+
580+
// wait for workflow to initialize
581+
testEnv.sleep(Duration.ofMinutes(1));
582+
583+
workflow.submitManualSync();
584+
585+
// wait for the manual sync to start working
586+
testEnv.sleep(Duration.ofMinutes(1));
587+
588+
workflow.deleteConnection();
589+
590+
// TODO
591+
// For some reason this transiently fails if it is below the runtime.
592+
// However, this should be reported almost immediately. I think this is a bug.
593+
testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1));
594+
595+
final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
596+
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
597+
598+
for (final ChangedStateEvent event : events) {
599+
if (event.isValue()) {
600+
log.info("event = " + event);
601+
}
602+
}
603+
604+
Assertions.assertThat(events)
605+
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED && changedStateEvent.isValue())
606+
.hasSizeGreaterThanOrEqualTo(1);
607+
608+
Assertions.assertThat(events)
609+
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.DELETED && changedStateEvent.isValue())
610+
.hasSizeGreaterThanOrEqualTo(1);
611+
612+
Mockito.verify(mJobCreationAndStatusUpdateActivity)
613+
.jobCancelledWithAttemptNumber(Mockito.argThat(new HasCancellationFailure(JOB_ID, ATTEMPT_ID)));
614+
}
615+
557616
@RepeatedTest(10)
558617
@Timeout(value = 2,
559618
unit = TimeUnit.SECONDS)

0 commit comments

Comments
 (0)