Skip to content

Commit bd13b1c

Browse files
authored
fix: correct manual retry logic. Fixes #14124 (#14328)
Signed-off-by: oninowang <[email protected]>
1 parent e75f70d commit bd13b1c

File tree

1 file changed

+51
-117
lines changed

1 file changed

+51
-117
lines changed

workflow/util/util.go

Lines changed: 51 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
870870
parentNode = nodes[parentWfNode.ID]
871871
}
872872

873-
children := []*dagNode{}
873+
children := make([]*dagNode, 0)
874874

875875
for _, childID := range wfNode.Children {
876876
childNode, ok := nodes[childID]
@@ -883,7 +883,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
883883
nodes[wfNode.ID].children = children
884884
}
885885

886-
values := []*dagNode{}
886+
values := make([]*dagNode, 0)
887887
for _, v := range nodes {
888888
values = append(values, v)
889889
}
@@ -905,13 +905,17 @@ func singularPath(nodes []*dagNode, toNode string) ([]*dagNode, error) {
905905
}
906906
}
907907

908+
if root == nil {
909+
return nil, fmt.Errorf("was unable to find root")
910+
}
911+
908912
if leaf == nil {
909913
return nil, fmt.Errorf("was unable to find %s", toNode)
910914
}
911915

912916
curr := leaf
913917

914-
reverseNodes := []*dagNode{}
918+
reverseNodes := make([]*dagNode, 0)
915919
for {
916920
reverseNodes = append(reverseNodes, curr)
917921
if curr.n.ID == root.n.ID {
@@ -949,37 +953,32 @@ func getChildren(n *dagNode) map[string]bool {
949953

950954
type resetFn func(string)
951955
type deleteFn func(string)
956+
type matchFn func(*dagNode) bool
952957

953-
// untilFn is a function that returns two variables, the first indicates
954-
// a `found` boolean while the second indicates if reset should be called.
955-
type untilFn func(*dagNode) (bool, bool)
956-
957-
func getUntilFnNodeType(nodeType wfv1.NodeType) untilFn {
958-
return func(n *dagNode) (bool, bool) {
959-
return n.n.Type == nodeType, true
958+
func matchNodeType(nodeType wfv1.NodeType) matchFn {
959+
return func(n *dagNode) bool {
960+
return n.n.Type == nodeType
960961
}
961962
}
962963

963-
func resetUntil(n *dagNode, should untilFn, resetFunc resetFn) (*dagNode, error) {
964+
func resetUntil(n *dagNode, matchFunc matchFn, resetFunc resetFn) (*dagNode, error) {
964965
curr := n
965966
for {
966967
if curr == nil {
967968
return nil, fmt.Errorf("was seeking node but ran out of nodes to explore")
968969
}
969970

970-
if foundNode, shouldReset := should(curr); foundNode {
971-
if shouldReset {
972-
resetFunc(curr.n.ID)
973-
}
971+
if match := matchFunc(curr); match {
972+
resetFunc(curr.n.ID)
974973
return curr, nil
975974
}
976975
curr = curr.parent
977976
}
978977
}
979978

980-
func getTillBoundaryFn(boundaryID string) untilFn {
981-
return func(n *dagNode) (bool, bool) {
982-
return n.n.ID == boundaryID, n.n.BoundaryID != ""
979+
func matchBoundaryID(boundaryID string) matchFn {
980+
return func(n *dagNode) bool {
981+
return n.n.ID == boundaryID
983982
}
984983
}
985984

@@ -989,6 +988,10 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
989988
if curr == nil {
990989
return curr, nil
991990
}
991+
if curr.parent != nil && curr.parent.n.Type == wfv1.NodeTypeRetry {
992+
resetFunc(curr.parent.n.ID)
993+
curr = curr.parent
994+
}
992995
if curr.parent != nil && curr.parent.n.Type == wfv1.NodeTypeStepGroup {
993996
resetFunc(curr.parent.n.ID)
994997
}
@@ -997,41 +1000,17 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
9971000
return curr.parent, nil
9981001
}
9991002
var err error
1000-
curr, err = resetUntil(curr, getTillBoundaryFn(seekingBoundaryID), resetFunc)
1003+
curr, err = resetUntil(curr, matchBoundaryID(seekingBoundaryID), resetFunc)
10011004
if err != nil {
10021005
return nil, err
10031006
}
10041007
}
10051008
}
10061009

1007-
func resetStepGroup(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1008-
return resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeStepGroup), resetFunc)
1009-
}
1010-
1011-
func resetSteps(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1012-
n, err := resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeSteps), resetFunc)
1013-
if err != nil {
1014-
return nil, err
1015-
}
1016-
return resetBoundaries(n, resetFunc)
1017-
}
1018-
1019-
func resetTaskGroup(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1020-
return resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeTaskGroup), resetFunc)
1021-
}
1022-
1023-
func resetDAG(n *dagNode, resetFunc resetFn) (*dagNode, error) {
1024-
n, err := resetUntil(n, getUntilFnNodeType(wfv1.NodeTypeDAG), resetFunc)
1025-
if err != nil {
1026-
return nil, err
1027-
}
1028-
return resetBoundaries(n, resetFunc)
1029-
}
1030-
10311010
// resetPod is only called in the event a Container was found. This implies that there is a parent pod.
10321011
func resetPod(n *dagNode, resetFunc resetFn, addToDelete deleteFn) (*dagNode, error) {
10331012
// this sets to reset but resets are overridden by deletes in the final FormulateRetryWorkflow logic.
1034-
curr, err := resetUntil(n, getUntilFnNodeType(wfv1.NodeTypePod), resetFunc)
1013+
curr, err := resetUntil(n, matchNodeType(wfv1.NodeTypePod), resetFunc)
10351014
if err != nil {
10361015
return nil, err
10371016
}
@@ -1075,79 +1054,27 @@ func resetPath(allNodes []*dagNode, startNode string) (map[string]bool, map[stri
10751054
nodesToDelete[nodeID] = true
10761055
}
10771056

1078-
var mustFind wfv1.NodeType
1079-
mustFind = ""
1080-
1081-
if curr.n.Type == wfv1.NodeTypeContainer {
1082-
// special case where the retry node is the container of a containerSet
1083-
mustFind = wfv1.NodeTypePod
1084-
}
1085-
1086-
findBoundaries := false
10871057
for curr != nil {
10881058

1089-
switch curr.n.Type {
1090-
case wfv1.NodeTypePod:
1091-
//ignore
1092-
case wfv1.NodeTypeContainer:
1093-
//ignore
1094-
case wfv1.NodeTypeSteps:
1095-
addToReset(curr.n.ID)
1096-
findBoundaries = true
1097-
case wfv1.NodeTypeStepGroup:
1098-
addToReset(curr.n.ID)
1099-
findBoundaries = true
1100-
case wfv1.NodeTypeDAG:
1101-
addToReset(curr.n.ID)
1102-
findBoundaries = true
1103-
case wfv1.NodeTypeTaskGroup:
1104-
addToReset(curr.n.ID)
1105-
findBoundaries = true
1106-
case wfv1.NodeTypeRetry:
1107-
addToReset(curr.n.ID)
1108-
case wfv1.NodeTypeSkipped:
1109-
// ignore -> doesn't make sense to reach this
1110-
case wfv1.NodeTypeSuspend:
1111-
// ignore
1112-
case wfv1.NodeTypeHTTP:
1113-
// ignore
1114-
case wfv1.NodeTypePlugin:
1059+
switch {
1060+
case isGroupNodeType(curr.n.Type):
11151061
addToReset(curr.n.ID)
1116-
}
1117-
1118-
if mustFind == "" && !findBoundaries {
1119-
curr = curr.parent
1120-
continue
1121-
}
1122-
1123-
if findBoundaries {
11241062
curr, err = resetBoundaries(curr, addToReset)
11251063
if err != nil {
11261064
return nil, nil, err
11271065
}
1128-
findBoundaries = false
11291066
continue
1130-
}
1131-
1132-
switch mustFind {
1133-
case wfv1.NodeTypePod:
1067+
case curr.n.Type == wfv1.NodeTypeRetry:
1068+
addToReset(curr.n.ID)
1069+
case curr.n.Type == wfv1.NodeTypeContainer:
11341070
curr, err = resetPod(curr, addToReset, addToDelete)
1135-
case wfv1.NodeTypeSteps:
1136-
curr, err = resetSteps(curr, addToReset)
1137-
case wfv1.NodeTypeStepGroup:
1138-
curr, err = resetStepGroup(curr, addToReset)
1139-
case wfv1.NodeTypeDAG:
1140-
curr, err = resetDAG(curr, addToReset)
1141-
case wfv1.NodeTypeTaskGroup:
1142-
curr, err = resetTaskGroup(curr, addToReset)
1143-
default:
1144-
return nil, nil, fmt.Errorf("invalid mustFind of %s supplied", mustFind)
1145-
}
1146-
mustFind = ""
1147-
if err != nil {
1148-
return nil, nil, err
1071+
if err != nil {
1072+
return nil, nil, err
1073+
}
1074+
continue
11491075
}
11501076

1077+
curr = curr.parent
11511078
}
11521079
return nodesToReset, nodesToDelete, nil
11531080
}
@@ -1166,11 +1093,13 @@ func setUnion[T comparable](m1 map[T]bool, m2 map[T]bool) map[T]bool {
11661093
}
11671094
return res
11681095
}
1169-
func shouldRetryFailedType(nodeTyp wfv1.NodeType) bool {
1170-
if nodeTyp == wfv1.NodeTypePod || nodeTyp == wfv1.NodeTypeContainer {
1171-
return true
1172-
}
1173-
return false
1096+
1097+
func isGroupNodeType(nodeType wfv1.NodeType) bool {
1098+
return nodeType == wfv1.NodeTypeDAG || nodeType == wfv1.NodeTypeTaskGroup || nodeType == wfv1.NodeTypeStepGroup || nodeType == wfv1.NodeTypeSteps
1099+
}
1100+
1101+
func isExecutionNodeType(nodeType wfv1.NodeType) bool {
1102+
return nodeType == wfv1.NodeTypeContainer || nodeType == wfv1.NodeTypePod || nodeType == wfv1.NodeTypeHTTP || nodeType == wfv1.NodeTypePlugin
11741103
}
11751104

11761105
// dagSortedNodes sorts the nodes based on topological order, omits onExitNode
@@ -1237,8 +1166,16 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
12371166

12381167
failed := make(map[string]bool)
12391168
for nodeID, node := range wf.Status.Nodes {
1240-
if node.Phase.FailedOrError() && shouldRetryFailedType(node.Type) && !isDescendantNodeSucceeded(wf, node, deleteNodesMap) {
1241-
failed[nodeID] = true
1169+
if node.FailedOrError() && isExecutionNodeType(node.Type) {
1170+
// Check its parent if current node is retry node
1171+
if node.NodeFlag != nil && node.NodeFlag.Retried {
1172+
node = *wf.Status.Nodes.Find(func(nodeStatus wfv1.NodeStatus) bool {
1173+
return nodeStatus.HasChild(node.ID)
1174+
})
1175+
}
1176+
if !isDescendantNodeSucceeded(wf, node, deleteNodesMap) {
1177+
failed[nodeID] = true
1178+
}
12421179
}
12431180
}
12441181
for failedNode := range failed {
@@ -1291,7 +1228,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
12911228
}
12921229

12931230
for nodeID := range toReset {
1294-
// avoid reseting nodes that are marked for deletion
1231+
// avoid resetting nodes that are marked for deletion
12951232
if in := toDelete[nodeID]; in {
12961233
continue
12971234
}
@@ -1336,9 +1273,6 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
13361273
queue.Remove(currNode)
13371274
}
13381275
}
1339-
if n.Name == wf.Name && !shouldRetryFailedType(n.Type) {
1340-
newWf.Status.Nodes.Set(id, resetNode(*n.DeepCopy()))
1341-
}
13421276
}
13431277
for id, oldWfNode := range wf.Status.Nodes {
13441278

0 commit comments

Comments
 (0)