Skip to content

Commit ae51c3b

Browse files
jswxstw, oninowang
authored and committed
fix: correct manual retry logic. Fixes #14124
Signed-off-by: oninowang <[email protected]>
1 parent 9b7c0c4 commit ae51c3b

File tree

1 file changed

+42
-116
lines changed

1 file changed

+42
-116
lines changed

workflow/util/util.go

Lines changed: 42 additions & 116 deletions
Original file line number | Diff line number | Diff line change
@@ -870,7 +870,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
870870
parentNode = nodes[parentWfNode.ID]
871871
}
872872

873-
children := []*dagNode{}
873+
children := make([]*dagNode, 0)
874874

875875
for _, childID := range wfNode.Children {
876876
childNode, ok := nodes[childID]
@@ -883,7 +883,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
883883
nodes[wfNode.ID].children = children
884884
}
885885

886-
values := []*dagNode{}
886+
values := make([]*dagNode, 0)
887887
for _, v := range nodes {
888888
values = append(values, v)
889889
}
@@ -905,13 +905,17 @@ func singularPath(nodes []*dagNode, toNode string) ([]*dagNode, error) {
905905
}
906906
}
907907

908+
if root == nil {
909+
return nil, fmt.Errorf("was unable to find root")
910+
}
911+
908912
if leaf == nil {
909913
return nil, fmt.Errorf("was unable to find %s", toNode)
910914
}
911915

912916
curr := leaf
913917

914-
reverseNodes := []*dagNode{}
918+
reverseNodes := make([]*dagNode, 0)
915919
for {
916920
reverseNodes = append(reverseNodes, curr)
917921
if curr.n.ID == root.n.ID {
@@ -949,37 +953,32 @@ func getChildren(n *dagNode) map[string]bool {
949953

950954
type resetFn func(string)
951955
type deleteFn func(string)
956+
type matchFn func(*dagNode) bool
952957

953-
// untilFn is a function that returns two variables, the first indicates
954-
// a `found` boolean while the second indicates if reset should be called.
955-
type untilFn func(*dagNode) (bool, bool)
956-
957-
func getUntilFnNodeType(nodeType wfv1.NodeType) untilFn {
958-
return func(n *dagNode) (bool, bool) {
959-
return n.n.Type == nodeType, true
958+
func matchNodeType(nodeType wfv1.NodeType) matchFn {
959+
return func(n *dagNode) bool {
960+
return n.n.Type == nodeType
960961
}
961962
}
962963

963-
func resetUntil(n *dagNode, should untilFn, resetFunc resetFn) (*dagNode, error) {
964+
func resetUntil(n *dagNode, matchFunc matchFn, resetFunc resetFn) (*dagNode, error) {
964965
curr := n
965966
for {
966967
if curr == nil {
967968
return nil, fmt.Errorf("was seeking node but ran out of nodes to explore")
968969
}
969970

970-
if foundNode, shouldReset := should(curr); foundNode {
971-
if shouldReset {
972-
resetFunc(curr.n.ID)
973-
}
971+
if match := matchFunc(curr); match {
972+
resetFunc(curr.n.ID)
974973
return curr, nil
975974
}
976975
curr = curr.parent
977976
}
978977
}
979978

980-
func getTillBoundaryFn(boundaryID string) untilFn {
981-
return func(n *dagNode) (bool, bool) {
982-
return n.n.ID == boundaryID, n.n.BoundaryID != ""
979+
func matchBoundaryID(boundaryID string) matchFn {
980+
return func(n *dagNode) bool {
981+
return n.n.ID == boundaryID
983982
}
984983
}
985984

@@ -989,6 +988,10 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
989988
if curr == nil {
990989
return curr, nil
991990
}
991+
if curr.parent != nil && curr.parent.n.Type == wfv1.NodeTypeRetry {
992+
resetFunc(curr.parent.n.ID)
993+
curr = curr.parent
994+
}
992995
if curr.parent != nil && curr.parent.n.Type == wfv1.NodeTypeStepGroup {
993996
resetFunc(curr.parent.n.ID)
994997
}
@@ -997,41 +1000,17 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
9971000
return curr.parent, nil
9981001
}
9991002
var err error
1000-
curr, err = resetUntil(curr, getTillBoundaryFn(seekingBoundaryID), resetFunc)
1003+
curr, err = resetUntil(curr, matchBoundaryID(seekingBoundaryID), resetFunc)
10011004
if err != nil {
10021005
return nil, err
10031006
}
10041007
}
10051008
}
10061009

1007-
func resetStepGroup(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1008-
return resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeStepGroup), resetFunc)
1009-
}
1010-
1011-
func resetSteps(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1012-
n, err := resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeSteps), resetFunc)
1013-
if err != nil {
1014-
return nil, err
1015-
}
1016-
return resetBoundaries(n, resetFunc)
1017-
}
1018-
1019-
func resetTaskGroup(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1020-
return resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeTaskGroup), resetFunc)
1021-
}
1022-
1023-
func resetDAG(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1024-
n, err := resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeDAG), resetFunc)
1025-
if err != nil {
1026-
return nil, err
1027-
}
1028-
return resetBoundaries(n, resetFunc)
1029-
}
1030-
10311010
// resetPod is only called in the event a Container was found. This implies that there is a parent pod.
10321011
func resetPod(n *dagNode, resetFunc resetFn, addToDelete deleteFn) (*dagNode, error) {
10331012
// this sets to reset but resets are overridden by deletes in the final FormulateRetryWorkflow logic.
1034-
curr, err := resetUntil(n, getUntilFnNodeType(wfv1.NodeTypePod), resetFunc)
1013+
curr, err := resetUntil(n, matchNodeType(wfv1.NodeTypePod), resetFunc)
10351014
if err != nil {
10361015
return nil, err
10371016
}
@@ -1075,83 +1054,31 @@ func resetPath(allNodes []*dagNode, startNode string) (map[string]bool, map[stri
10751054
nodesToDelete[nodeID] = true
10761055
}
10771056

1078-
var mustFind wfv1.NodeType
1079-
mustFind = ""
1080-
1081-
if curr.n.Type == wfv1.NodeTypeContainer {
1082-
// special case where the retry node is the container of a containerSet
1083-
mustFind = wfv1.NodeTypePod
1084-
}
1085-
1086-
findBoundaries := false
10871057
for {
10881058

10891059
if curr == nil {
10901060
break
10911061
}
10921062

1093-
switch curr.n.Type {
1094-
case wfv1.NodeTypePod:
1095-
//ignore
1096-
case wfv1.NodeTypeContainer:
1097-
//ignore
1098-
case wfv1.NodeTypeSteps:
1099-
addToReset(curr.n.ID)
1100-
findBoundaries = true
1101-
case wfv1.NodeTypeStepGroup:
1102-
addToReset(curr.n.ID)
1103-
findBoundaries = true
1104-
case wfv1.NodeTypeDAG:
1105-
addToReset(curr.n.ID)
1106-
findBoundaries = true
1107-
case wfv1.NodeTypeTaskGroup:
1108-
addToReset(curr.n.ID)
1109-
findBoundaries = true
1110-
case wfv1.NodeTypeRetry:
1063+
switch {
1064+
case isGroupNodeType(curr.n.Type):
11111065
addToReset(curr.n.ID)
1112-
case wfv1.NodeTypeSkipped:
1113-
// ignore -> doesn't make sense to reach this
1114-
case wfv1.NodeTypeSuspend:
1115-
// ignore
1116-
case wfv1.NodeTypeHTTP:
1117-
// ignore
1118-
case wfv1.NodeTypePlugin:
1119-
addToReset(curr.n.ID)
1120-
}
1121-
1122-
if mustFind == "" && !findBoundaries {
1123-
curr = curr.parent
1124-
continue
1125-
}
1126-
1127-
if findBoundaries {
11281066
curr, err = resetBoundaries(curr, addToReset)
11291067
if err != nil {
11301068
return nil, nil, err
11311069
}
1132-
findBoundaries = false
11331070
continue
1134-
}
1135-
1136-
switch mustFind {
1137-
case wfv1.NodeTypePod:
1071+
case curr.n.Type == wfv1.NodeTypeRetry:
1072+
addToReset(curr.n.ID)
1073+
case curr.n.Type == wfv1.NodeTypeContainer:
11381074
curr, err = resetPod(curr, addToReset, addToDelete)
1139-
case wfv1.NodeTypeSteps:
1140-
curr, err = resetSteps(curr, addToReset)
1141-
case wfv1.NodeTypeStepGroup:
1142-
curr, err = resetStepGroup(curr, addToReset)
1143-
case wfv1.NodeTypeDAG:
1144-
curr, err = resetDAG(curr, addToReset)
1145-
case wfv1.NodeTypeTaskGroup:
1146-
curr, err = resetTaskGroup(curr, addToReset)
1147-
default:
1148-
return nil, nil, fmt.Errorf("invalid mustFind of %s supplied", mustFind)
1149-
}
1150-
mustFind = ""
1151-
if err != nil {
1152-
return nil, nil, err
1075+
if err != nil {
1076+
return nil, nil, err
1077+
}
1078+
continue
11531079
}
11541080

1081+
curr = curr.parent
11551082
}
11561083
return nodesToReset, nodesToDelete, nil
11571084
}
@@ -1170,11 +1097,13 @@ func setUnion[T comparable](m1 map[T]bool, m2 map[T]bool) map[T]bool {
11701097
}
11711098
return res
11721099
}
1173-
func shouldRetryFailedType(nodeTyp wfv1.NodeType) bool {
1174-
if nodeTyp == wfv1.NodeTypePod || nodeTyp == wfv1.NodeTypeContainer {
1175-
return true
1176-
}
1177-
return false
1100+
1101+
func isGroupNodeType(nodeType wfv1.NodeType) bool {
1102+
return nodeType == wfv1.NodeTypeDAG || nodeType == wfv1.NodeTypeTaskGroup || nodeType == wfv1.NodeTypeStepGroup || nodeType == wfv1.NodeTypeSteps
1103+
}
1104+
1105+
func isExecutionNodeType(nodeType wfv1.NodeType) bool {
1106+
return nodeType == wfv1.NodeTypeContainer || nodeType == wfv1.NodeTypePod || nodeType == wfv1.NodeTypeHTTP || nodeType == wfv1.NodeTypePlugin
11781107
}
11791108

11801109
// dagSortedNodes sorts the nodes based on topological order, omits onExitNode
@@ -1241,7 +1170,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
12411170

12421171
failed := make(map[string]bool)
12431172
for nodeID, node := range wf.Status.Nodes {
1244-
if node.Phase.FailedOrError() && shouldRetryFailedType(node.Type) && !isDescendantNodeSucceeded(wf, node, deleteNodesMap) {
1173+
if node.FailedOrError() && isExecutionNodeType(node.Type) && !isDescendantNodeSucceeded(wf, node, deleteNodesMap) {
12451174
failed[nodeID] = true
12461175
}
12471176
}
@@ -1295,7 +1224,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
12951224
}
12961225

12971226
for nodeID := range toReset {
1298-
// avoid reseting nodes that are marked for deletion
1227+
// avoid resetting nodes that are marked for deletion
12991228
if in := toDelete[nodeID]; in {
13001229
continue
13011230
}
@@ -1340,9 +1269,6 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
13401269
queue.Remove(currNode)
13411270
}
13421271
}
1343-
if n.Name == wf.Name && !shouldRetryFailedType(n.Type) {
1344-
newWf.Status.Nodes.Set(id, resetNode(*n.DeepCopy()))
1345-
}
13461272
}
13471273
for id, oldWfNode := range wf.Status.Nodes {
13481274

0 commit comments

Comments
 (0)