Skip to content

Commit f0fdb69

Browse files
shuangkunsarabala1979tczhao
authored
feat(CLI-alpha): support backfill for cron workflow. Part of #2706 (#13999)
Signed-off-by: shuangkun <[email protected]> Signed-off-by: shuangkun tian <[email protected]> Co-authored-by: Saravanan Balasubramanian <[email protected]> Co-authored-by: Tianchu Zhao <[email protected]>
1 parent 50219ce commit f0fdb69

File tree

11 files changed

+445
-1
lines changed

11 files changed

+445
-1
lines changed

cmd/argo/commands/cron/backfill.go

+256
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package cron
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"math"
8+
"os"
9+
"time"
10+
11+
"github.com/argoproj/argo-workflows/v3/workflow/util"
12+
13+
cron "github.com/robfig/cron/v3"
14+
"github.com/spf13/cobra"
15+
"sigs.k8s.io/yaml"
16+
17+
"github.com/argoproj/pkg/rand"
18+
19+
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
20+
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/cronworkflow"
21+
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
22+
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
23+
"github.com/argoproj/argo-workflows/v3/workflow/common"
24+
)
25+
26+
type backfillOpts struct {
27+
cronWfName string
28+
name string
29+
startDate string
30+
endDate string
31+
parallel bool
32+
argName string
33+
dateFormat string
34+
maxWorkflowCount int
35+
}
36+
37+
func NewBackfillCommand() *cobra.Command {
38+
var (
39+
cliOps backfillOpts
40+
)
41+
var command = &cobra.Command{
42+
Use: "backfill cronwf",
43+
Short: "create a cron backfill(new alpha feature)",
44+
RunE: func(cmd *cobra.Command, args []string) error {
45+
if len(args) == 0 {
46+
cmd.HelpFunc()(cmd, args)
47+
os.Exit(0)
48+
}
49+
if cliOps.name == "" {
50+
name, err := rand.RandString(5)
51+
if err != nil {
52+
return err
53+
}
54+
cliOps.name = name
55+
}
56+
57+
cliOps.cronWfName = args[0]
58+
return backfillCronWorkflow(cmd.Context(), args[0], cliOps)
59+
},
60+
}
61+
command.Flags().StringVar(&cliOps.name, "name", "", "Backfill name")
62+
command.Flags().StringVar(&cliOps.startDate, "start", "", "Start date")
63+
command.Flags().StringVar(&cliOps.endDate, "end", "", "End Date")
64+
command.Flags().BoolVar(&cliOps.parallel, "parallel", false, "Enabled all backfile workflows run parallel")
65+
command.Flags().StringVar(&cliOps.argName, "argname", "cronScheduleTime", "Schedule time argument name for workflow")
66+
command.Flags().StringVar(&cliOps.dateFormat, "format", time.RFC1123, "Date format for Schedule time value")
67+
command.Flags().IntVar(&cliOps.maxWorkflowCount, "maxworkflowcount", 1000, "Maximum number of generated backfill workflows")
68+
69+
return command
70+
}
71+
72+
func backfillCronWorkflow(ctx context.Context, cronWFName string, cliOps backfillOpts) error {
73+
if cliOps.startDate == "" {
74+
return fmt.Errorf("Start Date should not be empty")
75+
}
76+
startTime, err := time.Parse(cliOps.dateFormat, cliOps.startDate)
77+
if err != nil {
78+
return err
79+
}
80+
var endTime time.Time
81+
if cliOps.endDate != "" {
82+
endTime, err = time.Parse(cliOps.dateFormat, cliOps.endDate)
83+
if err != nil {
84+
return err
85+
}
86+
} else {
87+
endTime = time.Now()
88+
cliOps.endDate = endTime.Format(time.RFC1123)
89+
}
90+
91+
ctx, apiClient, err := client.NewAPIClient(ctx)
92+
if err != nil {
93+
return err
94+
}
95+
cronClient, err := apiClient.NewCronWorkflowServiceClient()
96+
if err != nil {
97+
return err
98+
}
99+
wfClient := apiClient.NewWorkflowServiceClient()
100+
req := cronworkflow.GetCronWorkflowRequest{
101+
Name: cronWFName,
102+
Namespace: client.Namespace(),
103+
}
104+
cronWF, err := cronClient.GetCronWorkflow(ctx, &req)
105+
if err != nil {
106+
return err
107+
}
108+
cronTab, err := cron.ParseStandard(cronWF.Spec.Schedule)
109+
if err != nil {
110+
return err
111+
}
112+
scheTime := startTime
113+
priority := int32(math.MaxInt32)
114+
var scheList []string
115+
wf := common.ConvertCronWorkflowToWorkflow(cronWF)
116+
paramArg := `{{inputs.parameters.backfillscheduletime}}`
117+
wf.GenerateName = util.GenerateBackfillWorkflowPrefix(cronWF.Name, cliOps.name) + "-"
118+
param := v1alpha1.Parameter{
119+
Name: cliOps.argName,
120+
Value: v1alpha1.AnyStringPtr(paramArg),
121+
}
122+
if !cliOps.parallel {
123+
wf.Spec.Priority = &priority
124+
wf.Spec.Synchronization = &v1alpha1.Synchronization{
125+
Mutex: &v1alpha1.Mutex{Name: cliOps.name},
126+
}
127+
}
128+
wf.Spec.Arguments.Parameters = append(wf.Spec.Arguments.Parameters, param)
129+
for {
130+
scheTime = cronTab.Next(scheTime)
131+
if endTime.Before(scheTime) {
132+
break
133+
}
134+
timeStr := scheTime.String()
135+
scheList = append(scheList, timeStr)
136+
}
137+
wfJsonByte, err := json.Marshal(wf)
138+
if err != nil {
139+
return err
140+
}
141+
yamlbyte, err := yaml.JSONToYAML(wfJsonByte)
142+
if err != nil {
143+
return err
144+
}
145+
wfYamlStr := "apiVersion: argoproj.io/v1alpha1 \n" + string(yamlbyte)
146+
if len(scheList) > 0 {
147+
return CreateMonitorWf(ctx, wfYamlStr, client.Namespace(), cronWFName, scheList, wfClient, cliOps)
148+
} else {
149+
fmt.Print("There is no suitable scheduling time.")
150+
}
151+
return nil
152+
}
153+
154+
const backfillWf = `{
155+
"apiVersion": "argoproj.io/v1alpha1",
156+
"kind": "Workflow",
157+
"metadata": {
158+
"generateName": "backfill-wf-"
159+
},
160+
"spec": {
161+
"entrypoint": "main",
162+
"templates": [
163+
{
164+
"name": "main",
165+
"steps": [
166+
[
167+
{
168+
"name": "create-workflow",
169+
"template": "create-workflow",
170+
"arguments": {
171+
"parameters": [
172+
{
173+
"name": "backfillscheduletime",
174+
"value": "{{item}}"
175+
}
176+
],
177+
"withParam": "{{workflows.parameters.cronscheduletime}}"
178+
}
179+
}
180+
]
181+
]
182+
},
183+
{
184+
"name": "create-workflow",
185+
"inputs": {
186+
"parameters": [
187+
{
188+
"name": "backfillscheduletime"
189+
}
190+
]
191+
},
192+
"resource": {
193+
"successCondition": "status.phase == Succeeded",
194+
"action": "create"
195+
}
196+
}
197+
]
198+
}
199+
}
200+
`
201+
202+
func CreateMonitorWf(ctx context.Context, wf, namespace, cronWFName string, scheTime []string, wfClient workflow.WorkflowServiceClient, cliOps backfillOpts) error {
203+
var monitorWfObj v1alpha1.Workflow
204+
err := json.Unmarshal([]byte(backfillWf), &monitorWfObj)
205+
if monitorWfObj.ObjectMeta.Labels == nil {
206+
monitorWfObj.ObjectMeta.Labels = make(map[string]string)
207+
}
208+
monitorWfObj.ObjectMeta.Labels[common.LabelKeyCronWorkflowBackfill] = cronWFName
209+
if err != nil {
210+
return err
211+
}
212+
TotalScheCount := len(scheTime)
213+
iterCount := int(float64(len(scheTime)/cliOps.maxWorkflowCount)) + 1
214+
startIdx := 0
215+
var endIdx int
216+
var wfNames []string
217+
for i := 0; i < iterCount; i++ {
218+
tmpl := monitorWfObj.GetTemplateByName("create-workflow")
219+
if (TotalScheCount - i*cliOps.maxWorkflowCount) < cliOps.maxWorkflowCount {
220+
endIdx = TotalScheCount
221+
} else {
222+
endIdx = startIdx + cliOps.maxWorkflowCount
223+
}
224+
scheTimeByte, err := json.Marshal(scheTime[startIdx:endIdx])
225+
startIdx = endIdx
226+
if err != nil {
227+
return err
228+
}
229+
tmpl.Resource.Manifest = fmt.Sprint(wf)
230+
stepTmpl := monitorWfObj.GetTemplateByName("main")
231+
stepTmpl.Steps[0].Steps[0].WithParam = string(scheTimeByte)
232+
c, err := wfClient.CreateWorkflow(ctx, &workflow.WorkflowCreateRequest{Namespace: namespace, Workflow: &monitorWfObj})
233+
if err != nil {
234+
return err
235+
}
236+
wfNames = append(wfNames, c.Name)
237+
}
238+
printBackFillOutput(wfNames, len(scheTime), cliOps)
239+
return nil
240+
}
241+
242+
func printBackFillOutput(wfNames []string, totalSches int, cliOps backfillOpts) {
243+
fmt.Printf("Created %s Backfill task for Cronworkflow %s \n", cliOps.name, cliOps.cronWfName)
244+
fmt.Printf("==================================================\n")
245+
fmt.Printf("Backfill Period :\n")
246+
fmt.Printf("Start Time : %s \n", cliOps.startDate)
247+
fmt.Printf(" End Time : %s \n", cliOps.endDate)
248+
fmt.Printf("Total Backfill Schedule: %d \n", totalSches)
249+
fmt.Printf("==================================================\n")
250+
fmt.Printf("Backfill Workflows: \n")
251+
fmt.Printf(" NAMESPACE\t WORKFLOW: \n")
252+
namespace := client.Namespace()
253+
for idx, wfName := range wfNames {
254+
fmt.Printf("%d. %s \t %s \n", idx+1, namespace, wfName)
255+
}
256+
}

cmd/argo/commands/cron/root.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func NewCronWorkflowCommand() *cobra.Command {
2222
command.AddCommand(NewSuspendCommand())
2323
command.AddCommand(NewResumeCommand())
2424
command.AddCommand(NewUpdateCommand())
25+
command.AddCommand(NewBackfillCommand())
2526

2627
return command
2728
}

docs/cli/argo_cron.md

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ argo cron [flags]
5454
### SEE ALSO
5555

5656
* [argo](argo.md) - argo is the command line interface to Argo
57+
* [argo cron backfill](argo_cron_backfill.md) - create a cron backfill(new alpha feature)
5758
* [argo cron create](argo_cron_create.md) - create a cron workflow
5859
* [argo cron delete](argo_cron_delete.md) - delete a cron workflow
5960
* [argo cron get](argo_cron_get.md) - display details about a cron workflow

docs/cli/argo_cron_backfill.md

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
## argo cron backfill
2+
3+
create a cron backfill(new alpha feature)
4+
5+
```
6+
argo cron backfill cronwf [flags]
7+
```
8+
9+
### Options
10+
11+
```
12+
--argname string Schedule time argument name for workflow (default "cronScheduleTime")
13+
--end string End Date
14+
--format string Date format for Schedule time value (default "Mon, 02 Jan 2006 15:04:05 MST")
15+
-h, --help help for backfill
16+
--maxworkflowcount int Maximum number of generated backfill workflows (default 1000)
17+
--name string Backfill name
18+
--parallel Enabled all backfile workflows run parallel
19+
--start string Start date
20+
```
21+
22+
### Options inherited from parent commands
23+
24+
```
25+
--argo-base-href string Path to use with HTTP client due to Base HREF. Defaults to the ARGO_BASE_HREF environment variable.
26+
--argo-http1 If true, use the HTTP client. Defaults to the ARGO_HTTP1 environment variable.
27+
-s, --argo-server host:port API server host:port. e.g. localhost:2746. Defaults to the ARGO_SERVER environment variable.
28+
--as string Username to impersonate for the operation
29+
--as-group stringArray Group to impersonate for the operation, this flag can be repeated to specify multiple groups.
30+
--as-uid string UID to impersonate for the operation
31+
--certificate-authority string Path to a cert file for the certificate authority
32+
--client-certificate string Path to a client certificate file for TLS
33+
--client-key string Path to a client key file for TLS
34+
--cluster string The name of the kubeconfig cluster to use
35+
--context string The name of the kubeconfig context to use
36+
--disable-compression If true, opt-out of response compression for all requests to the server
37+
--gloglevel int Set the glog logging level
38+
-H, --header strings Sets additional header to all requests made by Argo CLI. (Can be repeated multiple times to add multiple headers, also supports comma separated headers) Used only when either ARGO_HTTP1 or --argo-http1 is set to true.
39+
--insecure-skip-tls-verify If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
40+
-k, --insecure-skip-verify If true, the Argo Server's certificate will not be checked for validity. This will make your HTTPS connections insecure. Defaults to the ARGO_INSECURE_SKIP_VERIFY environment variable.
41+
--instanceid string submit with a specific controller's instance id label. Default to the ARGO_INSTANCEID environment variable.
42+
--kubeconfig string Path to a kube config. Only required if out-of-cluster
43+
--loglevel string Set the logging level. One of: debug|info|warn|error (default "info")
44+
-n, --namespace string If present, the namespace scope for this CLI request
45+
--password string Password for basic authentication to the API server
46+
--proxy-url string If provided, this URL will be used to connect via proxy
47+
--request-timeout string The length of time to wait before giving up on a single server request. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). A value of zero means don't timeout requests. (default "0")
48+
-e, --secure Whether or not the server is using TLS with the Argo Server. Defaults to the ARGO_SECURE environment variable. (default true)
49+
--server string The address and port of the Kubernetes API server
50+
--tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
51+
--token string Bearer token for authentication to the API server
52+
--user string The name of the kubeconfig user to use
53+
--username string Username for basic authentication to the API server
54+
-v, --verbose Enabled verbose logging, i.e. --loglevel debug
55+
```
56+
57+
### SEE ALSO
58+
59+
* [argo cron](argo_cron.md) - manage cron workflows
60+

mkdocs.yml

+1
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ nav:
202202
- argo completion: cli/argo_completion.md
203203
- argo cp: cli/argo_cp.md
204204
- argo cron: cli/argo_cron.md
205+
- argo cron: cli/argo_cron_backfill.md
205206
- argo cron create: cli/argo_cron_create.md
206207
- argo cron delete: cli/argo_cron_delete.md
207208
- argo cron get: cli/argo_cron_get.md

0 commit comments

Comments
 (0)