@@ -870,7 +870,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
870
870
parentNode = nodes [parentWfNode .ID ]
871
871
}
872
872
873
- children := []* dagNode {}
873
+ children := make ( []* dagNode , 0 )
874
874
875
875
for _ , childID := range wfNode .Children {
876
876
childNode , ok := nodes [childID ]
@@ -883,7 +883,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
883
883
nodes [wfNode .ID ].children = children
884
884
}
885
885
886
- values := []* dagNode {}
886
+ values := make ( []* dagNode , 0 )
887
887
for _ , v := range nodes {
888
888
values = append (values , v )
889
889
}
@@ -905,13 +905,17 @@ func singularPath(nodes []*dagNode, toNode string) ([]*dagNode, error) {
905
905
}
906
906
}
907
907
908
+ if root == nil {
909
+ return nil , fmt .Errorf ("was unable to find root" )
910
+ }
911
+
908
912
if leaf == nil {
909
913
return nil , fmt .Errorf ("was unable to find %s" , toNode )
910
914
}
911
915
912
916
curr := leaf
913
917
914
- reverseNodes := []* dagNode {}
918
+ reverseNodes := make ( []* dagNode , 0 )
915
919
for {
916
920
reverseNodes = append (reverseNodes , curr )
917
921
if curr .n .ID == root .n .ID {
@@ -949,37 +953,32 @@ func getChildren(n *dagNode) map[string]bool {
949
953
950
954
type resetFn func (string )
951
955
type deleteFn func (string )
956
+ type matchFn func (* dagNode ) bool
952
957
953
- // untilFn is a function that returns two variables, the first indicates
954
- // a `found` boolean while the second indicates if reset should be called.
955
- type untilFn func (* dagNode ) (bool , bool )
956
-
957
- func getUntilFnNodeType (nodeType wfv1.NodeType ) untilFn {
958
- return func (n * dagNode ) (bool , bool ) {
959
- return n .n .Type == nodeType , true
958
+ func matchNodeType (nodeType wfv1.NodeType ) matchFn {
959
+ return func (n * dagNode ) bool {
960
+ return n .n .Type == nodeType
960
961
}
961
962
}
962
963
963
- func resetUntil (n * dagNode , should untilFn , resetFunc resetFn ) (* dagNode , error ) {
964
+ func resetUntil (n * dagNode , matchFunc matchFn , resetFunc resetFn ) (* dagNode , error ) {
964
965
curr := n
965
966
for {
966
967
if curr == nil {
967
968
return nil , fmt .Errorf ("was seeking node but ran out of nodes to explore" )
968
969
}
969
970
970
- if foundNode , shouldReset := should (curr ); foundNode {
971
- if shouldReset {
972
- resetFunc (curr .n .ID )
973
- }
971
+ if match := matchFunc (curr ); match {
972
+ resetFunc (curr .n .ID )
974
973
return curr , nil
975
974
}
976
975
curr = curr .parent
977
976
}
978
977
}
979
978
980
- func getTillBoundaryFn (boundaryID string ) untilFn {
981
- return func (n * dagNode ) ( bool , bool ) {
982
- return n .n .ID == boundaryID , n . n . BoundaryID != ""
979
+ func matchBoundaryID (boundaryID string ) matchFn {
980
+ return func (n * dagNode ) bool {
981
+ return n .n .ID == boundaryID
983
982
}
984
983
}
985
984
@@ -989,6 +988,10 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
989
988
if curr == nil {
990
989
return curr , nil
991
990
}
991
+ if curr .parent != nil && curr .parent .n .Type == wfv1 .NodeTypeRetry {
992
+ resetFunc (curr .parent .n .ID )
993
+ curr = curr .parent
994
+ }
992
995
if curr .parent != nil && curr .parent .n .Type == wfv1 .NodeTypeStepGroup {
993
996
resetFunc (curr .parent .n .ID )
994
997
}
@@ -997,41 +1000,17 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
997
1000
return curr .parent , nil
998
1001
}
999
1002
var err error
1000
- curr , err = resetUntil (curr , getTillBoundaryFn (seekingBoundaryID ), resetFunc )
1003
+ curr , err = resetUntil (curr , matchBoundaryID (seekingBoundaryID ), resetFunc )
1001
1004
if err != nil {
1002
1005
return nil , err
1003
1006
}
1004
1007
}
1005
1008
}
1006
1009
1007
- func resetStepGroup (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1008
- return resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeStepGroup ), resetFunc )
1009
- }
1010
-
1011
- func resetSteps (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1012
- n , err := resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeSteps ), resetFunc )
1013
- if err != nil {
1014
- return nil , err
1015
- }
1016
- return resetBoundaries (n , resetFunc )
1017
- }
1018
-
1019
- func resetTaskGroup (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1020
- return resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeTaskGroup ), resetFunc )
1021
- }
1022
-
1023
- func resetDAG (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1024
- n , err := resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeDAG ), resetFunc )
1025
- if err != nil {
1026
- return nil , err
1027
- }
1028
- return resetBoundaries (n , resetFunc )
1029
- }
1030
-
1031
1010
// resetPod is only called in the event a Container was found. This implies that there is a parent pod.
1032
1011
func resetPod (n * dagNode , resetFunc resetFn , addToDelete deleteFn ) (* dagNode , error ) {
1033
1012
// this sets to reset but resets are overridden by deletes in the final FormulateRetryWorkflow logic.
1034
- curr , err := resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypePod ), resetFunc )
1013
+ curr , err := resetUntil (n , matchNodeType (wfv1 .NodeTypePod ), resetFunc )
1035
1014
if err != nil {
1036
1015
return nil , err
1037
1016
}
@@ -1075,79 +1054,27 @@ func resetPath(allNodes []*dagNode, startNode string) (map[string]bool, map[stri
1075
1054
nodesToDelete [nodeID ] = true
1076
1055
}
1077
1056
1078
- var mustFind wfv1.NodeType
1079
- mustFind = ""
1080
-
1081
- if curr .n .Type == wfv1 .NodeTypeContainer {
1082
- // special case where the retry node is the container of a containerSet
1083
- mustFind = wfv1 .NodeTypePod
1084
- }
1085
-
1086
- findBoundaries := false
1087
1057
for curr != nil {
1088
1058
1089
- switch curr .n .Type {
1090
- case wfv1 .NodeTypePod :
1091
- //ignore
1092
- case wfv1 .NodeTypeContainer :
1093
- //ignore
1094
- case wfv1 .NodeTypeSteps :
1095
- addToReset (curr .n .ID )
1096
- findBoundaries = true
1097
- case wfv1 .NodeTypeStepGroup :
1098
- addToReset (curr .n .ID )
1099
- findBoundaries = true
1100
- case wfv1 .NodeTypeDAG :
1101
- addToReset (curr .n .ID )
1102
- findBoundaries = true
1103
- case wfv1 .NodeTypeTaskGroup :
1104
- addToReset (curr .n .ID )
1105
- findBoundaries = true
1106
- case wfv1 .NodeTypeRetry :
1107
- addToReset (curr .n .ID )
1108
- case wfv1 .NodeTypeSkipped :
1109
- // ignore -> doesn't make sense to reach this
1110
- case wfv1 .NodeTypeSuspend :
1111
- // ignore
1112
- case wfv1 .NodeTypeHTTP :
1113
- // ignore
1114
- case wfv1 .NodeTypePlugin :
1059
+ switch {
1060
+ case isGroupNodeType (curr .n .Type ):
1115
1061
addToReset (curr .n .ID )
1116
- }
1117
-
1118
- if mustFind == "" && ! findBoundaries {
1119
- curr = curr .parent
1120
- continue
1121
- }
1122
-
1123
- if findBoundaries {
1124
1062
curr , err = resetBoundaries (curr , addToReset )
1125
1063
if err != nil {
1126
1064
return nil , nil , err
1127
1065
}
1128
- findBoundaries = false
1129
1066
continue
1130
- }
1131
-
1132
- switch mustFind {
1133
- case wfv1 .NodeTypePod :
1067
+ case curr .n .Type == wfv1 .NodeTypeRetry :
1068
+ addToReset (curr .n .ID )
1069
+ case curr .n .Type == wfv1 .NodeTypeContainer :
1134
1070
curr , err = resetPod (curr , addToReset , addToDelete )
1135
- case wfv1 .NodeTypeSteps :
1136
- curr , err = resetSteps (curr , addToReset )
1137
- case wfv1 .NodeTypeStepGroup :
1138
- curr , err = resetStepGroup (curr , addToReset )
1139
- case wfv1 .NodeTypeDAG :
1140
- curr , err = resetDAG (curr , addToReset )
1141
- case wfv1 .NodeTypeTaskGroup :
1142
- curr , err = resetTaskGroup (curr , addToReset )
1143
- default :
1144
- return nil , nil , fmt .Errorf ("invalid mustFind of %s supplied" , mustFind )
1145
- }
1146
- mustFind = ""
1147
- if err != nil {
1148
- return nil , nil , err
1071
+ if err != nil {
1072
+ return nil , nil , err
1073
+ }
1074
+ continue
1149
1075
}
1150
1076
1077
+ curr = curr .parent
1151
1078
}
1152
1079
return nodesToReset , nodesToDelete , nil
1153
1080
}
@@ -1166,11 +1093,13 @@ func setUnion[T comparable](m1 map[T]bool, m2 map[T]bool) map[T]bool {
1166
1093
}
1167
1094
return res
1168
1095
}
1169
- func shouldRetryFailedType (nodeTyp wfv1.NodeType ) bool {
1170
- if nodeTyp == wfv1 .NodeTypePod || nodeTyp == wfv1 .NodeTypeContainer {
1171
- return true
1172
- }
1173
- return false
1096
+
1097
+ func isGroupNodeType (nodeType wfv1.NodeType ) bool {
1098
+ return nodeType == wfv1 .NodeTypeDAG || nodeType == wfv1 .NodeTypeTaskGroup || nodeType == wfv1 .NodeTypeStepGroup || nodeType == wfv1 .NodeTypeSteps
1099
+ }
1100
+
1101
+ func isExecutionNodeType (nodeType wfv1.NodeType ) bool {
1102
+ return nodeType == wfv1 .NodeTypeContainer || nodeType == wfv1 .NodeTypePod || nodeType == wfv1 .NodeTypeHTTP || nodeType == wfv1 .NodeTypePlugin
1174
1103
}
1175
1104
1176
1105
// dagSortedNodes sorts the nodes based on topological order, omits onExitNode
@@ -1237,8 +1166,16 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
1237
1166
1238
1167
failed := make (map [string ]bool )
1239
1168
for nodeID , node := range wf .Status .Nodes {
1240
- if node .Phase .FailedOrError () && shouldRetryFailedType (node .Type ) && ! isDescendantNodeSucceeded (wf , node , deleteNodesMap ) {
1241
- failed [nodeID ] = true
1169
+ if node .FailedOrError () && isExecutionNodeType (node .Type ) {
1170
+ // Check its parent if current node is retry node
1171
+ if node .NodeFlag != nil && node .NodeFlag .Retried {
1172
+ node = * wf .Status .Nodes .Find (func (nodeStatus wfv1.NodeStatus ) bool {
1173
+ return nodeStatus .HasChild (node .ID )
1174
+ })
1175
+ }
1176
+ if ! isDescendantNodeSucceeded (wf , node , deleteNodesMap ) {
1177
+ failed [nodeID ] = true
1178
+ }
1242
1179
}
1243
1180
}
1244
1181
for failedNode := range failed {
@@ -1291,7 +1228,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
1291
1228
}
1292
1229
1293
1230
for nodeID := range toReset {
1294
- // avoid reseting nodes that are marked for deletion
1231
+ // avoid resetting nodes that are marked for deletion
1295
1232
if in := toDelete [nodeID ]; in {
1296
1233
continue
1297
1234
}
@@ -1336,9 +1273,6 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
1336
1273
queue .Remove (currNode )
1337
1274
}
1338
1275
}
1339
- if n .Name == wf .Name && ! shouldRetryFailedType (n .Type ) {
1340
- newWf .Status .Nodes .Set (id , resetNode (* n .DeepCopy ()))
1341
- }
1342
1276
}
1343
1277
for id , oldWfNode := range wf .Status .Nodes {
1344
1278
0 commit comments