Skip to content

Commit e72207b

Browse files
committed
feat: dependency
1 parent 6f6e944 commit e72207b

File tree

9 files changed

+133
-29
lines changed

9 files changed

+133
-29
lines changed

internal/calloc/CmdArgParser.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var (
3737
FlagGres string
3838
FlagNodelist string
3939
FlagExcludes string
40+
FlagDependency string
4041

4142
FlagConfigFilePath string
4243
FlagDebugLevel string
@@ -76,4 +77,5 @@ func init() {
7677
RootCmd.Flags().StringVar(&FlagGres, "gres", "", "Gres required per task,format: \"gpu:a100:1\" or \"gpu:1\"")
7778
RootCmd.Flags().StringVarP(&FlagNodelist, "nodelist", "w", "", "Nodes to be allocated to the job (commas separated list)")
7879
RootCmd.Flags().StringVarP(&FlagExcludes, "exclude", "x", "", "Exclude specific nodes from allocating (commas separated list)")
80+
RootCmd.Flags().StringVarP(&FlagDependency, "dependency", "d", "", "Conditions for job to execute")
7981
}

internal/calloc/calloc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,14 @@ func main(cmd *cobra.Command, args []string) {
433433
if FlagExcludes != "" {
434434
task.Excludes = FlagExcludes
435435
}
436+
437+
if FlagDependency != "" {
438+
err := util.SetTaskDependencies(task, FlagDependency)
439+
if err != nil {
440+
log.Fatal(err)
441+
}
442+
}
443+
436444
task.Resources.AllocatableRes.CpuCoreLimit = task.CpusPerTask * float64(task.NtasksPerNode)
437445
if task.Resources.AllocatableRes.CpuCoreLimit > 1e6 {
438446
log.Fatalf("request too many cpus: %f", task.Resources.AllocatableRes.CpuCoreLimit)

internal/cbatch/CmdArgParser.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ var (
5252
FlagMailType string
5353
FlagMailUser string
5454

55+
FlagDependency string
56+
5557
RootCmd = &cobra.Command{
5658
Use: "cbatch [flags] file",
5759
Short: "Submit batch job",
@@ -128,4 +130,5 @@ func init() {
128130
RootCmd.Flags().StringVar(&FlagMailType, "mail-type", "", "Notify user by mail when certain events occur, supported values: NONE, BEGIN, END, FAIL, ALL (default is NONE)")
129131
RootCmd.Flags().StringVar(&FlagMailUser, "mail-user", "", "Mail address of the notification receiver")
130132
RootCmd.Flags().BoolVar(&FlagJson, "json", false, "Output in JSON format")
133+
RootCmd.Flags().StringVarP(&FlagDependency, "dependency", "d", "", "Conditions for job to execute")
131134
}

internal/cbatch/cbatch.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ func ProcessCbatchArgs(cmd *cobra.Command, args []CbatchArg) (bool, *protos.Task
139139
task.MailType = mailType
140140
case "--mail-user":
141141
task.MailUser = arg.val
142+
case "--dependency", "-d":
143+
err := util.SetTaskDependencies(task, arg.val)
144+
if err != nil {
145+
log.Error(err)
146+
return false, nil
147+
}
142148
default:
143149
log.Errorf("Invalid parameter '%s' given in the script file.\n", arg.name)
144150
return false, nil
@@ -224,6 +230,13 @@ func ProcessCbatchArgs(cmd *cobra.Command, args []CbatchArg) (bool, *protos.Task
224230
if FlagMailUser != "" {
225231
task.MailUser = FlagMailUser
226232
}
233+
if FlagDependency != "" {
234+
err := util.SetTaskDependencies(task, FlagDependency)
235+
if err != nil {
236+
log.Error(err)
237+
return false, nil
238+
}
239+
}
227240

228241
// Check the validity of the parameters
229242
if task.CpusPerTask <= 0 {
@@ -260,6 +273,17 @@ func ProcessCbatchArgs(cmd *cobra.Command, args []CbatchArg) (bool, *protos.Task
260273
return false, nil
261274
}
262275

276+
if task.Dependencies != nil {
277+
taskIds := make(map[uint32]bool)
278+
for _, dep := range task.Dependencies.Dependencies {
279+
if taskIds[dep.TaskId] {
280+
log.Errorf("Duplicate task #%d in dependencies\n", dep.TaskId)
281+
return false, nil
282+
}
283+
taskIds[dep.TaskId] = true
284+
}
285+
}
286+
263287
if len(task.Name) > 30 {
264288
task.Name = task.Name[:30]
265289
log.Warnf("Job name exceeds 30 characters, trimmed to %v.\n", task.Name)

internal/crun/CmdArgParser.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var (
3737
FlagNodelist string
3838
FlagExcludes string
3939
FlagGres string
40+
FlagDependency string
4041

4142
FlagConfigFilePath string
4243
FlagDebugLevel string
@@ -75,4 +76,5 @@ func init() {
7576
RootCmd.Flags().StringVarP(&FlagQos, "qos", "q", "", "QoS used for the job")
7677
RootCmd.Flags().StringVarP(&FlagNodelist, "nodelist", "w", "", "Nodes to be allocated to the job (commas separated list)")
7778
RootCmd.Flags().StringVarP(&FlagExcludes, "exclude", "x", "", "Exclude specific nodes from allocating (commas separated list)")
79+
RootCmd.Flags().StringVarP(&FlagDependency, "dependency", "d", "", "Conditions for job to execute")
7880
}

internal/crun/crun.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,14 @@ func MainCrun(cmd *cobra.Command, args []string) {
623623
if FlagExcludes != "" {
624624
task.Excludes = FlagExcludes
625625
}
626+
627+
if FlagDependency != "" {
628+
err := util.SetTaskDependencies(task, FlagDependency)
629+
if err != nil {
630+
log.Fatal(err)
631+
}
632+
}
633+
626634
task.Resources.AllocatableRes.CpuCoreLimit = task.CpusPerTask * float64(task.NtasksPerNode)
627635
if task.Resources.AllocatableRes.CpuCoreLimit > 1e6 {
628636
log.Fatalf("request too many cpus: %f", task.Resources.AllocatableRes.CpuCoreLimit)

internal/util/string.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,3 +520,58 @@ func ParseGres(gres string) *protos.DeviceMap {
520520

521521
return result
522522
}
523+
524+
func SetTaskDependencies(task *protos.TaskToCtld, depStr string) error {
525+
if strings.Contains(depStr, ",") && strings.Contains(depStr, "?") {
526+
return fmt.Errorf("cannot use both ',' and '?' in the dependency string")
527+
}
528+
sep := ","
529+
if strings.Contains(depStr, "?") {
530+
sep = "?"
531+
}
532+
depend_all := (sep == ",")
533+
if task.Dependencies != nil && depend_all != task.Dependencies.DependAll {
534+
return fmt.Errorf("cannot merge dependency with different dependency types(, and ?)")
535+
}
536+
if task.Dependencies == nil {
537+
task.Dependencies = &protos.Dependencies{
538+
DependAll: depend_all,
539+
}
540+
}
541+
542+
depStr = strings.TrimSpace(depStr)
543+
depStrList := strings.Split(depStr, sep)
544+
for _, subDepStr := range depStrList {
545+
dependencies := strings.Split(subDepStr, ":")
546+
if len(dependencies) < 2 {
547+
return fmt.Errorf("unrecognized dependency string: %s", subDepStr)
548+
}
549+
condition := new(protos.DependencyType)
550+
switch dependencies[0] {
551+
case "after":
552+
*condition = protos.DependencyType_AFTER
553+
case "afterok":
554+
*condition = protos.DependencyType_AFTER_OK
555+
case "afternotok":
556+
*condition = protos.DependencyType_AFTER_NOT_OK
557+
case "afterany":
558+
*condition = protos.DependencyType_AFTER_ANY
559+
default:
560+
return fmt.Errorf("unrecognized dependency type: %s", dependencies[0])
561+
}
562+
for _, dep := range dependencies[1:] {
563+
taskId, err := strconv.ParseUint(dep, 10, 32)
564+
if err != nil {
565+
return fmt.Errorf("invalid task id: %s", dep)
566+
}
567+
task.Dependencies.Dependencies = append(task.Dependencies.Dependencies, &protos.DependencyCondition{
568+
TaskId: uint32(taskId),
569+
Type: *condition,
570+
})
571+
}
572+
}
573+
if len(task.Dependencies.Dependencies) > 50 {
574+
return fmt.Errorf("dependency count should be no more than 50")
575+
}
576+
return nil
577+
}

protos/Crane.proto

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -542,12 +542,11 @@ message StreamCtldReply {
542542
}
543543
}
544544

545-
message QueryActualGresRequest{
546-
}
545+
message QueryActualDresRequest{}
547546

548-
message QueryActualGresReply{
547+
message QueryActualDresReply{
549548
bool ok = 1;
550-
DedicatedResource dedicated_resource = 2;
549+
DedicatedResourceInNode dres_in_node = 2;
551550
}
552551

553552
message StreamCrunRequest{
@@ -738,7 +737,7 @@ service Craned {
738737
rpc CreateCgroupForTasks(CreateCgroupForTasksRequest) returns(CreateCgroupForTasksReply);
739738
rpc ReleaseCgroupForTasks(ReleaseCgroupForTasksRequest) returns(ReleaseCgroupForTasksReply);
740739

741-
rpc QueryActualGres(QueryActualGresRequest) returns(QueryActualGresReply);
740+
rpc QueryActualDres(QueryActualDresRequest) returns(QueryActualDresReply);
742741
/*
743742
If the task is an interactive task, the resource uuid is also revoked.
744743
If there's no process in this interactive task, just deallocate all the resources.

protos/PublicDefs.proto

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,34 +66,11 @@ message ResourceV2 {
6666
map <string /*craned id*/, ResourceInNode> each_node_res = 1;
6767
}
6868

69-
// Old implementation
70-
message DedicatedResource {
71-
map <string /*craned id*/, DedicatedResourceInNode> each_node_gres = 1;
72-
}
73-
7469
// ResourceView bundles an AllocatableResource value with a DeviceMap.
// NOTE(review): presumably the user-facing description of requested or
// available resources — confirm against its users.
message ResourceView {
7570
AllocatableResource allocatable_res = 1;
7671
DeviceMap device_map = 2;
7772
}
7873

79-
// Old implementation
80-
message Resources {
81-
AllocatableResource allocatable_resource = 1;
82-
83-
oneof dedicated_resource {
84-
// After the task is scheduled, some missing device
85-
// specification is fulfilled by the scheduler and
86-
// a detailed specification of dedicated resources is generated.
87-
// Such a detailed form is sent to Craned for actual device allocation.
88-
DedicatedResource actual_dedicated_resource = 2;
89-
90-
// The form of dedicated resource presented by user
91-
// is abstracted by DeviceMap.
92-
// DeviceMap is used between front end and CraneCtld.
93-
DeviceMap dedicated_resource_req = 3;
94-
}
95-
}
96-
9774
enum PartitionState {
9875
PARTITION_UP = 0;
9976
PARTITION_DOWN = 1;
@@ -132,6 +109,23 @@ enum InteractiveTaskType {
132109
Crun = 1;
133110
}
134111

112+
// DependencyType identifies the kind of condition a task places on another
// task. The front-end dependency parser maps the keywords "after",
// "afterany", "afterok" and "afternotok" onto these values.
// NOTE(review): the exact trigger for each value (start / any end / success /
// failure of the referenced task) is presumed from the names — confirm
// against the scheduler implementation.
enum DependencyType {
113+
// Selected by the "after" keyword; also the zero (default) value.
AFTER = 0;
114+
// Selected by the "afterany" keyword.
AFTER_ANY = 1;
115+
// Selected by the "afterok" keyword.
AFTER_OK = 2;
116+
// Selected by the "afternotok" keyword.
AFTER_NOT_OK = 3;
117+
}
118+
119+
// DependencyCondition is a single dependency entry: one referenced task
// together with the condition type that applies to it.
message DependencyCondition {
120+
// Id of the task being depended on.
uint32 task_id = 1;
121+
// Kind of condition placed on the referenced task.
DependencyType type = 2;
122+
}
123+
124+
// Dependencies is the complete dependency specification attached to a task.
message Dependencies{
125+
// Individual dependency conditions, appended in the order they were parsed.
repeated DependencyCondition dependencies = 1;
126+
// Set to true by the front end when the conditions were written with the ','
// separator (or as a single item) and false when written with '?'.
// NOTE(review): presumably true = all conditions must hold, false = any one
// suffices — confirm against the scheduler.
bool depend_all = 2;
127+
}
128+
135129
message TaskToCtld {
136130
/* -------- Fields that are set at the submission time. ------- */
137131
google.protobuf.Duration time_limit = 1;
@@ -151,6 +145,8 @@ message TaskToCtld {
151145

152146
bool requeue_if_failed = 12;
153147
bool get_user_env = 13;
148+
149+
Dependencies dependencies = 14;
154150

155151
oneof payload {
156152
BatchTaskAdditionalMeta batch_meta = 21;
@@ -184,13 +180,19 @@ message RuntimeAttrOfTask {
184180
int32 requeue_count = 11;
185181
repeated string craned_ids = 12;
186182
TaskStatus status = 13;
187-
uint32 exit_code = 14;
183+
uint32 exit_code = 14;;
188184

189185
google.protobuf.Timestamp submit_time = 15;
190186
google.protobuf.Timestamp start_time = 16;
191187
google.protobuf.Timestamp end_time = 17;
192188

193189
bool held = 18;
190+
ResourceV2 resources = 19;
191+
bool dependency_ok = 20;
192+
// If this task depends all dependencies, store satisfied dependencies.
193+
// If this task depends any dependency, store unsatisfied dependencies.
194+
// TaskId must be stored in order to restore.
195+
repeated uint32 dependency_ids = 21;
194196
}
195197

196198
message TaskToD {
@@ -269,6 +271,7 @@ message TaskInfo {
269271
ResourceView res_view = 17;
270272

271273
// Dynamic task information
274+
uint32 dependency_state = 29;
272275
bool held = 30;
273276
TaskStatus status = 31;
274277

0 commit comments

Comments
 (0)