Fix cancel handling in pipedv1 scheduler #5597

Merged
merged 4 commits into from
Mar 4, 2025
5 changes: 4 additions & 1 deletion pkg/app/pipedv1/controller/scheduler.go
@@ -553,7 +553,10 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
 			TargetDeploymentSource: tds.ToPluginDeploySource(),
 		},
 	})
-	if err != nil {
+	// do not return error if the context is already canceled.
+	// this occurs when the stage is canceled.
+	// otherwise, return the error.
+	if err != nil && ctx.Err() == nil {
 		s.logger.Error("failed to execute stage", zap.String("stage-name", ps.Name), zap.Error(err))
 		s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires)
 		return model.StageStatus_STAGE_FAILURE
5 changes: 4 additions & 1 deletion pkg/app/pipedv1/plugin/wait/wait.go
@@ -78,7 +78,10 @@ func wait(ctx context.Context, duration time.Duration, initialStart time.Time, s

 		case <-ctx.Done(): // on cancelled
 			slp.Info("Wait cancelled")
-			return sdk.StageStatusCancelled
Member

[IMO] I think StageStatusCancelled should remain, even though piped does not use it.
Without it, plugin developers will be unsure which status to return on cancellation.

If we want to remove StageStatusCancelled, we should remove case <-ctx.Done(): section too. (If possible, that's ideal)

Member Author

I see your concern.
On the other hand, my concern is that plugin developers may think they have to handle context cancellation as StageStatusCancelled. That is incorrect; when the context is cancelled, the plugin should simply stop its operation, and the status it returns does not matter.
The WAIT plugin is a special case because it must watch for context cancellation explicitly in order to stop. Almost all other plugins only need to pass the context to their internal functions, because deployment operations already treat context cancellation as a failure.

Member

> plugin developers may think they have to handle context cancellation as StageStatusCancelled.

I agree!

What about WaitApproval and ScriptRun stages?

	e.LogPersister.Infof("Waiting for approval from at least %d user(s)...", num)
	for {
		select {
		case <-ticker.C:
			if e.checkApproval(ctx, num) {
				return model.StageStatus_STAGE_SUCCESS
			}
		case s := <-sig.Ch():
			switch s {
			case executor.StopSignalCancel:
				return model.StageStatus_STAGE_CANCELLED
			case executor.StopSignalTerminate:
				return originalStatus
			default:
				return model.StageStatus_STAGE_FAILURE
			}
		case <-timer.C:
			e.LogPersister.Errorf("Timed out %v", timeout)
			return model.StageStatus_STAGE_FAILURE
		}
	}

	for {
		select {
		case result := <-c:
			return result
		case <-timer.C:
			e.LogPersister.Errorf("Canceled because of timeout")
			return model.StageStatus_STAGE_FAILURE
		case s := <-sig.Ch():
			switch s {
			case executor.StopSignalCancel:
				e.LogPersister.Info("Canceled by user")
				return model.StageStatus_STAGE_CANCELLED
			case executor.StopSignalTerminate:
				e.LogPersister.Info("Terminated by system")
				return originalStatus
			default:
				e.LogPersister.Error("Unexpected")
				return model.StageStatus_STAGE_FAILURE
			}
		}
	}

Member Author

First, the timeout should be handled on the piped side so that plugins don't have to handle it.

The SCRIPT_RUN stage should use os/exec.CommandContext: it kills the running command when the context is cancelled, so we can implement the stage without watching ctx.Done().

The WAIT_APPROVAL stage is difficult to implement without watching ctx.Done() because, apart from polling the approval state, it performs no operation that takes a context.

Member

OK, I got it.

Let's add a note to the plugin dev guide about when to handle ctx.Done().
Even if StageStatusCancelled is removed, plugin developers should be aware of cancellation so that the stage reliably exits.

+			// The piped handles this case as cancelled by the user without using the plugin's result.
+			// So we don't need to consider which status should be returned.
+			// We return the failure here.
+			return sdk.StageStatusFailure
 		}
 	}
 }
2 changes: 1 addition & 1 deletion pkg/app/pipedv1/plugin/wait/wait_test.go
@@ -69,7 +69,7 @@ func TestWait_Cancel(t *testing.T) {

 	select {
 	case result := <-resultCh:
-		assert.Equal(t, sdk.StageStatusCancelled, result)
+		assert.Equal(t, sdk.StageStatusFailure, result)
 	case <-time.After(1 * time.Second):
 		t.Error("wait() did not ended even after the context was canceled")
 	}
7 changes: 2 additions & 5 deletions pkg/plugin/sdk/deployment.go
@@ -455,9 +455,8 @@ type ExecuteStageResponse struct {
 type StageStatus int

 const (
-	StageStatusSuccess   StageStatus = 2
-	StageStatusFailure   StageStatus = 3
-	StageStatusCancelled StageStatus = 4
+	StageStatusSuccess StageStatus = 2
+	StageStatusFailure StageStatus = 3
Member

[Q] I don't remember why we made this enum start from 2; could you remind me? 👀

Member Author

Because these lines were copied from the enum below. It's a bit confusing, so I want to make them start from 1.

	// StageStatus represents the current status of a stage of a deployment.
	type StageStatus int32

	const (
		StageStatus_STAGE_NOT_STARTED_YET StageStatus = 0
		StageStatus_STAGE_RUNNING         StageStatus = 1
		StageStatus_STAGE_SUCCESS         StageStatus = 2
		StageStatus_STAGE_FAILURE         StageStatus = 3
		StageStatus_STAGE_CANCELLED       StageStatus = 4
		StageStatus_STAGE_SKIPPED         StageStatus = 5
		StageStatus_STAGE_EXITED          StageStatus = 6
	)

Member Author

I refactored it in this commit: a755ab8


 	// StageStatusSkipped StageStatus = 5 // TODO: If SDK can handle whole skipping, this is unnecessary.

@@ -472,8 +471,6 @@ func (o StageStatus) toModelEnum() model.StageStatus {
 		return model.StageStatus_STAGE_SUCCESS
 	case StageStatusFailure:
 		return model.StageStatus_STAGE_FAILURE
-	case StageStatusCancelled:
-		return model.StageStatus_STAGE_CANCELLED
 	case StageStatusExited:
 		return model.StageStatus_STAGE_EXITED
 	default: