Skip to content

If an activity is failing, mark the workflow as stuck and make it queryable #10121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -55,6 +56,19 @@ public interface ConnectionManagerWorkflow {
@SignalMethod
void resetConnection();

/**
 * Signal asking a stuck workflow to retry the activity that failed.
 *
 * <p>When an activity fails, the workflow is marked as stuck and waits; sending this signal lets
 * it run the failed activity again.
 */
@SignalMethod
void retryFailActivity();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retryFailedActivity ?


/**
 * Used for testing, in order to simulate a job failure.
 */
@SignalMethod
void simulateFailure();

/**
* Return the current state of the workflow.
*/
Expand All @@ -77,4 +91,22 @@ class JobInformation {
@QueryMethod
JobInformation getJobInformation();

@Data
@NoArgsConstructor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a no args constructor?

@AllArgsConstructor
class StuckInformation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we replace the word "stuck" with "quarantined", please? quarantined is a common state name for a workflow that needs manual intervention. within that we can add information on the reason it is stuck.


private UUID connectionId;
private long jobId;
private int attemptId;
private boolean isStuck;

}

/**
 * Query reporting whether the workflow is currently stuck, together with the connection id, job
 * id and attempt id it is working on.
 *
 * @return a {@link StuckInformation} snapshot of the stuck flag and the related identifiers
 */
@QueryMethod
StuckInformation getStuckInformation();

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -95,19 +97,23 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
final ScheduleRetrieverOutput scheduleRetrieverOutput = configFetchActivity.getTimeToWait(scheduleRetrieverInput);
Workflow.await(scheduleRetrieverOutput.getTimeToWait(),
() -> skipScheduling() || connectionUpdaterInput.isFromFailure());

if (!workflowState.isUpdated() && !workflowState.isDeleted()) {
// Job and attempt creation
maybeJobId = Optional.ofNullable(connectionUpdaterInput.getJobId()).or(() -> {
final JobCreationOutput jobCreationOutput = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection()));
final JobCreationOutput jobCreationOutput =
runActivityWithOutput(
(input) -> jobCreationAndStatusUpdateActivity.createNewJob(input),
new JobCreationInput(
connectionUpdaterInput.getConnectionId(), workflowState.isResetConnection()));
connectionUpdaterInput.setJobId(jobCreationOutput.getJobId());
return Optional.ofNullable(jobCreationOutput.getJobId());
});

maybeAttemptId = Optional.ofNullable(connectionUpdaterInput.getAttemptId()).or(() -> maybeJobId.map(jobId -> {
final AttemptCreationOutput attemptCreationOutput = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput(
jobId));
final AttemptCreationOutput attemptCreationOutput =
runActivityWithOutput(
(input) -> jobCreationAndStatusUpdateActivity.createNewAttempt(input),
new AttemptCreationInput(
jobId));
connectionUpdaterInput.setAttemptId(attemptCreationOutput.getAttemptId());
return attemptCreationOutput.getAttemptId();
}));
Expand All @@ -118,10 +124,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
maybeJobId.get(),
workflowState.isResetConnection());

jobCreationAndStatusUpdateActivity.reportJobStart(new ReportJobStartInput(
maybeJobId.get()));
runActivity(
(input) -> jobCreationAndStatusUpdateActivity.reportJobStart(input),
new ReportJobStartInput(
maybeJobId.get()));

final SyncOutput syncWorkflowInputs = getSyncInputActivity.getSyncWorkflowInput(getSyncInputActivitySyncInput);
final SyncOutput syncWorkflowInputs = runActivityWithOutput(
(input) -> getSyncInputActivity.getSyncWorkflowInput(input),
getSyncInputActivitySyncInput);

workflowState.setRunning(true);

Expand Down Expand Up @@ -203,13 +213,14 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
} else if (workflowState.isDeleted()) {
// Stop the runs
final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId());
connectionDeletionActivity.deleteConnection(connectionDeletionInput);
runActivity((input) -> connectionDeletionActivity.deleteConnection(input), connectionDeletionInput);
return;
} else if (workflowState.isCancelled()) {
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(
maybeJobId.get(),
maybeAttemptId.get(),
FailureHelper.failureSummaryForCancellation(maybeJobId.get(), maybeAttemptId.get(), failures, partialSuccess)));
runActivity((input) -> jobCreationAndStatusUpdateActivity.jobCancelled(input),
new JobCancelledInput(
maybeJobId.get(),
maybeAttemptId.get(),
FailureHelper.failureSummaryForCancellation(maybeJobId.get(), maybeAttemptId.get(), failures, partialSuccess)));
resetNewConnectionInput(connectionUpdaterInput);
} else if (workflowState.isFailed()) {
reportFailure(connectionUpdaterInput);
Expand All @@ -227,7 +238,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
}

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(
workflowState.setSuccess(true);
runActivity((input) -> jobCreationAndStatusUpdateActivity.jobSuccess(input), new JobSuccessInput(
maybeJobId.get(),
maybeAttemptId.get(),
standardSyncOutput.orElse(null)));
Expand All @@ -236,7 +248,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput)
}

private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(
runActivity((input) -> jobCreationAndStatusUpdateActivity.attemptFailure(input), new AttemptFailureInput(
connectionUpdaterInput.getJobId(),
connectionUpdaterInput.getAttemptId(),
standardSyncOutput.orElse(null),
Expand All @@ -250,7 +262,7 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput)
connectionUpdaterInput.setAttemptNumber(attemptNumber + 1);
connectionUpdaterInput.setFromFailure(true);
} else {
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(
runActivity((input) -> jobCreationAndStatusUpdateActivity.jobFailure(input), new JobFailureInput(
connectionUpdaterInput.getJobId(),
"Job failed after too many retries for connection " + connectionId));

Expand Down Expand Up @@ -305,6 +317,16 @@ public void resetConnection() {
}
}

/**
 * Signal handler: raises the retry-failed-activity flag on the workflow state. A workflow that is
 * waiting after a failed activity checks this flag before running the activity again.
 */
@Override
public void retryFailActivity() {
workflowState.setRetryFailedActivity(true);
}

/**
 * Signal handler: marks the workflow state as failed.
 */
@Override
public void simulateFailure() {
workflowState.setFailed(true);
}

@Override
public WorkflowState getState() {
return workflowState;
Expand All @@ -317,6 +339,15 @@ public JobInformation getJobInformation() {
maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID));
}

/**
 * Builds a snapshot telling whether the workflow is stuck and which connection, job and attempt
 * it relates to. When no job or attempt is known, the non-running placeholder ids are reported.
 *
 * @return the stuck flag together with the connection, job and attempt identifiers
 */
@Override
public StuckInformation getStuckInformation() {
  // Fall back to the placeholder ids when nothing is running yet.
  final long jobId = maybeJobId.orElse(NON_RUNNING_JOB_ID);
  final int attemptId = maybeAttemptId.orElse(NON_RUNNING_ATTEMPT_ID);
  final boolean stuck = workflowState.isStuck();

  return new StuckInformation(connectionId, jobId, attemptId, stuck);
}

/**
 * Tells whether the scheduling wait should be cut short.
 *
 * @return true when the workflow state is flagged as skip-scheduling, deleted, updated or
 *         reset-connection; false otherwise
 */
private Boolean skipScheduling() {
  // The flags are checked in the same order as before; the first one set wins.
  if (workflowState.isSkipScheduling() || workflowState.isDeleted()) {
    return true;
  }
  return workflowState.isUpdated() || workflowState.isResetConnection();
}
Expand All @@ -334,4 +365,34 @@ private void continueAsNew(final ConnectionUpdaterInput connectionUpdaterInput)
}
}

private <INPUT, OUTPUT> OUTPUT runActivityWithOutput(Function<INPUT, OUTPUT> mapper, INPUT input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used for a subset of activities. Should the method name say "short activities" or "mandatory activities" or something similar? Also, can we add a Javadoc comment to this method explaining why it is necessary, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

try {
return mapper.apply(input);
} catch (Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setStuck(true);
workflowState.setRetryFailedActivity(false);
Workflow.await(() -> workflowState.isRetryFailedActivity());
log.error("Retrying an activity for the connection " + connectionId, e);
workflowState.setStuck(false);
workflowState.setRetryFailedActivity(false);
return runActivityWithOutput(mapper, input);
}
}

/**
 * Runs an activity that returns no value, holding the workflow in the stuck state whenever the
 * activity throws.
 *
 * <p>On failure the error is logged, the workflow state is flagged as stuck, and the workflow
 * waits until the retry-failed-activity flag is raised. The stuck flag is then cleared and the
 * activity is attempted again; this repeats until the activity completes without throwing.
 *
 * @param consumer the activity invocation to run
 * @param input the argument handed to the activity on every attempt
 * @param <INPUT> the type of the activity input
 */
private <INPUT> void runActivity(Consumer<INPUT> consumer, INPUT input) {
  // Loop instead of recursing: each pass is one attempt at the activity.
  while (true) {
    try {
      consumer.accept(input);
      return;
    } catch (Exception e) {
      log.error("Failed to run an activity for the connection " + connectionId, e);

      // Flag the workflow as stuck and clear any stale retry request before waiting.
      workflowState.setStuck(true);
      workflowState.setRetryFailedActivity(false);
      Workflow.await(() -> workflowState.isRetryFailedActivity());

      log.error("Retrying an activity for the connection " + connectionId, e);

      // Leave the stuck state and consume the retry request before the next attempt.
      workflowState.setStuck(false);
      workflowState.setRetryFailedActivity(false);
    }
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we DRY this up? The duplicated code here can easily drift out of sync.

Suggested change
private <INPUT> void runActivity(Consumer<INPUT> consumer, INPUT input) {
try {
consumer.accept(input);
} catch (Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setStuck(true);
workflowState.setRetryFailedActivity(false);
Workflow.await(() -> workflowState.isRetryFailedActivity());
log.error("Retrying an activity for the connection " + connectionId, e);
workflowState.setStuck(false);
workflowState.setRetryFailedActivity(false);
runActivity(consumer, input);
}
}
private <INPUT> void runActivity2(final Consumer<INPUT> consumer, final INPUT input) {
// make the consumer look like a function so that we can reuse logic in runActivityWithOutput
runActivityWithOutput((inputInternal) -> {
consumer.accept(inputInternal);
return null;
}, input);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
private boolean failed = false;
private boolean resetConnection = false;
private boolean continueAsReset = false;
private boolean retryFailedActivity = false;
private boolean stuck = false;
private boolean success = true;

public void setRunning(final boolean running) {
final ChangedStateEvent event = new ChangedStateEvent(
Expand Down Expand Up @@ -95,6 +98,30 @@ public void setContinueAsReset(final boolean continueAsReset) {
this.continueAsReset = continueAsReset;
}

/**
 * Updates the retry-failed-activity flag, publishing the change to the state listener first.
 *
 * @param retryFailedActivity the new value of the flag
 */
public void setRetryFailedActivity(final boolean retryFailedActivity) {
  // Notify the listener, then record the value locally.
  stateChangedListener.addEvent(
      id,
      new ChangedStateEvent(StateField.RETRY_FAILED_ACTIVITY, retryFailedActivity));
  this.retryFailedActivity = retryFailedActivity;
}

/**
 * Updates the stuck flag, publishing the change to the state listener first.
 *
 * @param stuck the new value of the flag
 */
public void setStuck(final boolean stuck) {
  // Notify the listener, then record the value locally.
  stateChangedListener.addEvent(id, new ChangedStateEvent(StateField.STUCK, stuck));
  this.stuck = stuck;
}

/**
 * Updates the success flag, publishing the change to the state listener first.
 *
 * @param success the new value of the flag
 */
public void setSuccess(final boolean success) {
  // Notify the listener, then record the value locally.
  stateChangedListener.addEvent(id, new ChangedStateEvent(StateField.SUCCESS, success));
  this.success = success;
}

public void reset() {
this.setRunning(false);
this.setDeleted(false);
Expand All @@ -104,6 +131,9 @@ public void reset() {
this.setFailed(false);
this.setResetConnection(false);
this.setContinueAsReset(false);
this.setRetryFailedActivity(false);
this.setSuccess(false);
this.setStuck(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public class TestStateListener implements WorkflowStateChangedListener {

private static final ConcurrentHashMap<UUID, Queue<ChangedStateEvent>> events = new ConcurrentHashMap<>();

/**
 * Removes every recorded event, for all ids, from the shared static event map.
 */
public static void reset() {
events.clear();
}

@Override
public Queue<ChangedStateEvent> events(final UUID testId) {
if (!events.containsKey(testId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ enum StateField {
UPDATED,
FAILED,
RESET,
CONTINUE_AS_RESET
CONTINUE_AS_RESET,
RETRY_FAILED_ACTIVITY,
STUCK,
SUCCESS
}

@Value
Expand Down
Loading