Skip to content

Commit ab10996

Browse files
authored
If an activity is failing, stuck the workflow and make it queriable (#10121)
After an activity failure, we are blocking the workflow. A new query method is available to query the workflows to get the list of workflow being stuck. Then the activity can be retry with a signal.
1 parent 5054af0 commit ab10996

File tree

6 files changed

+354
-29
lines changed

6 files changed

+354
-29
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java

+32
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.temporal.workflow.SignalMethod;
1010
import io.temporal.workflow.WorkflowInterface;
1111
import io.temporal.workflow.WorkflowMethod;
12+
import java.util.UUID;
1213
import lombok.AllArgsConstructor;
1314
import lombok.Data;
1415
import lombok.NoArgsConstructor;
@@ -55,6 +56,19 @@ public interface ConnectionManagerWorkflow {
5556
@SignalMethod
5657
void resetConnection();
5758

59+
/**
60+
* If an activity fails the workflow will be stuck. This signal activity can be used to retry the
61+
* activity.
62+
*/
63+
@SignalMethod
64+
void retryFailedActivity();
65+
66+
/**
67+
* Use for testing in order to simulate a job failure.
68+
*/
69+
@SignalMethod
70+
void simulateFailure();
71+
5872
/**
5973
* Return the current state of the workflow.
6074
*/
@@ -77,4 +91,22 @@ class JobInformation {
7791
@QueryMethod
7892
JobInformation getJobInformation();
7993

94+
@Data
95+
@NoArgsConstructor
96+
@AllArgsConstructor
97+
class QuarantinedInformation {
98+
99+
private UUID connectionId;
100+
private long jobId;
101+
private int attemptId;
102+
private boolean isQuarantined;
103+
104+
}
105+
106+
/**
107+
* Return if a job is stuck or not with the job information
108+
*/
109+
@QueryMethod
110+
QuarantinedInformation getQuarantinedInformation();
111+
80112
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

+80-17
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import java.util.Optional;
4646
import java.util.Set;
4747
import java.util.UUID;
48+
import java.util.function.Consumer;
49+
import java.util.function.Function;
4850
import lombok.extern.slf4j.Slf4j;
4951

5052
@Slf4j
@@ -95,19 +97,23 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
9597
final ScheduleRetrieverOutput scheduleRetrieverOutput = configFetchActivity.getTimeToWait(scheduleRetrieverInput);
9698
Workflow.await(scheduleRetrieverOutput.getTimeToWait(),
9799
() -> skipScheduling() || connectionUpdaterInput.isFromFailure());
98-
99100
if (!workflowState.isUpdated() && !workflowState.isDeleted()) {
100-
// Job and attempt creation
101101
maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> {
102-
final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(
103-
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection()));
102+
final JobCreationOutput jobCreationOutput =
103+
runMandatoryActivityWithOutput(
104+
(input) -> jobCreationAndStatusUpdateActivity.createNewJob(input),
105+
new JobCreationInput(
106+
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection()));
104107
connectionUpdaterInput.setJobId(jobCreationOutput.getJobId());
105108
return Optional.ofNullable(jobCreationOutput.getJobId());
106109
});
107110

108111
maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> {
109-
final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput(
110-
jobId));
112+
final AttemptCreationOutput attemptCreationOutput =
113+
runMandatoryActivityWithOutput(
114+
(input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input),
115+
new AttemptCreationInput(
116+
jobId));
111117
connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId());
112118
return attemptCreationOutput.getAttemptId();
113119
}));
@@ -118,10 +124,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
118124
maybeJobId.get(),
119125
workflowState.isResetConnection());
120126

121-
jobCreationAndStatusUpdateActivity.reportJobStart(new ReportJobStartInput(
122-
maybeJobId.get()));
127+
runMandatoryActivity(
128+
(input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input),
129+
new ReportJobStartInput(
130+
maybeJobId.get()));
123131

124-
final SyncOutput syncWorkflowInputs = getSyncInputActivity.getSyncWorkflowInput(getSyncInputActivitySyncInput);
132+
final SyncOutput syncWorkflowInputs = runMandatoryActivityWithOutput(
133+
(input) -> getSyncInputActivity.getSyncWorkflowInput(input),
134+
getSyncInputActivitySyncInput);
125135

126136
workflowState.setRunning(true);
127137

@@ -203,13 +213,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
203213
} else if (workflowState.isDeleted()) {
204214
// Stop the runs
205215
final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId());
206-
connectionDeletionActivity.deleteConnection(connectionDeletionInput);
216+
runMandatoryActivity((input) -> connectionDeletionActivity.deleteConnection(input), connectionDeletionInput);
207217
return;
208218
} else if (workflowState.isCancelled() || workflowState.isCancelledForReset()) {
209-
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(
210-
maybeJobId.get(),
211-
maybeAttemptId.get(),
212-
FailureHelper.failureSummaryForCancellation(maybeJobId.get(), maybeAttemptId.get(), failures, partialSuccess)));
219+
runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.jobCancelled(input),
220+
new JobCancelledInput(
221+
maybeJobId.get(),
222+
maybeAttemptId.get(),
223+
FailureHelper.failureSummaryForCancellation(maybeJobId.get(), maybeAttemptId.get(), failures, partialSuccess)));
213224
resetNewConnectionInput(connectionUpdaterInput);
214225
} else if (workflowState.isFailed()) {
215226
reportFailure(connectionUpdaterInput);
@@ -227,7 +238,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
227238
}
228239

229240
private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) {
230-
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(
241+
workflowState.setSuccess(true);
242+
runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.jobSuccess(input), new JobSuccessInput(
231243
maybeJobId.get(),
232244
maybeAttemptId.get(),
233245
standardSyncOutput.orElse(null)));
@@ -236,7 +248,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput)
236248
}
237249

238250
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) {
239-
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(
251+
runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.attemptFailure(input), new AttemptFailureInput(
240252
connectionUpdaterInput.getJobId(),
241253
connectionUpdaterInput.getAttemptId(),
242254
standardSyncOutput.orElse(null),
@@ -250,7 +262,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput)
250262
connectionUpdaterInput.setAttemptNumber(attemptNumber + 1);
251263
connectionUpdaterInput.setFromFailure(true);
252264
} else {
253-
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(
265+
runMandatoryActivity((input) -> jobCreationAndStatusUpdateActivity.jobFailure(input), new JobFailureInput(
254266
connectionUpdaterInput.getJobId(),
255267
"Job failed after too many retries for connection " + connectionId));
256268

@@ -306,6 +318,16 @@ public void resetConnection() {
306318
}
307319
}
308320

321+
@Override
322+
public void retryFailedActivity() {
323+
workflowState.setRetryFailedActivity(true);
324+
}
325+
326+
@Override
327+
public void simulateFailure() {
328+
workflowState.setFailed(true);
329+
}
330+
309331
@Override
310332
public WorkflowState getState() {
311333
return workflowState;
@@ -318,6 +340,15 @@ public JobInformation getJobInformation() {
318340
maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID));
319341
}
320342

343+
@Override
344+
public QuarantinedInformation getQuarantinedInformation() {
345+
return new QuarantinedInformation(
346+
connectionId,
347+
maybeJobId.orElse(NON_RUNNING_JOB_ID),
348+
maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID),
349+
workflowState.isQuarantined());
350+
}
351+
321352
private Boolean skipScheduling() {
322353
return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated() || workflowState.isResetConnection();
323354
}
@@ -335,4 +366,36 @@ private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput)
335366
}
336367
}
337368

369+
/**
370+
* This is running a lambda function that takes {@param input} as an input. If the run of the lambda
371+
* is thowing an exception, the workflow will be in a quarantined state and can then be manual
372+
* un-quarantined or a retry of the failed lambda can be trigger through a signal method.
373+
*
374+
* We aimed to use this method for call of the temporal activity.
375+
*/
376+
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(Function<INPUT, OUTPUT> mapper, INPUT input) {
377+
try {
378+
return mapper.apply(input);
379+
} catch (Exception e) {
380+
log.error("Failed to run an activity for the connection " + connectionId, e);
381+
workflowState.setQuarantined(true);
382+
workflowState.setRetryFailedActivity(false);
383+
Workflow.await(() -> workflowState.isRetryFailedActivity());
384+
log.error("Retrying an activity for the connection " + connectionId, e);
385+
workflowState.setQuarantined(false);
386+
workflowState.setRetryFailedActivity(false);
387+
return runMandatoryActivityWithOutput(mapper, input);
388+
}
389+
}
390+
391+
/**
392+
* Similar to runMandatoryActivityWithOutput but for methods that don't return
393+
*/
394+
private <INPUT> void runMandatoryActivity(Consumer<INPUT> consumer, INPUT input) {
395+
runMandatoryActivityWithOutput((inputInternal) -> {
396+
consumer.accept(inputInternal);
397+
return null;
398+
}, input);
399+
}
400+
338401
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java

+30
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
3030
private boolean failed = false;
3131
private boolean resetConnection = false;
3232
private boolean continueAsReset = false;
33+
private boolean retryFailedActivity = false;
34+
private boolean quarantined = false;
35+
private boolean success = true;
3336
private boolean cancelledForReset = false;
3437

3538
public void setRunning(final boolean running) {
@@ -96,6 +99,30 @@ public void setContinueAsReset(final boolean continueAsReset) {
9699
this.continueAsReset = continueAsReset;
97100
}
98101

102+
public void setRetryFailedActivity(final boolean retryFailedActivity) {
103+
final ChangedStateEvent event = new ChangedStateEvent(
104+
StateField.RETRY_FAILED_ACTIVITY,
105+
retryFailedActivity);
106+
stateChangedListener.addEvent(id, event);
107+
this.retryFailedActivity = retryFailedActivity;
108+
}
109+
110+
public void setQuarantined(final boolean quarantined) {
111+
final ChangedStateEvent event = new ChangedStateEvent(
112+
StateField.STUCK,
113+
quarantined);
114+
stateChangedListener.addEvent(id, event);
115+
this.quarantined = quarantined;
116+
}
117+
118+
public void setSuccess(final boolean success) {
119+
final ChangedStateEvent event = new ChangedStateEvent(
120+
StateField.SUCCESS,
121+
success);
122+
stateChangedListener.addEvent(id, event);
123+
this.success = success;
124+
}
125+
99126
public void setCancelledForReset(final boolean cancelledForReset) {
100127
final ChangedStateEvent event = new ChangedStateEvent(
101128
StateField.CANCELLED_FOR_RESET,
@@ -113,6 +140,9 @@ public void reset() {
113140
this.setFailed(false);
114141
this.setResetConnection(false);
115142
this.setContinueAsReset(false);
143+
this.setRetryFailedActivity(false);
144+
this.setSuccess(false);
145+
this.setQuarantined(false);
116146
this.setCancelledForReset(false);
117147
}
118148

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ public class TestStateListener implements WorkflowStateChangedListener {
1414

1515
private static final ConcurrentHashMap<UUID, Queue<ChangedStateEvent>> events = new ConcurrentHashMap<>();
1616

17+
public static void reset() {
18+
events.clear();
19+
}
20+
1721
@Override
1822
public Queue<ChangedStateEvent> events(final UUID testId) {
1923
if (!events.containsKey(testId)) {

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ enum StateField {
3131
FAILED,
3232
RESET,
3333
CONTINUE_AS_RESET,
34-
CANCELLED_FOR_RESET
34+
RETRY_FAILED_ACTIVITY,
35+
STUCK,
36+
SUCCESS,
37+
CANCELLED_FOR_RESET,
3538
}
3639

3740
@Value

0 commit comments

Comments
 (0)