
Commit 62b8d12

docker for server
1 parent 2190e58 commit 62b8d12

23 files changed, +182 -83 lines

.dockerignore

+1-1
@@ -3,4 +3,4 @@ dist
 *.egg-ingo
 examples
 .git
-mlcomp/server
+mlcomp/server/front/node_modules

.gitignore

+2-1
@@ -110,4 +110,5 @@ Pipfile
 mlcomp/server/front/node_modules
 examples/**/data
 examples/**/log
-dist
+dist
+docker/*.conf

docker/Readme.md

+12-2
@@ -2,7 +2,17 @@
 
 Worker:
 
+docker build -f docker/worker-dev -t mlcomp-worker .
 
-docker run --net=host -it mlcomp-worker /bin/bash
+docker run --net=host -v /opt/mlcomp/:/opt/mlcomp -it mlcomp-worker /bin/bash
 
-PYTHONPATH=../ python __main__.py worker 0
+PYTHONPATH=../ python __main__.py worker 0
+
+
+Server:
+
+docker build -f docker/server-dev -t mlcomp-server .
+
+docker run --net=host -p 4201:4201 -it mlcomp-server /bin/bash
+
+PYTHONPATH=../ python __main__.py start-server

docker/server-dev

+23-11
@@ -1,19 +1,31 @@
-FROM ubuntu:16.04
+FROM python:3.6
 
-RUN export DEBIAN_FRONTEND=noninteractive \
-    && apt-get update -qq && apt-get upgrade -qq \
-    && apt-get install -y --no-install-recommends \
-    python3 \
-    python3-pip \
-    python3-setuptools \
-    supervisor \
-    && BUILD_DEPS='build-essential python3-dev git' \
-    && apt-get install -y --no-install-recommends ${BUILD_DEPS}
+RUN pip install --no-cache-dir \
+    numpy>=1.16.3 \
+    click>=7.0 \
+    typing \
+    apscheduler>=3.6.0 \
+    sqlalchemy>=1.3.4 \
+    celery>=4.3.0 \
+    flask>=1.0.2 \
+    requests \
+    flask_cors>=3.0.6 \
+    sqlalchemy_serializer==1.1.1 \
+    psycopg2-binary>=2.8.2 \
+    matplotlib \
+    scikit-learn>=0.21.2 \
+    Pillow \
+    PyYAML \
+    pathspec>=0.5.9 \
+    psutil>=5.6.2 \
+    GPUtil==1.4.0
 
 # Set the locale
 ENV LANG C.UTF-8
 ENV LC_ALL C.UTF-8
 
 # Copy source files
 COPY mlcomp /app/mlcomp
-WORKDIR /app/mlcomp
+WORKDIR /app/mlcomp
+
+CMD ["python", "__main__.py", "start-server"]

docker/worker-dev

+10-3
@@ -17,13 +17,14 @@ RUN apt-get update && apt-get install -y \
     libpq-dev \
     libturbojpeg \
     software-properties-common \
+    supervisor \
     && apt-get clean \
     && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
 
-RUN pip install setuptools>=41.0.1 \
-    torchvision>=0.3.0 \
+RUN pip install --no-cache-dir \
+    setuptools>=41.0.1 \
+    torchvision>=0.2.0 \
     numpy>=1.16.3 \
-    torch==1.1.0 \
     click>=7.0 \
     psutil>=5.6.2 \
     GPUtil==1.4.0 \
@@ -52,4 +53,10 @@ ENV LC_ALL C.UTF-8
 # Copy source files
 COPY mlcomp /app/mlcomp
 WORKDIR /app/mlcomp
+COPY docker/worker.py /app/mlcomp/worker.py
+COPY docker/worker.sh /app/mlcomp/worker.sh
+
+
+RUN chmod 777 worker.sh
+CMD [",/worker.sh"]
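
The worker image now defaults to running worker.sh (see docker/worker.py and docker/worker.sh below), which starts one Celery worker per CPU under supervisord. A minimal sketch of a non-interactive run, assuming the /opt/mlcomp mount from docker/Readme.md and the optional CPU and DOCKER_IMG environment variables read by worker.py and __main__.py:

docker build -f docker/worker-dev -t mlcomp-worker .
docker run --net=host -v /opt/mlcomp/:/opt/mlcomp -e CPU=2 -e DOCKER_IMG=default mlcomp-worker
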

docker/worker.py

+18
@@ -0,0 +1,18 @@
+import os
+from multiprocessing import cpu_count
+
+pr = os.getenv('CPU', cpu_count())
+text = [
+    '[supervisord]',
+    'nodaemon=true',
+    ''
+]
+for p in range(pr):
+    text.append(f'[program:worker{p}]')
+    text.append(f'command=python __main__.py worker {p}')
+    text.append('autostart=true')
+    text.append('autorestart=true')
+    text.append('')
+
+with open('supervisord.conf', 'w') as f:
+    f.writelines('\n'.join(text))
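
For illustration, on a machine where cpu_count() returns 2 (and the CPU variable is unset), the script above would write roughly the following supervisord.conf, one [program] section per worker:

[supervisord]
nodaemon=true

[program:worker0]
command=python __main__.py worker 0
autostart=true
autorestart=true

[program:worker1]
command=python __main__.py worker 1
autostart=true
autorestart=true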

docker/worker.sh

+4
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+
+python worker.py
+PYTHONPATH=../ supervisord -c supervisord.conf

examples/catalyst/config.yml

+1-1
@@ -2,7 +2,7 @@ info:
   name: catalyst
   project: first
   folder: examples/catalyst
-  data_folder: /f/data/first
+  data_folder: catalyst
 executors:
   dummy1:
     type: Catalyst

examples/catalyst/experiment.py

+9-7
@@ -38,15 +38,17 @@ def get_datasets(self, stage: str, **kwargs):
             transform=Experiment.get_transforms(stage=stage, mode="valid")
         )
 
-        # while True:
-        #     pass
-        #raise Exception('assd')
+        while True:
+            continue
 
-        trainset.data = trainset.data[:32]
-        trainset.targets = np.clip(trainset.targets[:32], 0, 1)
+        trainset.train_data = trainset.train_data[:32]
+        trainset.train_labels = np.clip(trainset.train_labels[:32], 0, 1)
 
-        testset.data = trainset.data[:32]
-        testset.targets = np.clip(trainset.targets[:32], 0, 1)
+        testset.train_data = trainset.train_data[:32]
+        testset.train_labels = np.clip(trainset.train_labels[:32], 0, 1)
+
+        testset.test_data = testset.test_data[:32]
+        testset.test_labels = np.clip(testset.test_labels[:32], 0, 1)
 
         datasets["train"] = trainset
         datasets["valid"] = testset

examples/catalyst/model.py

+1-1
@@ -21,4 +21,4 @@ def forward(self, x):
         x = F.relu(self.fc1(x))
         x = F.relu(self.fc2(x))
         x = self.fc3(x)
-        return x
+        return x

mlcomp/__main__.py

+26-15
@@ -3,17 +3,16 @@
 import os
 from mlcomp.task.storage import Storage
 from mlcomp.utils.config import load_ordered_yaml
-from mlcomp.task.executors import Executor
 from mlcomp.task.app import app
 import socket
-from multiprocessing import cpu_count, Process
-import torch
-
+from multiprocessing import cpu_count
 from mlcomp.utils.misc import dict_func
 import psutil
 import GPUtil
 import numpy as np
 from mlcomp.task.tasks import execute_by_id
+from mlcomp.utils.schedule import start_schedule
+
 
 @click.group()
 def main():
@@ -46,28 +45,43 @@ def worker_usage():
 @main.command()
 @click.argument('number', type=int)
 def worker(number):
+    docker_img = os.getenv('DOCKER_IMG', 'default')
+    argv = [
+        'worker',
+        '--loglevel=INFO',
+        '-P=solo',
+        f'-n={number}',
+        '-O fair',
+        '-c=1',
+        '--prefetch-multiplier=1',
+        '-Q',
+        f'{socket.gethostname()}_{docker_img}'
+    ]
+    app.worker_main(argv)
+
+@main.command()
+def worker_supervisor():
     provider = ComputerProvider()
     tot_m, used_m, free_m = map(int, os.popen('free -t -m').readlines()[-1].split()[1:])
-
-    computer = Computer(name=socket.gethostname(), gpu=torch.cuda.device_count(), cpu=cpu_count(), memory=tot_m)
+    computer = Computer(name=socket.gethostname(), gpu=len(GPUtil.getGPUs()), cpu=cpu_count(), memory=tot_m)
     provider.create_or_update(computer, 'name')
 
-    # start_schedule([(worker_usage, 60)])
+    start_schedule([(worker_usage, 60)])
 
+    docker_img = os.getenv('DOCKER_IMG', 'default')
     argv = [
         'worker',
         '--loglevel=INFO',
         '-P=solo',
-        f'-n={number}',
+        f'-n=1',
        '-O fair',
        '-c=1',
        '--prefetch-multiplier=1',
        '-Q',
-        socket.gethostname()
+        f'{socket.gethostname()}_{docker_img}_supervisor'
     ]
     app.worker_main(argv)
 
-
 @main.command()
 def start_server():
     from mlcomp.server.back.app import start_server as _start_server
@@ -101,7 +115,8 @@ def _dag(config: str, debug: bool = False):
 
     folder = os.path.join(os.getcwd(), info['folder'])
     project = ProjectProvider().by_name(info['project']).id
-    dag = dag_provider.add(Dag(config=config_text, project=project, name=info['name']))
+    dag = dag_provider.add(Dag(config=config_text, project=project,
+                               name=info['name'], docker_img=info.get('docker_img')))
     storage.upload(folder, dag)
 
     created = OrderedDict()
@@ -112,8 +127,6 @@ def _dag(config: str, debug: bool = False):
         for d in v['depends']:
             if d not in executors:
                 raise Exception(f'Executor {k} depend on {d} which does not exist')
-            if not Executor.is_registered(executors[d]['type']):
-                raise Exception(f'Executor {d} has not been registered')
 
            valid = valid and d in created
        if valid:
@@ -166,8 +179,6 @@ def execute(config: str):
         for d in v['depends']:
             if d not in executors:
                 raise Exception(f'Executor {k} depend on {d} which does not exist')
-            if not Executor.is_registered(executors[d]['type']):
-                raise Exception(f'Executor {d} has not been registered')
 
            valid = valid and d in created
        if valid:
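
Since _dag() now stores info.get('docker_img') on the Dag, a DAG config like examples/catalyst/config.yml can presumably pin its tasks to a particular worker image via an optional key. A hypothetical example (the value shown is illustrative; omitting the key falls back to "default" when the queue name is built in supervisor.py):

info:
  name: catalyst
  project: first
  folder: examples/catalyst
  data_folder: catalyst
  docker_img: default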

mlcomp/db/models/dag.py

+1-1
@@ -11,4 +11,4 @@ class Dag(Base):
     name = sa.Column(sa.String)
     tasks = relationship('Task', lazy='noload')
     project_rel = relationship('Project', lazy='noload')
-
+    docker_img = sa.Column(sa.String)

mlcomp/db/models/task.py

+1
@@ -24,6 +24,7 @@ class Task(Base):
     dag_rel = relationship('Dag', lazy='noload')
     debug = sa.Column(sa.Boolean, default=False)
     pid = sa.Column(sa.Integer)
+    replay = sa.Column(sa.Boolean)
 
 class TaskDependence(Base):
     __tablename__ = 'task_dependencies'

mlcomp/db/providers/base.py

+1
@@ -52,6 +52,7 @@ def create_or_update(self, obj: Base, field: str):
 
     def update(self):
         self.session.update()
+        self.session.commit()
 
     @property
     def session(self):

mlcomp/db/providers/task.py

+1-1
@@ -49,7 +49,7 @@ def change_status(self, task, status: TaskStatus):
             task.finished = now()
 
         task.status = status.value
-        self.session.update()
+        self.update()
 
     def by_status(self, status: TaskStatus):
         return self.query(Task).filter(Task.status == status.value).options(joinedload(Task.dag_rel)).all()

mlcomp/server/back/app.py

+5-2
@@ -166,7 +166,8 @@ def tasks():
 @requires_auth
 def task_stop():
     data = request_data()
-    status = celery_tasks.stop(data['id'])
+    task = TaskProvider().by_id(data['id'], joinedload(Task.dag_rel))
+    status = celery_tasks.stop(task)
     return json.dumps({'success': True, 'status': to_snake(TaskStatus(status).name)})
 
 
@@ -178,7 +179,8 @@ def dag_stop():
     id = int(data['id'])
     dag = provider.by_id(id, joined_load=['tasks'])
     for t in dag.tasks:
-        celery_tasks.stop(t.id)
+        t.dag_rel = dag
+        celery_tasks.stop(t)
     return json.dumps({'success': True, 'dag': provider.get({'id': id})['data'][0]})
 
 
@@ -297,6 +299,7 @@ def all_exception_handler(error):
     if type(error) == ProgrammingError:
         Session.cleanup()
 
+    logger.error(error)
     return str(error), 500
 
mlcomp/server/back/supervisor.py

+11-4
@@ -1,7 +1,7 @@
 import time
 from mlcomp.utils.logging import logger
 import traceback
-from mlcomp.task.tasks import execute
+from mlcomp.task.tasks import execute, queue_list
 from mlcomp.db.providers import *
 from mlcomp.utils.schedule import start_schedule
 
@@ -11,7 +11,10 @@ def supervisor():
    computer_provider = ComputerProvider()
 
    try:
-        time.sleep(1)
+        queues = queue_list()
+        if len(queues)==0:
+            return
+
        not_ran_tasks = provider.by_status(TaskStatus.NotRan)
        not_ran_tasks = [task for task in not_ran_tasks if not task.debug]
        logger.info(f'Found {len(not_ran_tasks)} not ran tasks')
@@ -39,12 +42,16 @@ def supervisor():
            if task.computer is not None and task.computer != computer.name:
                continue
 
-            r = execute.apply_async((task.id,), queue=computer['name'])
+            queue = f'{computer["name"]}_{task.dag_rel.docker_img or "default"}'
+            if queue not in queues:
+                continue
+
+            r = execute.apply_async((task.id,), queue=queue)
            task.status = TaskStatus.Queued.value
            task.computer_assigned = computer['name']
            task.celery_id = r.id
 
-            provider.session.update()
+            provider.update()
 
            computer['gpu'] -= task.gpu
            computer['cpu'] -= task.cpu
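
The routing above mirrors the queue names the workers subscribe to in mlcomp/__main__.py: a numbered worker listens on {hostname}_{DOCKER_IMG}, the supervisor worker on {hostname}_{DOCKER_IMG}_supervisor, and a task is only dispatched when its queue is currently live. A minimal sketch of the naming convention (task_queue is a hypothetical helper, not part of the codebase; the host name is illustrative):

# same fallback as supervisor(): DAGs without a docker_img go to the "default" queue
def task_queue(hostname, docker_img=None):
    return f'{hostname}_{docker_img or "default"}'

assert task_queue('gpu1') == 'gpu1_default'
assert task_queue('gpu1', 'my-image') == 'gpu1_my-image'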

mlcomp/task/app.py

+2-2
@@ -1,8 +1,8 @@
 from __future__ import absolute_import, unicode_literals
 from celery import Celery
 
-app = Celery('task',
+app = Celery('mlcomp',
              broker='amqp://',
              backend='amqp://',
              include=['mlcomp.task.tasks']
-             )
+             )
