Skip to content

Commit 46946f1

Browse files
authored
feat: add more detailed log for debugging executions considered aborted (#6298)
* fix: correctly determine paused Test Workflow * feat: add debugging information for the execution that has been detected as aborted * chore: don't show part of debug if not needed * chore: avoid logging aborted details inside of execution (service/parallel) unless debug mode is enabled
1 parent 4e4727c commit 46946f1

File tree

15 files changed

+191
-11
lines changed

15 files changed

+191
-11
lines changed

cmd/api-server/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func main() {
239239
executionWorker := services.CreateExecutionWorker(clientset, cfg, clusterId, proContext.Agent.ID, serviceAccountNames, testWorkflowProcessor, map[string]string{
240240
testworkflowconfig.FeatureFlagNewArchitecture: fmt.Sprintf("%v", cfg.FeatureNewArchitecture),
241241
testworkflowconfig.FeatureFlagCloudStorage: fmt.Sprintf("%v", cfg.FeatureCloudStorage),
242-
}, commonEnvVariables)
242+
}, commonEnvVariables, true)
243243

244244
runnerOpts := runner2.Options{
245245
ClusterID: clusterId,

cmd/api-server/services/executionworker.go

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func CreateExecutionWorker(
2323
processor testworkflowprocessor.Processor,
2424
featureFlags map[string]string,
2525
commonEnvVariables []corev1.EnvVar,
26+
logAbortedDetails bool,
2627
) executionworkertypes.Worker {
2728
namespacesConfig := map[string]kubernetesworker.NamespaceConfig{}
2829
for n, s := range serviceAccountNames {
@@ -53,5 +54,6 @@ func CreateExecutionWorker(
5354
FeatureFlags: featureFlags,
5455
RunnerId: runnerId,
5556
CommonEnvVariables: commonEnvVariables,
57+
LogAbortedDetails: logAbortedDetails,
5658
})
5759
}

cmd/tcl/testworkflow-toolkit/spawn/utils.go

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func ExecutionWorker() executionworkertypes.Worker {
7676
FeatureFlags: cfg.Worker.FeatureFlags,
7777
RunnerId: cfg.Worker.RunnerID,
7878
CommonEnvVariables: cfg.Worker.CommonEnvVariables,
79+
LogAbortedDetails: config.Debug(),
7980
})
8081
}
8182
return executionWorker

pkg/api/v1/testkube/model_test_workflow_result_extended.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (r *TestWorkflowResult) IsAnyStepAborted() bool {
138138
func (r *TestWorkflowResult) IsAnyStepPaused() bool {
139139
// When initialization was aborted or failed - it's immediately end
140140
if r.Initialization.Status.AnyError() {
141-
return true
141+
return false
142142
}
143143

144144
// Analyze the rest of the steps

pkg/runner/service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (s *service) recover(ctx context.Context) (err error) {
133133
if err = s.client.FinishExecutionResult(ctx, environmentId, executionId, execution.Result); err != nil {
134134
s.logger.Errorw("failed to recover execution: saving execution", "id", executionId, "error", err)
135135
} else {
136-
s.logger.Infow("recovered execution", "id", executionId, "error", err)
136+
s.logger.Infow("recovered execution", "id", executionId, "status", "error", err)
137137
}
138138
}(exec.EnvironmentId, exec.Id)
139139
}

pkg/testworkflows/executionworker/controller/controller.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
initconstants "github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
1212
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
1313
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
14+
"github.com/kubeshop/testkube/pkg/log"
1415
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller/watchers"
1516
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig"
1617
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
@@ -44,7 +45,7 @@ type Controller interface {
4445
Pause(ctx context.Context) error
4546
Resume(ctx context.Context) error
4647
Cleanup(ctx context.Context) error
47-
Watch(ctx context.Context, disableFollow bool) <-chan ChannelMessage[Notification]
48+
Watch(ctx context.Context, disableFollow, logAbortedDetails bool) <-chan ChannelMessage[Notification]
4849
WatchLightweight(ctx context.Context) <-chan LightweightNotification
4950
Logs(ctx context.Context, follow bool) io.Reader
5051
NodeName() (string, error)
@@ -88,6 +89,7 @@ func New(parentCtx context.Context, clientSet kubernetes.Interface, namespace, i
8889

8990
// There was a job or pod for this execution, so we may only assume it is aborted
9091
if !watcher.State().JobEvents().FirstTimestamp().IsZero() || !watcher.State().PodEvents().FirstTimestamp().IsZero() {
92+
log.DefaultLogger.Errorw("connecting to aborted execution", "executionId", watcher.State().ResourceId(), "debug", watcher.State().Debug())
9193
return nil, ErrJobAborted
9294
}
9395

@@ -226,9 +228,10 @@ func (c *controller) EstimatedResult(parentCtx context.Context) (*testkube.TestW
226228
return nil, ErrMissingEstimatedResult
227229
}
228230

229-
func (c *controller) Watch(parentCtx context.Context, disableFollow bool) <-chan ChannelMessage[Notification] {
231+
func (c *controller) Watch(parentCtx context.Context, disableFollow bool, logAbortedDetails bool) <-chan ChannelMessage[Notification] {
230232
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.watcher, WatchInstrumentedPodOptions{
231-
DisableFollow: disableFollow,
233+
DisableFollow: disableFollow,
234+
LogAbortedDetails: logAbortedDetails,
232235
})
233236
if err != nil {
234237
v := make(chan ChannelMessage[Notification], 1)
@@ -249,7 +252,7 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
249252
ch := make(chan LightweightNotification)
250253
go func() {
251254
defer close(ch)
252-
for v := range c.Watch(parentCtx, false) {
255+
for v := range c.Watch(parentCtx, false, false) {
253256
if v.Error != nil {
254257
ch <- LightweightNotification{Error: v.Error}
255258
continue

pkg/testworkflows/executionworker/controller/watchers/commons.go

+23
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,29 @@ func GetJobError(job *batchv1.Job) string {
306306
return ""
307307
}
308308

309+
func GetContainerStateDebug(state corev1.ContainerState) string {
310+
if state.Running != nil {
311+
return "running"
312+
} else if state.Terminated != nil {
313+
result := fmt.Sprintf("terminated, reason: '%s'", state.Terminated.Reason)
314+
if state.Terminated.Message != "" {
315+
result += fmt.Sprintf(", message: '%s'", state.Terminated.Message)
316+
}
317+
if state.Terminated.ExitCode != 0 {
318+
result += fmt.Sprintf(", exit code: '%d'", state.Terminated.ExitCode)
319+
}
320+
if state.Terminated.Signal != 0 {
321+
result += fmt.Sprintf(", signal: %d", state.Terminated.Signal)
322+
}
323+
return result
324+
} else if state.Waiting != nil {
325+
return fmt.Sprintf("waiting, reason: '%s', message: '%s'",
326+
state.Waiting.Reason,
327+
state.Waiting.Message)
328+
}
329+
return "unknown"
330+
}
331+
309332
func ReadContainerResult(status *corev1.ContainerStatus, errorFallback string) ContainerResult {
310333
result := ContainerResult{}
311334

pkg/testworkflows/executionworker/controller/watchers/executionstate.go

+23
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type ExecutionState interface {
6161
ExecutionError() string
6262
JobExecutionError() string
6363
PodExecutionError() string
64+
Debug() map[string]string
6465

6566
PodCreationTimestamp() time.Time
6667
EstimatedPodCreationTimestamp() time.Time
@@ -443,3 +444,25 @@ func (e *executionState) ExecutionError() string {
443444
}
444445
return podErr
445446
}
447+
448+
func (e *executionState) Debug() map[string]string {
449+
result := map[string]string{
450+
"pod": "unknown",
451+
"job": "unknown",
452+
"podEvents": "unknown",
453+
"jobEvents": "unknown",
454+
}
455+
if e.pod != nil {
456+
result["pod"] = e.pod.Debug()
457+
}
458+
if e.job != nil {
459+
result["job"] = e.job.Debug()
460+
}
461+
if e.podEvents != nil {
462+
result["podEvents"] = e.podEvents.Debug()
463+
}
464+
if e.jobEvents != nil {
465+
result["jobEvents"] = e.jobEvents.Debug()
466+
}
467+
return result
468+
}

pkg/testworkflows/executionworker/controller/watchers/job.go

+46
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package watchers
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"time"
67

78
batchv1 "k8s.io/api/batch/v1"
@@ -31,6 +32,7 @@ type Job interface {
3132
InternalConfig() (testworkflowconfig.InternalConfig, error)
3233
ScheduledAt() (time.Time, error)
3334
ExecutionError() string
35+
Debug() string
3436
}
3537

3638
func NewJob(original *batchv1.Job) Job {
@@ -97,3 +99,47 @@ func (j *job) ScheduledAt() (time.Time, error) {
9799
func (j *job) ExecutionError() string {
98100
return GetJobError(j.original)
99101
}
102+
103+
func (j *job) Debug() string {
104+
if j.original == nil {
105+
return "unknown"
106+
}
107+
state := "found"
108+
if j.original.Status.Active > 0 {
109+
state += fmt.Sprintf(", active: %d", j.original.Status.Active)
110+
}
111+
if j.original.Status.Failed > 0 {
112+
state += fmt.Sprintf(", failed: %d", j.original.Status.Failed)
113+
}
114+
if j.original.Status.Succeeded > 0 {
115+
state += fmt.Sprintf(", succeeded: %d", j.original.Status.Succeeded)
116+
}
117+
if j.original.Status.Ready != nil {
118+
state += fmt.Sprintf(", ready: %d", *j.original.Status.Ready)
119+
}
120+
if j.original.Status.Terminating != nil {
121+
state += fmt.Sprintf(", terminating: %d", *j.original.Status.Terminating)
122+
}
123+
if j.original.Status.UncountedTerminatedPods != nil &&
124+
(len(j.original.Status.UncountedTerminatedPods.Failed) > 0 || len(j.original.Status.UncountedTerminatedPods.Succeeded) > 0) {
125+
state += fmt.Sprintf(", uncounted terminated pods: (failed) %d (succeeded) %d",
126+
len(j.original.Status.UncountedTerminatedPods.Failed),
127+
len(j.original.Status.UncountedTerminatedPods.Succeeded))
128+
}
129+
if j.original.Status.StartTime != nil {
130+
state += ", started"
131+
}
132+
if j.original.DeletionTimestamp != nil {
133+
state += ", deleted"
134+
}
135+
if j.original.Status.CompletionTime != nil {
136+
state += ", completed"
137+
}
138+
for i := range j.original.Status.Conditions {
139+
state += fmt.Sprintf(", %s='%s'", j.original.Status.Conditions[i].Type, j.original.Status.Conditions[i].Status)
140+
}
141+
if j.original.Spec.TTLSecondsAfterFinished != nil {
142+
state += fmt.Sprintf(", ttl after: %ds", *j.original.Spec.TTLSecondsAfterFinished)
143+
}
144+
return state
145+
}

pkg/testworkflows/executionworker/controller/watchers/jobevents.go

+12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package watchers
22

33
import (
4+
"fmt"
45
"strings"
56
"time"
67

@@ -27,6 +28,7 @@ type JobEvents interface {
2728
Error() bool
2829
ErrorReason() string
2930
ErrorMessage() string
31+
Debug() string
3032
}
3133

3234
func NewJobEvents(events []*corev1.Event) JobEvents {
@@ -167,3 +169,13 @@ func (j *jobEvents) ErrorMessage() string {
167169
}
168170
return ""
169171
}
172+
173+
func (j *jobEvents) Debug() string {
174+
firstTs := j.FirstTimestamp()
175+
result := make([]string, len(j.events))
176+
for i := range j.events {
177+
result[i] = fmt.Sprintf("[%.1fs] %s: %s", float64(GetEventTimestamp(j.events[i]).Sub(firstTs))/float64(time.Second), j.events[i].Reason, j.events[i].Message)
178+
}
179+
180+
return strings.Join(result, ", ")
181+
}

pkg/testworkflows/executionworker/controller/watchers/pod.go

+53
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package watchers
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"strconv"
7+
"strings"
68
"time"
79

810
corev1 "k8s.io/api/core/v1"
@@ -43,6 +45,7 @@ type Pod interface {
4345
ContainersReady() bool
4446
ContainerError() string
4547
ExecutionError() string
48+
Debug() string
4649
}
4750

4851
func NewPod(original *corev1.Pod) Pod {
@@ -232,3 +235,53 @@ func (p *pod) ExecutionError() string {
232235
}
233236
return errStr
234237
}
238+
239+
func (p *pod) ContainerDebug(name string) string {
240+
status := GetContainerStatus(p.original, name)
241+
if status == nil {
242+
return "unknown"
243+
}
244+
state := GetContainerStateDebug(status.State)
245+
if status.Started != nil {
246+
state += fmt.Sprintf(", started: %v", *status.Started)
247+
}
248+
if last := GetContainerStateDebug(status.LastTerminationState); last != "unknown" {
249+
state += fmt.Sprintf(", last: %s", last)
250+
}
251+
return state
252+
}
253+
254+
func (p *pod) Debug() string {
255+
if p.original == nil {
256+
return "unknown"
257+
}
258+
state := "found"
259+
if p.original.Status.Reason != "" {
260+
state += fmt.Sprintf(", reason: '%s'", p.original.Status.Reason)
261+
}
262+
if p.original.Status.Message != "" {
263+
state += fmt.Sprintf(", message: '%s'", p.original.Status.Message)
264+
}
265+
if p.original.Status.StartTime != nil {
266+
state += ", started"
267+
}
268+
if p.original.DeletionTimestamp != nil {
269+
state += ", deleted"
270+
}
271+
if p.ExecutionError() != "" {
272+
state += fmt.Sprintf(", error: '%s'", p.ExecutionError())
273+
}
274+
for i := range p.original.Status.Conditions {
275+
state += fmt.Sprintf(", %s='%s'", p.original.Status.Conditions[i].Type, p.original.Status.Conditions[i].Status)
276+
}
277+
initContainers := make([]string, 0, len(p.original.Spec.InitContainers))
278+
for i := range p.original.Spec.InitContainers {
279+
initContainers = append(initContainers, fmt.Sprintf("[%s]: %s", p.original.Spec.InitContainers[i].Name, p.ContainerDebug(p.original.Spec.InitContainers[i].Name)))
280+
}
281+
containers := make([]string, 0, len(p.original.Spec.Containers))
282+
for i := range p.original.Spec.Containers {
283+
containers = append(containers, fmt.Sprintf("[%s]: %s", p.original.Spec.Containers[i].Name, p.ContainerDebug(p.original.Spec.Containers[i].Name)))
284+
}
285+
state += fmt.Sprintf(", [INIT] %s [CONTAINERS] %s", strings.Join(initContainers, ", "), strings.Join(containers, ", "))
286+
return state
287+
}

pkg/testworkflows/executionworker/controller/watchers/podevents.go

+11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package watchers
22

33
import (
4+
"fmt"
45
"regexp"
56
"strings"
67
"time"
@@ -29,6 +30,7 @@ type PodEvents interface {
2930
Error() bool
3031
ErrorReason() string
3132
ErrorMessage() string
33+
Debug() string
3234

3335
Container(name string) ContainerEvents
3436
}
@@ -187,6 +189,15 @@ func (p *podEvents) ErrorMessage() string {
187189
return ""
188190
}
189191

192+
func (p *podEvents) Debug() string {
193+
firstTs := p.FirstTimestamp()
194+
result := make([]string, len(p.events))
195+
for i := range p.events {
196+
result[i] = fmt.Sprintf("[%.1fs] %s: %s", float64(GetEventTimestamp(p.events[i]).Sub(firstTs))/float64(time.Second), p.events[i].Reason, p.events[i].Message)
197+
}
198+
return strings.Join(result, ", ")
199+
}
200+
190201
func (p *podEvents) Container(name string) ContainerEvents {
191202
if name == "" {
192203
return nil

pkg/testworkflows/executionworker/controller/watchinstrumentedpod.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
1212
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
1313
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
14+
"github.com/kubeshop/testkube/pkg/log"
1415
watchers2 "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller/watchers"
1516
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
1617
)
@@ -20,7 +21,8 @@ const (
2021
)
2122

2223
type WatchInstrumentedPodOptions struct {
23-
DisableFollow bool
24+
DisableFollow bool
25+
LogAbortedDetails bool
2426
}
2527

2628
func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []stage.Signature, scheduledAt time.Time, watcher watchers2.ExecutionWatcher, opts WatchInstrumentedPodOptions) (<-chan ChannelMessage[Notification], error) {
@@ -40,6 +42,10 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
4042
notifier.End()
4143
ctxCancel()
4244
close(notifier.ch)
45+
46+
if opts.LogAbortedDetails && notifier.result.IsAborted() {
47+
log.DefaultLogger.Warnw("execution (watch) detected as aborted", "executionId", watcher.State().ResourceId(), "debug", watcher.State().Debug())
48+
}
4349
}()
4450

4551
// Mark Job as started
@@ -264,6 +270,5 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
264270
notifier.Align(watcher.State())
265271
}()
266272

267-
//return notifierProxyCh, nil
268273
return notifier.ch, nil
269274
}

0 commit comments

Comments
 (0)