-
Notifications
You must be signed in to change notification settings - Fork 4.5k
If an activity is failing, stuck the workflow and make it queriable #10121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
90d7cfe
9468e4a
d997ce1
2713985
fa3412e
6aefd6e
a572187
6fc92d7
05a4b68
e7b7f87
5ae9a9d
9fe026f
f076615
dc75830
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import io.temporal.workflow.SignalMethod; | ||
import io.temporal.workflow.WorkflowInterface; | ||
import io.temporal.workflow.WorkflowMethod; | ||
import java.util.UUID; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
|
@@ -55,6 +56,19 @@ public interface ConnectionManagerWorkflow { | |
@SignalMethod | ||
void resetConnection(); | ||
|
||
/** | ||
* If an activity fails the workflow will be stuck. This signal activity can be used to retry the | ||
* activity. | ||
*/ | ||
@SignalMethod | ||
void retryFailActivity(); | ||
|
||
/** | ||
* Use for testing in order to simulate a job failure. | ||
*/ | ||
@SignalMethod | ||
void simulateFailure(); | ||
|
||
/** | ||
* Return the current state of the workflow. | ||
*/ | ||
|
@@ -77,4 +91,22 @@ class JobInformation { | |
@QueryMethod | ||
JobInformation getJobInformation(); | ||
|
||
@Data | ||
@NoArgsConstructor | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need a no args constructor? |
||
@AllArgsConstructor | ||
class StuckInformation { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we replace the word "stuck" with "quarantined", please? quarantined is a common state name for a workflow that needs manual intervention. within that we can add information on the reason it is stuck. |
||
|
||
private UUID connectionId; | ||
private long jobId; | ||
private int attemptId; | ||
private boolean isStuck; | ||
|
||
} | ||
|
||
/** | ||
* Return if a job is stuck or not with the job information | ||
*/ | ||
@QueryMethod | ||
StuckInformation getStuckInformation(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -45,6 +45,8 @@ | |||||||||||||||||||||||||||||||||||||||||||
import java.util.Optional; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.Set; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.UUID; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.function.Consumer; | ||||||||||||||||||||||||||||||||||||||||||||
import java.util.function.Function; | ||||||||||||||||||||||||||||||||||||||||||||
import lombok.extern.slf4j.Slf4j; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
@Slf4j | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -95,19 +97,23 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr | |||||||||||||||||||||||||||||||||||||||||||
final ScheduleRetrieverOutput scheduleRetrieverOutput = configFetchActivity.getTimeToWait(scheduleRetrieverInput); | ||||||||||||||||||||||||||||||||||||||||||||
Workflow.await(scheduleRetrieverOutput.getTimeToWait(), | ||||||||||||||||||||||||||||||||||||||||||||
() -> skipScheduling() || connectionUpdaterInput.isFromFailure()); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
if (!workflowState.isUpdated() && !workflowState.isDeleted()) { | ||||||||||||||||||||||||||||||||||||||||||||
// Job and attempt creation | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> { | ||||||||||||||||||||||||||||||||||||||||||||
final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput( | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); | ||||||||||||||||||||||||||||||||||||||||||||
final JobCreationOutput jobCreationOutput = | ||||||||||||||||||||||||||||||||||||||||||||
runActivityWithOutput( | ||||||||||||||||||||||||||||||||||||||||||||
(input) -> jobCreationAndStatusUpdateActivity.createNewJob(input), | ||||||||||||||||||||||||||||||||||||||||||||
new JobCreationInput( | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection())); | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.setJobId(jobCreationOutput.getJobId()); | ||||||||||||||||||||||||||||||||||||||||||||
return Optional.ofNullable(jobCreationOutput.getJobId()); | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> { | ||||||||||||||||||||||||||||||||||||||||||||
final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput( | ||||||||||||||||||||||||||||||||||||||||||||
jobId)); | ||||||||||||||||||||||||||||||||||||||||||||
final AttemptCreationOutput attemptCreationOutput = | ||||||||||||||||||||||||||||||||||||||||||||
runActivityWithOutput( | ||||||||||||||||||||||||||||||||||||||||||||
(input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input), | ||||||||||||||||||||||||||||||||||||||||||||
new AttemptCreationInput( | ||||||||||||||||||||||||||||||||||||||||||||
jobId)); | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId()); | ||||||||||||||||||||||||||||||||||||||||||||
return attemptCreationOutput.getAttemptId(); | ||||||||||||||||||||||||||||||||||||||||||||
})); | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -118,10 +124,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr | |||||||||||||||||||||||||||||||||||||||||||
maybeJobId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.isResetConnection()); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
jobCreationAndStatusUpdateActivity.reportJobStart(new ReportJobStartInput( | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId.get())); | ||||||||||||||||||||||||||||||||||||||||||||
runActivity( | ||||||||||||||||||||||||||||||||||||||||||||
(input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input), | ||||||||||||||||||||||||||||||||||||||||||||
new ReportJobStartInput( | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId.get())); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
final SyncOutput syncWorkflowInputs = getSyncInputActivity.getSyncWorkflowInput(getSyncInputActivitySyncInput); | ||||||||||||||||||||||||||||||||||||||||||||
final SyncOutput syncWorkflowInputs = runActivityWithOutput( | ||||||||||||||||||||||||||||||||||||||||||||
(input) -> getSyncInputActivity.getSyncWorkflowInput(input), | ||||||||||||||||||||||||||||||||||||||||||||
getSyncInputActivitySyncInput); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
workflowState.setRunning(true); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
|
@@ -203,13 +213,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr | |||||||||||||||||||||||||||||||||||||||||||
} else if (workflowState.isDeleted()) { | ||||||||||||||||||||||||||||||||||||||||||||
// Stop the runs | ||||||||||||||||||||||||||||||||||||||||||||
final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId()); | ||||||||||||||||||||||||||||||||||||||||||||
connectionDeletionActivity.deleteConnection(connectionDeletionInput); | ||||||||||||||||||||||||||||||||||||||||||||
runActivity((input) -> connectionDeletionActivity.deleteConnection(input), connectionDeletionInput); | ||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||
} else if (workflowState.isCancelled()) { | ||||||||||||||||||||||||||||||||||||||||||||
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput( | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
maybeAttemptId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
FailureHelper.failureSummaryForCancellation(maybeJobId.get(), maybeAttemptId.get(), failures, partialSuccess))); | ||||||||||||||||||||||||||||||||||||||||||||
runActivity((input) -> jobCreationAndStatusUpdateActivity.jobCancelled(input), | ||||||||||||||||||||||||||||||||||||||||||||
new JobCancelledInput( | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
maybeAttemptId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
FailureHelper.failureSummaryForCancellation(maybeJobId.get(), maybeAttemptId.get(), failures, partialSuccess))); | ||||||||||||||||||||||||||||||||||||||||||||
resetNewConnectionInput(connectionUpdaterInput); | ||||||||||||||||||||||||||||||||||||||||||||
} else if (workflowState.isFailed()) { | ||||||||||||||||||||||||||||||||||||||||||||
reportFailure(connectionUpdaterInput); | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -227,7 +238,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr | |||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) { | ||||||||||||||||||||||||||||||||||||||||||||
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput( | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setSuccess(true); | ||||||||||||||||||||||||||||||||||||||||||||
runActivity((input) -> jobCreationAndStatusUpdateActivity.jobSuccess(input), new JobSuccessInput( | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
maybeAttemptId.get(), | ||||||||||||||||||||||||||||||||||||||||||||
standardSyncOutput.orElse(null))); | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -236,7 +248,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) | |||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) { | ||||||||||||||||||||||||||||||||||||||||||||
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput( | ||||||||||||||||||||||||||||||||||||||||||||
runActivity((input) -> jobCreationAndStatusUpdateActivity.attemptFailure(input), new AttemptFailureInput( | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.getJobId(), | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.getAttemptId(), | ||||||||||||||||||||||||||||||||||||||||||||
standardSyncOutput.orElse(null), | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -250,7 +262,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) | |||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.setAttemptNumber(attemptNumber + 1); | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.setFromFailure(true); | ||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput( | ||||||||||||||||||||||||||||||||||||||||||||
runActivity((input) -> jobCreationAndStatusUpdateActivity.jobFailure(input), new JobFailureInput( | ||||||||||||||||||||||||||||||||||||||||||||
connectionUpdaterInput.getJobId(), | ||||||||||||||||||||||||||||||||||||||||||||
"Job failed after too many retries for connection " + connectionId)); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
|
@@ -305,6 +317,16 @@ public void resetConnection() { | |||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||
public void retryFailActivity() { | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setRetryFailedActivity(true); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||
public void simulateFailure() { | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setFailed(true); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||
public WorkflowState getState() { | ||||||||||||||||||||||||||||||||||||||||||||
return workflowState; | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -317,6 +339,15 @@ public JobInformation getJobInformation() { | |||||||||||||||||||||||||||||||||||||||||||
maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID)); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||
public StuckInformation getStuckInformation() { | ||||||||||||||||||||||||||||||||||||||||||||
return new StuckInformation( | ||||||||||||||||||||||||||||||||||||||||||||
connectionId, | ||||||||||||||||||||||||||||||||||||||||||||
maybeJobId.orElse(NON_RUNNING_JOB_ID), | ||||||||||||||||||||||||||||||||||||||||||||
maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID), | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.isStuck()); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
private Boolean skipScheduling() { | ||||||||||||||||||||||||||||||||||||||||||||
return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated() || workflowState.isResetConnection(); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -334,4 +365,34 @@ private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput) | |||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
private <INPUT, OUTPUT> OUTPUT runActivityWithOutput(Function<INPUT, OUTPUT> mapper, INPUT input) { | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is only for a subset of activities. should the method name say short activities or mandatory activities or something? also can we add a javadoc comment to this method explaining why it is necessary, please? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||
return mapper.apply(input); | ||||||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||
log.error("Failed to run an activity for the connection " + connectionId, e); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setStuck(true); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setRetryFailedActivity(false); | ||||||||||||||||||||||||||||||||||||||||||||
Workflow.await(() -> workflowState.isRetryFailedActivity()); | ||||||||||||||||||||||||||||||||||||||||||||
log.error("Retrying an activity for the connection " + connectionId, e); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setStuck(false); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setRetryFailedActivity(false); | ||||||||||||||||||||||||||||||||||||||||||||
return runActivityWithOutput(mapper, input); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
private <INPUT> void runActivity(Consumer<INPUT> consumer, INPUT input) { | ||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||
consumer.accept(input); | ||||||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||
log.error("Failed to run an activity for the connection " + connectionId, e); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setStuck(true); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setRetryFailedActivity(false); | ||||||||||||||||||||||||||||||||||||||||||||
Workflow.await(() -> workflowState.isRetryFailedActivity()); | ||||||||||||||||||||||||||||||||||||||||||||
log.error("Retrying an activity for the connection " + connectionId, e); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setStuck(false); | ||||||||||||||||||||||||||||||||||||||||||||
workflowState.setRetryFailedActivity(false); | ||||||||||||||||||||||||||||||||||||||||||||
runActivity(consumer, input); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we dry this up. the duplicate code here is going to be easy to have go out of sync.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retryFailedActivity
?