If an activity is failing, stuck the workflow and make it queriable #10121
Should we add a way to reset the job to not stuck / not quarantined as well?
   * activity.
   */
  @SignalMethod
  void retryFailActivity();
retryFailedActivity?
@@ -77,4 +91,22 @@
  @QueryMethod
  JobInformation getJobInformation();

  @Data
  @NoArgsConstructor
why do we need a no args constructor?
  private <INPUT> void runActivity(Consumer<INPUT> consumer, INPUT input) {
    try {
      consumer.accept(input);
    } catch (Exception e) {
      log.error("Failed to run an activity for the connection " + connectionId, e);
      workflowState.setStuck(true);
      workflowState.setRetryFailedActivity(false);
      Workflow.await(() -> workflowState.isRetryFailedActivity());
      log.error("Retrying an activity for the connection " + connectionId, e);
      workflowState.setStuck(false);
      workflowState.setRetryFailedActivity(false);
      runActivity(consumer, input);
    }
  }
Can we DRY this up? The duplicated code here will easily drift out of sync.
  private <INPUT> void runActivity2(final Consumer<INPUT> consumer, final INPUT input) {
    // make the consumer look like a function so that we can reuse logic in runActivityWithOutput
    runActivityWithOutput((inputInternal) -> {
      consumer.accept(inputInternal);
      return null;
    }, input);
  }
Done
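The suggestion above can be tried outside the workflow with a minimal, self-contained sketch. Here `ConsumerAdapterSketch` is a hypothetical class name, and its `runActivityWithOutput` is only a pass-through stand-in: the method in this PR presumably also handles the failure and waits for the retry signal, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical class name, used only for this illustration.
class ConsumerAdapterSketch {

    // Stand-in for the PR's runActivityWithOutput. Assumption: the real
    // method also blocks and retries on failure; this one just applies the function.
    static <I, O> O runActivityWithOutput(Function<I, O> mapper, I input) {
        return mapper.apply(input);
    }

    // Wrap the consumer in a function that returns null, so that the
    // void case goes through the same code path as the value-returning case.
    static <I> void runActivity(Consumer<I> consumer, I input) {
        runActivityWithOutput(in -> {
            consumer.accept(in);
            return null;
        }, input);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        runActivity(value -> seen.add(value), "connection-1");
        System.out.println(seen);
    }
}
```

With this shape, any change to the failure handling only has to be made in one method.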
@@ -334,4 +365,34 @@ private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput)
    }
  }

  private <INPUT, OUTPUT> OUTPUT runActivityWithOutput(Function<INPUT, OUTPUT> mapper, INPUT input) {
this is only for a subset of activities. should the method name say short activities or mandatory activities or something? also can we add a javadoc comment to this method explaining why it is necessary, please?
Done
  @Data
  @NoArgsConstructor
  @AllArgsConstructor
  class StuckInformation {
can we replace the word "stuck" with "quarantined", please? quarantined is a common state name for a workflow that needs manual intervention. within that we can add information on the reason it is stuck.
Yes, but it should be in another PR.
Done
It is needed for the internal Temporal serialization.
Done
What
After an activity fails, we block the workflow. A new query method is available to query the workflows and get the list of those that are stuck.
The failed activity can then be retried with a signal.
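
The block, query, and retry flow described above can be sketched in plain Java without the Temporal SDK. This is a simplified illustration under stated assumptions, not the code of this PR: `QuarantineSketch`, its `State` class, and the `awaitRetrySignal` hook are hypothetical, and the hook stands in for the `Workflow.await(() -> workflowState.isRetryFailedActivity())` call shown in the diff. The sketch also uses a loop rather than the recursive call in the diff, which avoids deepening the stack on repeated failures.

```java
import java.util.function.Function;

// Hypothetical class, used only to illustrate the flow.
class QuarantineSketch {

    // Simplified stand-in for the PR's workflowState.
    static final class State {
        boolean stuck;
        boolean retryFailedActivity;
    }

    final State state = new State();

    // Stand-in for Workflow.await(...): in a real workflow this would block
    // until the retry signal arrives. Assumption: callers replace this hook.
    Runnable awaitRetrySignal = () -> {
        throw new IllegalStateException("no retry-signal hook configured");
    };

    // Signal handler: requests a retry of the failed activity.
    void retryFailedActivity() {
        state.retryFailedActivity = true;
    }

    // Query handler: reports whether the workflow is currently stuck.
    boolean isStuck() {
        return state.stuck;
    }

    // Run the activity; on failure, mark the workflow stuck, wait for the
    // retry signal, clear the flags, and try again.
    <I, O> O runActivityWithOutput(Function<I, O> activity, I input) {
        while (true) {
            try {
                return activity.apply(input);
            } catch (RuntimeException e) {
                state.stuck = true;
                state.retryFailedActivity = false;
                awaitRetrySignal.run();
                state.stuck = false;
                state.retryFailedActivity = false;
            }
        }
    }

    public static void main(String[] args) {
        QuarantineSketch workflow = new QuarantineSketch();
        int[] attempts = {0};
        // Simulate an operator who sees the stuck workflow and sends the signal.
        workflow.awaitRetrySignal = () -> {
            System.out.println("stuck while waiting: " + workflow.isStuck());
            workflow.retryFailedActivity();
        };
        String result = workflow.runActivityWithOutput(in -> {
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("first attempt fails");
            }
            return in + " synced";
        }, "connection-1");
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

In the real workflow the signal and query would arrive through the methods annotated with `@SignalMethod` and `@QueryMethod` on the workflow interface.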