Skip to content

Commit 2c2f8bf

Browse files
Enhance job termination process on workers to handle multiple processes and improve error logging
1 parent 3b3b8f2 commit 2c2f8bf

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

cads_broker/dispatcher.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,23 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A
9494

9595

9696
def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None:
97-
worker_pid_event = client.get_events(request_uid)[0][1]
98-
client.run(
99-
os.kill,
100-
worker_pid_event["pid"],
101-
signal.SIGTERM,
102-
workers=[worker_pid_event["worker"]],
103-
nanny=True,
104-
)
97+
"""Kill the job on the worker."""
98+
# loop on all the processes related to the request_uid
99+
for worker_pid_event in client.get_events(request_uid):
100+
_, worker_pid_event = worker_pid_event
101+
pid = worker_pid_event["pid"]
102+
worker_ip = worker_pid_event["worker"]
103+
try:
104+
client.run(
105+
os.kill,
106+
pid,
107+
signal.SIGTERM,
108+
workers=[worker_ip],
109+
nanny=True,
110+
)
111+
logger.info("killing worker", job_id=request_uid, pid=pid, worker_ip=worker_ip)
112+
except (KeyError, NameError):
113+
logger.warning("worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip)
105114

106115

107116
def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None:

0 commit comments

Comments
 (0)