From 86eef03e6bbd1af3eeaa395033db824f9bce9024 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 16 Jan 2025 14:01:07 +0100 Subject: [PATCH 1/6] kill job on workers --- cads_broker/database.py | 1 + cads_broker/dispatcher.py | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/cads_broker/database.py b/cads_broker/database.py index 277fcaf..c1960af 100644 --- a/cads_broker/database.py +++ b/cads_broker/database.py @@ -779,6 +779,7 @@ def logger_kwargs(request: SystemRequest) -> dict[str, str]: if event.event_type == "worker_name" ], "origin": request.origin, + "cache_id": request.cache_id, "portal": request.portal, "entry_point": request.entry_point, "request_metadata": request.request_metadata, diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 1b1a461..f12e952 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -7,6 +7,7 @@ import time import traceback from typing import Any +import signal import attrs import cachetools @@ -92,6 +93,17 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A return client.run_on_scheduler(get_tasks_on_scheduler) +def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None: + worker_pid_event = client.get_events(request_uid)[0][1] + client.run( + os.kill, + worker_pid_event["pid"], + signal.SIGTERM, + workers=[worker_pid_event["worker"]], + nanny=True, + ) + + def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None: """Cancel jobs on the dask scheduler. @@ -420,6 +432,7 @@ def sync_database(self, session: sa.orm.Session) -> None: for request in dismissed_requests: if future := self.futures.pop(request.request_uid, None): future.cancel() + kill_job_on_worker(self.client, request.request_uid) else: # if the request is not in the futures, it means that the request has been lost by the broker # try to cancel the job directly on the scheduler From af93d20a1ab7732bb611b8c7f7bef6fccf39f903 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 16 Jan 2025 14:31:45 +0100 Subject: [PATCH 2/6] killed job --- cads_broker/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index f12e952..102d90c 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -432,11 +432,11 @@ def sync_database(self, session: sa.orm.Session) -> None: for request in dismissed_requests: if future := self.futures.pop(request.request_uid, None): future.cancel() - kill_job_on_worker(self.client, request.request_uid) else: # if the request is not in the futures, it means that the request has been lost by the broker # try to cancel the job directly on the scheduler cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid]) + kill_job_on_worker(self.client, request.request_uid) session = self.manage_dismissed_request(request, session) session.commit() From 3b3b8f2d5e9984077c993d9ea405b1897ffaa1b3 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 16 Jan 2025 14:54:19 +0100 Subject: [PATCH 3/6] qa --- cads_broker/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 102d90c..7cb3be7 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -3,11 +3,11 @@ import io import os import pickle +import signal import threading import time import traceback from typing import Any -import signal import attrs import cachetools From 2c2f8bfacdd8c0c2d4986cd7e46c50c596044a52 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 16 Jan 2025 15:45:49 +0100 Subject: [PATCH 4/6] Enhance job termination process on workers to handle multiple processes and improve error logging --- cads_broker/dispatcher.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 7cb3be7..3e3eedb 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -94,14 +94,23 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None: - worker_pid_event = client.get_events(request_uid)[0][1] - client.run( - os.kill, - worker_pid_event["pid"], - signal.SIGTERM, - workers=[worker_pid_event["worker"]], - nanny=True, - ) + """Kill the job on the worker.""" + # loop on all the processes related to the request_uid + for worker_pid_event in client.get_events(request_uid): + _, worker_pid_event = worker_pid_event + pid = worker_pid_event["pid"] + worker_ip = worker_pid_event["worker"] + try: + client.run( + os.kill, + pid, + signal.SIGTERM, + workers=[worker_ip], + nanny=True, + ) + logger.info("killing worker", job_id=request_uid, pid=pid, worker_ip=worker_ip) + except (KeyError, NameError): + logger.warning("worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip) def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None: From dcf0bd78d5708a2ad7cf3c4e2673e91c71e891e4 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 16 Jan 2025 15:59:37 +0100 Subject: [PATCH 5/6] Refactor logging in job termination process for improved readability --- cads_broker/dispatcher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 3e3eedb..f4a8b41 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -108,9 +108,13 @@ def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None: workers=[worker_ip], nanny=True, ) - logger.info("killing worker", job_id=request_uid, pid=pid, worker_ip=worker_ip) + logger.info( + "killing worker", job_id=request_uid, pid=pid, worker_ip=worker_ip + ) except (KeyError, NameError): - logger.warning("worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip) + logger.warning( + "worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip + ) def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None: From 248004d3b1b6e4046bc3b05da9aaca5a48cc2bd5 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 16 Jan 2025 16:54:02 +0100 Subject: [PATCH 6/6] improve log messages --- cads_broker/dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index f4a8b41..d790d8f 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -109,11 +109,11 @@ def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None: nanny=True, ) logger.info( - "killing worker", job_id=request_uid, pid=pid, worker_ip=worker_ip + "killed job on worker", job_id=request_uid, pid=pid, worker_ip=worker_ip ) except (KeyError, NameError): logger.warning( - "worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip + "worker not found while killing a job", job_id=request_uid, pid=pid, worker_ip=worker_ip )