diff --git a/cads_broker/database.py b/cads_broker/database.py
index 277fcaf..c1960af 100644
--- a/cads_broker/database.py
+++ b/cads_broker/database.py
@@ -779,6 +779,7 @@ def logger_kwargs(request: SystemRequest) -> dict[str, str]:
             if event.event_type == "worker_name"
         ],
         "origin": request.origin,
+        "cache_id": request.cache_id,
         "portal": request.portal,
         "entry_point": request.entry_point,
         "request_metadata": request.request_metadata,
diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py
index 1b1a461..d790d8f 100644
--- a/cads_broker/dispatcher.py
+++ b/cads_broker/dispatcher.py
@@ -3,6 +3,7 @@
 import io
 import os
 import pickle
+import signal
 import threading
 import time
 import traceback
@@ -92,6 +93,50 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A
     return client.run_on_scheduler(get_tasks_on_scheduler)
 
 
+def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None:
+    """Send SIGTERM to the worker processes recorded for a request.
+
+    The events stored under the topic ``request_uid`` are read with
+    ``client.get_events``. Each payload must provide the ``pid`` of a process
+    and the address of the ``worker`` hosting it; ``os.kill`` is then executed
+    on the nanny of that worker only.
+
+    This is a best-effort cleanup: malformed events, unknown workers and
+    processes that cannot be signalled (e.g. already exited) are logged and
+    skipped, so one failure neither stops the other kills nor reaches the caller.
+    """
+    for _, payload in client.get_events(request_uid):
+        try:
+            pid = payload["pid"]
+            worker_ip = payload["worker"]
+        except (KeyError, TypeError):
+            # skip payloads that do not carry the expected keys
+            logger.warning(
+                "invalid event while killing a job",
+                job_id=request_uid,
+                worker_event=payload,
+            )
+            continue
+        log_kwargs = {"job_id": request_uid, "pid": pid, "worker_ip": worker_ip}
+        try:
+            client.run(
+                os.kill,
+                pid,
+                signal.SIGTERM,
+                workers=[worker_ip],
+                nanny=True,
+            )
+        except (KeyError, NameError):
+            logger.warning("worker not found while killing a job", **log_kwargs)
+        except OSError as exc:
+            # os.kill failed remotely, e.g. the process has already exited
+            logger.warning(
+                "process not killed on worker", error=repr(exc), **log_kwargs
+            )
+        else:
+            logger.info("killed job on worker", **log_kwargs)
+
+
 def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None:
     """Cancel jobs on the dask scheduler.
 
@@ -424,6 +469,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
                 # if the request is not in the futures, it means that the request has been lost by the broker
                 # try to cancel the job directly on the scheduler
                 cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
+                kill_job_on_worker(self.client, request.request_uid)
                 session = self.manage_dismissed_request(request, session)
                 session.commit()
 