8
8
9
9
from mlcomp .db .enums import DagType , ComponentType , TaskStatus
10
10
from mlcomp .db .models import Computer
11
+ from mlcomp .db .providers import \
12
+ ComputerProvider , \
13
+ TaskProvider , \
14
+ StepProvider , \
15
+ ProjectProvider
11
16
from mlcomp .utils .io import yaml_load
12
17
from mlcomp .utils .logging import create_logger
13
- from mlcomp .db .providers import *
14
18
from multiprocessing import cpu_count
15
19
16
20
from mlcomp .utils .settings import ROOT_FOLDER , DATA_FOLDER , MODEL_FOLDER
@@ -26,25 +30,26 @@ def _dag(config: str, debug: bool = False):
26
30
27
31
type_name = config_parsed ['info' ].get ('type' , "standard" )
28
32
if type_name == DagType .Standard .name .lower ():
29
- return dag_standard (config_parsed ,
30
- debug = debug ,
31
- config_text = config_text )
33
+ return dag_standard (
34
+ config_parsed , debug = debug , config_text = config_text
35
+ )
32
36
33
- return dag_pipe (config_parsed ,
34
- config_text = config_text )
37
+ return dag_pipe (config_parsed , config_text = config_text )
35
38
36
39
37
40
def _create_computer ():
38
41
tot_m , used_m , free_m = memory ()
39
42
tot_d , used_d , free_d = disk (ROOT_FOLDER )
40
- computer = Computer (name = socket .gethostname (),
41
- gpu = len (GPUtil .getGPUs ()),
42
- cpu = cpu_count (), memory = tot_m ,
43
- ip = os .getenv ('IP' ),
44
- port = int (os .getenv ('PORT' )),
45
- user = os .getenv ('USER' ),
46
- disk = tot_d
47
- )
43
+ computer = Computer (
44
+ name = socket .gethostname (),
45
+ gpu = len (GPUtil .getGPUs ()),
46
+ cpu = cpu_count (),
47
+ memory = tot_m ,
48
+ ip = os .getenv ('IP' ),
49
+ port = int (os .getenv ('PORT' )),
50
+ user = os .getenv ('USER' ),
51
+ disk = tot_d
52
+ )
48
53
ComputerProvider ().create_or_update (computer , 'name' )
49
54
50
55
@@ -72,16 +77,15 @@ def execute(config: str, debug: bool):
72
77
provider = TaskProvider ()
73
78
step_provider = StepProvider ()
74
79
75
- for t in provider .by_status (TaskStatus .InProgress ,
76
- worker_index = worker_index ):
80
+ for t in provider .by_status (
81
+ TaskStatus .InProgress , worker_index = worker_index
82
+ ):
77
83
step = step_provider .last_for_task (t .id )
78
84
logger .error (
79
85
f'Task Id = { t .id } was in InProgress state '
80
86
f'when another tasks arrived to the same worker' ,
81
- ComponentType .Worker ,
82
- t .computer_assigned ,
83
- t .id ,
84
- step )
87
+ ComponentType .Worker , t .computer_assigned , t .id , step
88
+ )
85
89
provider .change_status (t , TaskStatus .Failed )
86
90
87
91
# Create dag
@@ -93,12 +97,16 @@ def execute(config: str, debug: bool):
93
97
94
98
@main .command ()
95
99
@click .option ('--computer' , help = 'sync computer with all the others' )
96
- @click .option ('--only_from' ,
97
- is_flag = True ,
98
- help = 'only copy files from the computer to all the others' )
99
- @click .option ('--only_to' ,
100
- is_flag = True ,
101
- help = 'only copy files from all the others to the computer' )
100
+ @click .option (
101
+ '--only_from' ,
102
+ is_flag = True ,
103
+ help = 'only copy files from the computer to all the others'
104
+ )
105
+ @click .option (
106
+ '--only_to' ,
107
+ is_flag = True ,
108
+ help = 'only copy files from all the others to the computer'
109
+ )
102
110
def sync (computer : str , only_from : bool , only_to : bool ):
103
111
computer = computer or socket .gethostname ()
104
112
provider = ComputerProvider ()
0 commit comments