Skip to content

Commit 4262d26

Browse files
authored
Fix reset (#9801)
Fix the reset functionality. The current implementation of the reset functionalities had some bugs in it and wasn't working. This is fixing the reset and adding a test to it.
1 parent ccfe63a commit 4262d26

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,18 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
8484
// Scheduling
8585
final ScheduleRetrieverInput scheduleRetrieverInput = new ScheduleRetrieverInput(
8686
connectionUpdaterInput.getConnectionId());
87+
88+
workflowState.setResetConnection(connectionUpdaterInput.isResetConnection());
89+
8790
final ScheduleRetrieverOutput scheduleRetrieverOutput = configFetchActivity.getTimeToWait(scheduleRetrieverInput);
88-
Workflow.await(scheduleRetrieverOutput.getTimeToWait(), () -> skipScheduling() || connectionUpdaterInput.isFromFailure());
91+
Workflow.await(scheduleRetrieverOutput.getTimeToWait(),
92+
() -> skipScheduling() || connectionUpdaterInput.isFromFailure());
8993

9094
if (!workflowState.isUpdated() && !workflowState.isDeleted()) {
9195
// Job and attempt creation
9296
maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> {
9397
final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(
94-
connectionUpdaterInput.getConnectionId(), connectionUpdaterInput.isResetConnection()));
98+
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection()));
9599
connectionUpdaterInput.setJobId(jobCreationOutput.getJobId());
96100
return Optional.ofNullable(jobCreationOutput.getJobId());
97101
});
@@ -107,7 +111,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
107111
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
108112
maybeAttemptId.get(),
109113
maybeJobId.get(),
110-
connectionUpdaterInput.isResetConnection());
114+
workflowState.isResetConnection());
111115

112116
jobCreationAndStatusUpdateActivity.reportJobStart(new ReportJobStartInput(
113117
maybeJobId.get()));
@@ -136,6 +140,10 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
136140

137141
final StandardSyncSummary standardSyncSummary = standardSyncOutput.get().getStandardSyncSummary();
138142

143+
if (workflowState.isResetConnection()) {
144+
workflowState.setResetConnection(false);
145+
}
146+
139147
if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) {
140148
failures.addAll(standardSyncOutput.get().getFailures());
141149
partialSuccess = standardSyncSummary.getTotalStats().getRecordsCommitted() > 0;
@@ -169,7 +177,9 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
169177
// The naming is very misleading, it is not a failure but the expected behavior...
170178
}
171179

172-
if (connectionUpdaterInput.isResetConnection()) {
180+
// The workflow state will be updated to true if a reset happened while a job was running.
181+
// We need to propagate that to the new run that will be continued as new.
182+
if (workflowState.isResetConnection()) {
173183
connectionUpdaterInput.setResetConnection(true);
174184
connectionUpdaterInput.setJobId(null);
175185
connectionUpdaterInput.setAttemptNumber(1);
@@ -281,10 +291,10 @@ public void connectionUpdated() {
281291

282292
@Override
283293
public void resetConnection() {
284-
if (!workflowState.isRunning()) {
294+
workflowState.setResetConnection(true);
295+
if (workflowState.isRunning()) {
285296
cancelJob();
286297
}
287-
workflowState.setResetConnection(true);
288298
}
289299

290300
@Override

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@
4848

4949
public class ConnectionManagerWorkflowTest {
5050

51-
private static final ConfigFetchActivity mConfigFetchActivity =
51+
private final ConfigFetchActivity mConfigFetchActivity =
5252
Mockito.mock(ConfigFetchActivity.class, Mockito.withSettings().withoutAnnotations());
53-
private static final ConnectionDeletionActivity mConnectionDeletionActivity =
53+
private final ConnectionDeletionActivity mConnectionDeletionActivity =
5454
Mockito.mock(ConnectionDeletionActivity.class, Mockito.withSettings().withoutAnnotations());
55-
private static final GenerateInputActivityImpl mGenerateInputActivityImpl =
55+
private final GenerateInputActivityImpl mGenerateInputActivityImpl =
5656
Mockito.mock(GenerateInputActivityImpl.class, Mockito.withSettings().withoutAnnotations());
57-
private static final JobCreationAndStatusUpdateActivity mJobCreationAndStatusUpdateActivity =
57+
private final JobCreationAndStatusUpdateActivity mJobCreationAndStatusUpdateActivity =
5858
Mockito.mock(JobCreationAndStatusUpdateActivity.class, Mockito.withSettings().withoutAnnotations());
5959

6060
private TestWorkflowEnvironment testEnv;
@@ -443,6 +443,73 @@ public void cancelRunning() {
443443
testEnv.shutdown();
444444
}
445445

446+
@Test
447+
@DisplayName("Test that resetting a-non running workflow starts a reset")
448+
public void resetStart() {
449+
450+
final UUID testId = UUID.randomUUID();
451+
final TestStateListener testStateListener = new TestStateListener();
452+
final WorkflowState workflowState = new WorkflowState(testId, testStateListener);
453+
454+
final ConnectionUpdaterInput input = new ConnectionUpdaterInput(
455+
UUID.randomUUID(),
456+
1L,
457+
1,
458+
false,
459+
1,
460+
workflowState,
461+
false);
462+
463+
WorkflowClient.start(workflow::run, input);
464+
testEnv.sleep(Duration.ofSeconds(30L));
465+
workflow.resetConnection();
466+
testEnv.sleep(Duration.ofSeconds(90L));
467+
468+
final Queue<ChangedStateEvent> events = testStateListener.events(testId);
469+
470+
Assertions.assertThat(events)
471+
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue())
472+
.hasSizeGreaterThanOrEqualTo(1);
473+
474+
Mockito.verify(mJobCreationAndStatusUpdateActivity).jobSuccess(Mockito.any());
475+
476+
testEnv.shutdown();
477+
}
478+
479+
@Test
480+
@DisplayName("Test that resetting a running workflow starts cancel the running workflow")
481+
public void resetCancelRunningWorkflow() {
482+
483+
final UUID testId = UUID.randomUUID();
484+
final TestStateListener testStateListener = new TestStateListener();
485+
final WorkflowState workflowState = new WorkflowState(testId, testStateListener);
486+
487+
final ConnectionUpdaterInput input = new ConnectionUpdaterInput(
488+
UUID.randomUUID(),
489+
1L,
490+
1,
491+
false,
492+
1,
493+
workflowState,
494+
false);
495+
496+
WorkflowClient.start(workflow::run, input);
497+
workflow.submitManualSync();
498+
testEnv.sleep(Duration.ofSeconds(30L));
499+
workflow.resetConnection();
500+
testEnv.sleep(Duration.ofSeconds(30L));
501+
502+
final Queue<ChangedStateEvent> events = testStateListener.events(testId);
503+
504+
Assertions.assertThat(events)
505+
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue())
506+
.hasSizeGreaterThanOrEqualTo(1);
507+
508+
Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelled(Mockito.any());
509+
510+
testEnv.shutdown();
511+
}
512+
446513
}
447514

448515
@Nested

0 commit comments

Comments
 (0)