Skip to content

Commit c61b8c2

Browse files
authored
Do not set cancel as true in a reset (#10228)
1 parent dedd1ea commit c61b8c2

File tree

4 files changed

+20
-4
lines changed

4 files changed

+20
-4
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
187187
// When cancelling a reset, we endure that the next workflow won't be a reset.
188188
// We are using a specific workflow state for that, this makes the set of the fact that we are going
189189
// to continue as a reset testable.
190-
if (workflowState.isResetConnection() && !workflowState.isCancelled()) {
190+
if (workflowState.isCancelledForReset()) {
191191
workflowState.setContinueAsReset(true);
192192
connectionUpdaterInput.setJobId(null);
193193
connectionUpdaterInput.setAttemptNumber(1);
@@ -205,7 +205,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
205205
final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId());
206206
connectionDeletionActivity.deleteConnection(connectionDeletionInput);
207207
return;
208-
} else if (workflowState.isCancelled()) {
208+
} else if (workflowState.isCancelled() || workflowState.isCancelledForReset()) {
209209
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(
210210
maybeJobId.get(),
211211
maybeAttemptId.get(),
@@ -301,7 +301,8 @@ public void connectionUpdated() {
301301
public void resetConnection() {
302302
workflowState.setResetConnection(true);
303303
if (workflowState.isRunning()) {
304-
cancelJob();
304+
workflowState.setCancelledForReset(true);
305+
syncWorkflowCancellationScope.cancel();
305306
}
306307
}
307308

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
3030
private boolean failed = false;
3131
private boolean resetConnection = false;
3232
private boolean continueAsReset = false;
33+
private boolean cancelledForReset = false;
3334

3435
public void setRunning(final boolean running) {
3536
final ChangedStateEvent event = new ChangedStateEvent(
@@ -95,6 +96,14 @@ public void setContinueAsReset(final boolean continueAsReset) {
9596
this.continueAsReset = continueAsReset;
9697
}
9798

99+
public void setCancelledForReset(final boolean cancelledForReset) {
100+
final ChangedStateEvent event = new ChangedStateEvent(
101+
StateField.CANCELLED_FOR_RESET,
102+
cancelledForReset);
103+
stateChangedListener.addEvent(id, event);
104+
this.cancelledForReset = cancelledForReset;
105+
}
106+
98107
public void reset() {
99108
this.setRunning(false);
100109
this.setDeleted(false);
@@ -104,6 +113,7 @@ public void reset() {
104113
this.setFailed(false);
105114
this.setResetConnection(false);
106115
this.setContinueAsReset(false);
116+
this.setCancelledForReset(false);
107117
}
108118

109119
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ enum StateField {
3030
UPDATED,
3131
FAILED,
3232
RESET,
33-
CONTINUE_AS_RESET
33+
CONTINUE_AS_RESET,
34+
CANCELLED_FOR_RESET
3435
}
3536

3637
@Value

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,10 @@ public void resetCancelRunningWorkflow() {
508508

509509
final Queue<ChangedStateEvent> events = testStateListener.events(testId);
510510

511+
Assertions.assertThat(events)
512+
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED_FOR_RESET && changedStateEvent.isValue())
513+
.hasSizeGreaterThanOrEqualTo(1);
514+
511515
Assertions.assertThat(events)
512516
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue())
513517
.hasSizeGreaterThanOrEqualTo(1);

0 commit comments

Comments
 (0)