Skip to content

Commit d9ad9a1

Browse files
Kill jobs on workers (#147)
* kill job on workers * killed job * qa * Enhance job termination process on workers to handle multiple processes and improve error logging * Refactor logging in job termination process for improved readability * improve log messages
1 parent bac1747 commit d9ad9a1

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

cads_broker/dispatcher.py

+26
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io
44
import os
55
import pickle
6+
import signal
67
import threading
78
import time
89
import traceback
@@ -92,6 +93,30 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A
9293
return client.run_on_scheduler(get_tasks_on_scheduler)
9394

9495

96+
def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None:
97+
"""Kill the job on the worker."""
98+
# loop on all the processes related to the request_uid
99+
for worker_pid_event in client.get_events(request_uid):
100+
_, worker_pid_event = worker_pid_event
101+
pid = worker_pid_event["pid"]
102+
worker_ip = worker_pid_event["worker"]
103+
try:
104+
client.run(
105+
os.kill,
106+
pid,
107+
signal.SIGTERM,
108+
workers=[worker_ip],
109+
nanny=True,
110+
)
111+
logger.info(
112+
"killed job on worker", job_id=request_uid, pid=pid, worker_ip=worker_ip
113+
)
114+
except (KeyError, NameError):
115+
logger.warning(
116+
"worker not found while killing a job", job_id=request_uid, pid=pid, worker_ip=worker_ip
117+
)
118+
119+
95120
def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None:
96121
"""Cancel jobs on the dask scheduler.
97122
@@ -424,6 +449,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
424449
# if the request is not in the futures, it means that the request has been lost by the broker
425450
# try to cancel the job directly on the scheduler
426451
cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
452+
kill_job_on_worker(self.client, request.request_uid)
427453
session = self.manage_dismissed_request(request, session)
428454
session.commit()
429455

0 commit comments

Comments
 (0)