Skip to content

Commit 0b4e880

Browse files
committed
20.2.4b
1 parent 2dc592e commit 0b4e880

File tree

10 files changed

+247
-14
lines changed

10 files changed

+247
-14
lines changed

mlcomp/__init__.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
LOG_FOLDER = join(ROOT_FOLDER, 'logs')
1919
CONFIG_FOLDER = join(ROOT_FOLDER, 'configs')
2020
DB_FOLDER = join(ROOT_FOLDER, 'db')
21+
REPORT_FOLDER = join(ROOT_FOLDER, 'report')
2122
TMP_FOLDER = join(ROOT_FOLDER, 'tmp')
2223

2324
os.makedirs(ROOT_FOLDER, exist_ok=True)
@@ -27,6 +28,7 @@
2728
os.makedirs(LOG_FOLDER, exist_ok=True)
2829
os.makedirs(CONFIG_FOLDER, exist_ok=True)
2930
os.makedirs(DB_FOLDER, exist_ok=True)
31+
os.makedirs(REPORT_FOLDER, exist_ok=True)
3032
os.makedirs(TMP_FOLDER, exist_ok=True)
3133

3234
# copy conf files if they do not exist
@@ -118,5 +120,5 @@
118120
'FILE_LOG_LEVEL', 'DB_TYPE', 'SA_CONNECTION_STRING', 'FLASK_ENV',
119121
'DOCKER_MAIN', 'IP', 'PORT', 'LOG_NAME', 'WORKER_USAGE_INTERVAL',
120122
'FILE_SYNC_INTERVAL', 'INSTALL_DEPENDENCIES', 'SYNC_WITH_THIS_COMPUTER',
121-
'CAN_PROCESS_TASKS', 'TMP_FOLDER', 'CONTOUR_FILE'
123+
'CAN_PROCESS_TASKS', 'TMP_FOLDER', 'CONTOUR_FILE', 'REPORT_FOLDER'
122124
]

mlcomp/__main__.py

+18-8
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77

88
import torch
99

10+
from mlcomp.utils.io import yaml_load, yaml_dump
1011
from mlcomp.contrib.search.grid import grid_cells
1112
from mlcomp.migration.manage import migrate as _migrate
1213
from mlcomp import ROOT_FOLDER, IP, PORT, \
13-
WORKER_INDEX, SYNC_WITH_THIS_COMPUTER, CAN_PROCESS_TASKS
14+
WORKER_INDEX, SYNC_WITH_THIS_COMPUTER, CAN_PROCESS_TASKS, CONFIG_FOLDER
1415
from mlcomp.db.core import Session
1516
from mlcomp.db.enums import DagType, ComponentType, TaskStatus
1617
from mlcomp.db.models import Computer
@@ -19,12 +20,13 @@
1920
TaskProvider, \
2021
StepProvider, \
2122
ProjectProvider
23+
from mlcomp.report import create_report, check_statuses
2224
from mlcomp.utils.config import merge_dicts_smart, dict_from_list_str
23-
from mlcomp.utils.io import yaml_load, yaml_dump
2425
from mlcomp.utils.logging import create_logger
2526
from mlcomp.worker.sync import sync_directed
2627
from mlcomp.worker.tasks import execute_by_id
27-
from mlcomp.utils.misc import memory, disk, get_username
28+
from mlcomp.utils.misc import memory, disk, get_username, \
29+
get_default_network_interface
2830
from mlcomp.server.back.create_dags import dag_standard, dag_pipe
2931

3032
_session = Session.create_session(key=__name__)
@@ -105,19 +107,21 @@ def migrate():
105107
@click.option('--control_reqs', type=bool, default=True)
106108
@click.option('--params', multiple=True)
107109
def dag(config: str, control_reqs: bool, params):
110+
check_statuses()
108111
_dag(config, control_reqs=control_reqs, params=params)
109112

110113

111114
@main.command()
def report():
    # CLI entry point: all of the work is delegated to
    # mlcomp.report.create_report
    create_report()
114117

115118

116119
@main.command()
117120
@click.argument('config')
118121
@click.option('--debug', type=bool, default=True)
119122
@click.option('--params', multiple=True)
120123
def execute(config: str, debug: bool, params):
124+
check_statuses()
121125
_create_computer()
122126

123127
# Fail all InProgress Tasks
@@ -164,6 +168,7 @@ def execute(config: str, debug: bool, params):
164168
help='only copy files from all the others to the computer'
165169
)
166170
def sync(project: str, computer: str, only_from: bool, only_to: bool):
171+
check_statuses()
167172
_create_computer()
168173

169174
computer = computer or socket.gethostname()
@@ -193,9 +198,14 @@ def sync(project: str, computer: str, only_from: bool, only_to: bool):
193198

194199
@main.command()
def init():
    """
    Set NCCL_SOCKET_IFNAME in configs/.env to the default network interface

    Every line of the file that mentions NCCL_SOCKET_IFNAME is replaced by
    ``NCCL_SOCKET_IFNAME=<interface>``. The file is left untouched when it
    has no such line or when no default interface can be detected.
    """
    env_path = join(CONFIG_FOLDER, '.env')
    with open(env_path) as env_file:
        lines = env_file.readlines()

    key = 'NCCL_SOCKET_IFNAME'
    if not any(key in line for line in lines):
        return

    # resolved once: the lookup spawns a subprocess
    interface = get_default_network_interface()
    if not interface:
        return

    replacement = f'{key}={interface}\n'
    lines = [replacement if key in line else line for line in lines]
    with open(env_path, 'w') as env_file:
        env_file.writelines(lines)
199209

200210

201211
if __name__ == '__main__':

mlcomp/__version__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '20.2.4a'
1+
__version__ = '20.2.4b'

mlcomp/db/providers/dag.py

+3
Original file line numberDiff line numberDiff line change
@@ -215,5 +215,8 @@ def remove_all(self, ids: List[int]):
215215
synchronize_session=False)
216216
self.commit()
217217

218+
def count(self):
    """
    Number of rows returned by a query over Dag
    """
    dag_query = self.query(Dag)
    return dag_query.count()
220+
218221

219222
__all__ = ['DagProvider']

mlcomp/db/providers/log.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,24 @@ def get(self, filter: dict, options: PaginatorOptions):
5959
'component': to_snake(ComponentType(log.component).name),
6060
'computer': log.computer,
6161
'step': self.to_dict(step) if step else None,
62-
'task': self.to_dict(task, rules=('-additional_info', ))
62+
'task': self.to_dict(task, rules=('-additional_info',))
6363
if task else None
6464
}
6565
data.append(item)
6666

6767
return {'total': total, 'data': data}
6868

69-
def last(self, count: int, dag: int = None, task: int = None):
69+
def last(self, count: int, dag: int = None, task: int = None,
         levels=None, components=None):
    """
    Fetch the most recent log records, newest (highest Log.id) first

    :param count: maximum number of rows to return
    :param dag: keep only logs whose task belongs to this dag
    :param task: keep only logs of this task
    :param levels: keep only logs whose level is one of these values
    :param components: keep only logs whose component is one of these
    :return: list of (Log, Task.id) rows; Log is outer-joined with Task
    """
    # every filter is optional and applied only when a value was passed
    conditions = []
    if dag is not None:
        conditions.append(Task.dag == dag)
    if task is not None:
        conditions.append(Task.id == task)
    if levels is not None:
        conditions.append(Log.level.in_(levels))
    if components is not None:
        conditions.append(Log.component.in_(components))

    query = self.query(Log, Task.id).outerjoin(Task)
    for condition in conditions:
        query = query.filter(condition)

    newest_first = query.order_by(Log.id.desc())
    return newest_first.limit(count).all()
7681

7782

mlcomp/report.py

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import shutil
2+
import traceback
3+
from glob import glob
4+
from os import makedirs
5+
from os.path import dirname, join, basename, exists
6+
7+
import pandas as pd
8+
import migrate.versioning.api as api
9+
from mlcomp.utils.io import zip_folder
10+
11+
from mlcomp.db.enums import LogStatus, ComponentType
12+
13+
from mlcomp import SA_CONNECTION_STRING, REPORT_FOLDER, LOG_FOLDER, DB_TYPE, \
14+
CONFIG_FOLDER, ROOT_FOLDER, DATA_FOLDER, MODEL_FOLDER, TASK_FOLDER, \
15+
DB_FOLDER, TMP_FOLDER
16+
from mlcomp.utils.misc import now, to_snake
17+
18+
from mlcomp.db.providers import DagProvider, LogProvider
19+
20+
21+
def statuses(folder: str = None):
    """
    Run the installation health checks and return them as a table.

    Each check adds one row with the keys ``name``, ``status``
    ('OK' or 'ERROR') and ``comment``:

    * Folders  - every working folder and ``configs/.env`` exist;
    * Database - a count query through DagProvider succeeds;
    * Migrate  - the database schema version equals the version of the
      migration repository (only checked when Database is OK).

    :param folder: when given, the table is printed and also saved to
        ``<folder>/statuses.csv``
    :return: pandas.DataFrame with one row per performed check
    """
    rows = []

    required_folders = [
        ROOT_FOLDER,
        DATA_FOLDER,
        MODEL_FOLDER,
        TASK_FOLDER,
        LOG_FOLDER,
        CONFIG_FOLDER,
        DB_FOLDER,
        REPORT_FOLDER,
        TMP_FOLDER
    ]
    required_files = [
        join(CONFIG_FOLDER, '.env')
    ]

    # collect every missing path so that none of them is hidden
    # by a later one
    problems = [
        f'folder {f} does not exist'
        for f in required_folders if not exists(f)
    ]
    problems += [
        f'file {f} does not exist'
        for f in required_files if not exists(f)
    ]

    rows.append({
        'name': 'Folders',
        'status': 'ERROR' if problems else 'OK',
        'comment': '; '.join(problems)
    })

    database_status = 'OK'
    database_comment = f'DB_TYPE = {DB_TYPE}'
    try:
        DagProvider().count()
    except Exception:
        # any failure is a diagnostic result here, not a crash
        database_status = 'ERROR'
        database_comment += ' ' + traceback.format_exc()

    rows.append({
        'name': 'Database',
        'status': database_status,
        'comment': database_comment
    })

    # the schema version can be read only from a reachable database
    if database_status == 'OK':
        repository_folder = join(dirname(__file__), 'migration')
        try:
            repository_version = api.version(repository_folder)
            db_version = api.db_version(
                SA_CONNECTION_STRING, repository_folder
            )
        except Exception:
            # reported the same way as a database failure above
            migrate_status = 'ERROR'
            migrate_comment = traceback.format_exc()
        else:
            if db_version != repository_version:
                migrate_status = 'ERROR'
                migrate_comment = \
                    f'Repository version = {repository_version} ' \
                    f'Db version = {db_version}'
            else:
                migrate_status = 'OK'
                migrate_comment = f'version: {db_version}'

        rows.append({
            'name': 'Migrate',
            'status': migrate_status,
            'comment': migrate_comment
        })

    df = pd.DataFrame(rows)

    if folder is not None:
        print('Statuses:')
        print(df)

        df.to_csv(join(folder, 'statuses.csv'), index=False)

    return df
102+
103+
104+
def check_statuses():
    """
    Abort when any installation health check has failed.

    Calls ``statuses()``; if at least one row has a status other than
    'OK', every row is printed and an exception is raised.

    :raises Exception: at least one status is not 'OK'
    """
    report = statuses()
    has_errors = (report['status'] != 'OK').any()
    if not has_errors:
        return

    print('There are errors in statuses')
    columns = zip(report['name'], report['status'], report['comment'])
    for name, status, comment in columns:
        print(f'name: {name} status {status} comment {comment}')

    # NOTE(review): presumably gives stdout a moment before the
    # traceback is written - confirm
    from time import sleep
    sleep(0.01)

    raise Exception('There are errors in statuses. '
                    'Please check them above')
118+
119+
120+
def logs(folder: str = None):
    """
    Collect recent log records for the report.

    Two selections of at most 1000 rows each are read from the database
    and concatenated: records with the Error level, and records written
    by the Supervisor, API and WorkerSupervisor components.

    :param folder: when given, the table is saved to
        ``<folder>/logs_db.csv`` and every entry of LOG_FOLDER is copied
        next to it; when None nothing is written to disk
    :return: pandas.DataFrame with the columns ``status``,
        ``component``, ``time`` and ``message``
    """
    log_provider = LogProvider()
    errors = log_provider.last(count=1000, levels=[LogStatus.Error.value])
    service_components = [ComponentType.Supervisor.value,
                          ComponentType.API.value,
                          ComponentType.WorkerSupervisor.value]
    services = log_provider.last(count=1000, components=service_components)

    # each row is a (Log, task id) pair; only the Log part is needed
    rows = [
        {
            'status': to_snake(LogStatus(log.level).name),
            'component': to_snake(ComponentType(log.component).name),
            'time': log.time,
            'message': log.message,
        }
        for log, _ in errors + services
    ]
    df = pd.DataFrame(rows)

    # everything that touches the disk is guarded: folder defaults
    # to None and join(None, ...) raises TypeError
    if folder is not None:
        df.to_csv(join(folder, 'logs_db.csv'), index=False)
        for file in glob(join(LOG_FOLDER, '*')):
            shutil.copy(file, join(folder, basename(file)))
        print('logs formed')
    return df
146+
147+
def create_report():
    """
    Build a diagnostic report and pack it into a zip archive.

    A folder named after the current time is created inside
    REPORT_FOLDER, filled by ``statuses`` and ``logs`` and then zipped
    to ``<folder>.zip``. Progress is printed to stdout.
    """
    print('*** Report Start ***')
    print()

    # the part of the timestamp before the first '.' names the report
    stamp = str(now()).split('.')[0]
    folder = join(REPORT_FOLDER, stamp)
    makedirs(folder, exist_ok=True)

    # each collector writes its files into the report folder
    for collect in (statuses, logs):
        collect(folder)
        print()

    zip_path = f'{folder}.zip'
    zip_folder(folder, dst=zip_path)

    print('Report path', zip_path)
    print()

    print('*** Report End ***')
169+
170+
171+
__all__ = ['check_statuses', 'create_report']

mlcomp/server/__main__.py

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import click
55

66
from mlcomp import CONFIG_FOLDER, REDIS_PORT, REDIS_PASSWORD
7+
from mlcomp.report import check_statuses
78
from mlcomp.server.back.app import start_server as _start_server
89
from mlcomp.server.back.app import stop_server as _stop_server
910
from mlcomp.utils.misc import kill_child_processes
@@ -19,6 +20,7 @@ def start_site():
1920
"""
2021
Start only site
2122
"""
23+
check_statuses()
2224
_start_server()
2325

2426

@@ -27,6 +29,7 @@ def stop_site():
2729
"""
2830
Stop site
2931
"""
32+
check_statuses()
3033
_stop_server()
3134

3235

@@ -45,6 +48,8 @@ def start(daemon: bool, debug: bool, workers: int, log_level: str):
4548
4649
It starts: redis-server, site, worker_supervisor, workers
4750
"""
51+
check_statuses()
52+
4853
# creating supervisord config
4954
supervisor_command = 'mlcomp-worker worker-supervisor'
5055
worker_command = 'mlcomp-worker worker'
@@ -93,6 +98,8 @@ def stop():
9398
"""
9499
Stop supervisord started by start command
95100
"""
101+
check_statuses()
102+
96103
lines = os.popen('ps -ef | grep supervisord').readlines()
97104
for line in lines:
98105
if 'mlcomp/configs/supervisord.conf' not in line:

mlcomp/utils/misc.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def dict_func(objcts: List, func=np.mean):
5757

5858
def get_pid(name):
5959
res = []
60-
lines = check_output(["ps", '-ef']).decode().split('\n')
60+
lines = check_output(['ps', '-ef']).decode().split('\n')
6161
header = lines[0].split()
6262
for line in lines[1:]:
6363
if line.strip() == '':
@@ -80,6 +80,20 @@ def get_pid(name):
8080
return res
8181

8282

83+
def get_default_network_interface(route_output: str = None):
    """
    Name of the network interface used by the default route.

    The text is expected in the layout printed by ``route``: a title
    line, a header line containing ``Destination`` and ``Iface``, then
    one line per route.

    :param route_output: already captured output of ``route``; when
        None the ``route`` command is executed
    :return: interface name, or None when there is no default route,
        the output can not be parsed, or ``route`` can not be executed
    """
    if route_output is None:
        from subprocess import CalledProcessError
        try:
            route_output = check_output(['route']).decode()
        except (OSError, CalledProcessError):
            # no `route` binary, or it exited with an error
            return None

    lines = route_output.split('\n')
    if len(lines) < 2:
        return None

    # lines[0] is the table title, lines[1] holds the column names
    header = lines[1].split()
    for line in lines[2:]:
        item = dict(zip(header, line.split()))
        # .get: blank lines give an empty item
        if item.get('Destination') == 'default' and 'Iface' in item:
            return item['Iface']
    return None
95+
96+
8397
def now():
8498
return datetime.utcnow()
8599

0 commit comments

Comments
 (0)