@@ -2,7 +2,9 @@ package runner
2
2
3
3
import (
4
4
"context"
5
+ "time"
5
6
7
+ "github.com/pkg/errors"
6
8
"go.uber.org/zap"
7
9
"golang.org/x/sync/errgroup"
8
10
@@ -12,8 +14,11 @@ import (
12
14
"github.com/kubeshop/testkube/pkg/event"
13
15
"github.com/kubeshop/testkube/pkg/log"
14
16
configrepo "github.com/kubeshop/testkube/pkg/repository/config"
17
+ "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller"
15
18
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes"
19
+ "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/registry"
16
20
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig"
21
+ "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
17
22
)
18
23
19
24
type Options struct {
@@ -84,8 +89,51 @@ func (s *service) recover(ctx context.Context) (err error) {
84
89
for _ , exec := range executions {
85
90
go func (environmentId string , executionId string ) {
86
91
err := s .runner .Monitor (ctx , s .proContext .OrgID , environmentId , executionId )
87
- if err != nil {
92
+ if err == nil {
93
+ return
94
+ }
95
+ if ! errors .Is (err , registry .ErrResourceNotFound ) {
88
96
s .logger .Errorw ("failed to monitor execution" , "id" , executionId , "error" , err )
97
+ return
98
+ }
99
+
100
+ s .logger .Warnw ("execution to monitor not found. recovering." , "id" , executionId , "error" , err )
101
+
102
+ // Get the existing execution
103
+ execution , err := s .client .GetExecution (ctx , environmentId , executionId )
104
+ if err != nil {
105
+ s .logger .Errorw ("failed to recover execution: getting execution" , "id" , executionId , "error" , err )
106
+ return
107
+ }
108
+
109
+ // Ignore if it's still queued - orchestrator will recover it later
110
+ if execution .Result .IsQueued () {
111
+ s .logger .Warnw ("execution to monitor is still queued: leaving it for orchestrator" , "id" , executionId )
112
+ return
113
+ }
114
+
115
+ // Check if there is error message acknowledged
116
+ sigSequence := stage .MapSignatureListToInternal (stage .MapSignatureToSequence (stage .MapSignatureList (execution .Signature )))
117
+ errorMessage := execution .Result .Initialization .ErrorMessage
118
+ if errorMessage == "" {
119
+ for _ , sig := range sigSequence {
120
+ if execution .Result .Steps [sig .Ref ].ErrorMessage != "" {
121
+ errorMessage = execution .Result .Steps [sig .Ref ].ErrorMessage
122
+ break
123
+ }
124
+ }
125
+ }
126
+
127
+ // Finalize and save the result
128
+ execution .Result .HealAborted (sigSequence , errorMessage , controller .DefaultErrorMessage )
129
+ execution .Result .HealTimestamps (sigSequence , execution .ScheduledAt , time.Time {}, time.Time {}, true )
130
+ execution .Result .HealDuration (execution .ScheduledAt )
131
+ execution .Result .HealMissingPauseStatuses ()
132
+ execution .Result .HealStatus (sigSequence )
133
+ if err = s .client .FinishExecutionResult (ctx , environmentId , executionId , execution .Result ); err != nil {
134
+ s .logger .Errorw ("failed to recover execution: saving execution" , "id" , executionId , "error" , err )
135
+ } else {
136
+ s .logger .Infow ("recovered execution" , "id" , executionId , "error" , err )
89
137
}
90
138
}(exec .EnvironmentId , exec .Id )
91
139
}
0 commit comments