diff --git a/pkg/runner/service.go b/pkg/runner/service.go index 41f475673f..0ccc6bdd61 100644 --- a/pkg/runner/service.go +++ b/pkg/runner/service.go @@ -2,7 +2,9 @@ package runner import ( "context" + "time" + "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -12,8 +14,11 @@ import ( "github.com/kubeshop/testkube/pkg/event" "github.com/kubeshop/testkube/pkg/log" configrepo "github.com/kubeshop/testkube/pkg/repository/config" + "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller" "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes" + "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/registry" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig" + "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage" ) type Options struct { @@ -84,8 +89,51 @@ func (s *service) recover(ctx context.Context) (err error) { for _, exec := range executions { go func(environmentId string, executionId string) { err := s.runner.Monitor(ctx, s.proContext.OrgID, environmentId, executionId) - if err != nil { + if err == nil { + return + } + if !errors.Is(err, registry.ErrResourceNotFound) { s.logger.Errorw("failed to monitor execution", "id", executionId, "error", err) + return + } + + s.logger.Warnw("execution to monitor not found. recovering.", "id", executionId, "error", err) + + // Get the existing execution + execution, err := s.client.GetExecution(ctx, environmentId, executionId) + if err != nil { + s.logger.Errorw("failed to recover execution: getting execution", "id", executionId, "error", err) + return + } + + // Ignore if it's still queued - orchestrator will recover it later + if execution.Result.IsQueued() { + s.logger.Warnw("execution to monitor is still queued: leaving it for orchestrator", "id", executionId) + return + } + + // Check if there is error message acknowledged + sigSequence := stage.MapSignatureListToInternal(stage.MapSignatureToSequence(stage.MapSignatureList(execution.Signature))) + errorMessage := execution.Result.Initialization.ErrorMessage + if errorMessage == "" { + for _, sig := range sigSequence { + if execution.Result.Steps[sig.Ref].ErrorMessage != "" { + errorMessage = execution.Result.Steps[sig.Ref].ErrorMessage + break + } + } + } + + // Finalize and save the result + execution.Result.HealAborted(sigSequence, errorMessage, controller.DefaultErrorMessage) + execution.Result.HealTimestamps(sigSequence, execution.ScheduledAt, time.Time{}, time.Time{}, true) + execution.Result.HealDuration(execution.ScheduledAt) + execution.Result.HealMissingPauseStatuses() + execution.Result.HealStatus(sigSequence) + if err = s.client.FinishExecutionResult(ctx, environmentId, executionId, execution.Result); err != nil { + s.logger.Errorw("failed to recover execution: saving execution", "id", executionId, "error", err) + } else { + s.logger.Infow("recovered execution", "id", executionId, "error", err) } }(exec.EnvironmentId, exec.Id) }