Skip to content

Kill jobs on workers #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into the base branch from the source branch
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ def logger_kwargs(request: SystemRequest) -> dict[str, str]:
if event.event_type == "worker_name"
],
"origin": request.origin,
"cache_id": request.cache_id,
"portal": request.portal,
"entry_point": request.entry_point,
"request_metadata": request.request_metadata,
Expand Down
26 changes: 26 additions & 0 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io
import os
import pickle
import signal
import threading
import time
import traceback
Expand Down Expand Up @@ -92,6 +93,30 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A
return client.run_on_scheduler(get_tasks_on_scheduler)


def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None:
    """Terminate every worker process recorded for *request_uid*.

    Each event published under the request UID carries the PID and the
    address of a worker process handling that request; for each one, the
    nanny on that worker is asked to deliver SIGTERM to the PID.

    Parameters
    ----------
    client:
        Dask client connected to the scheduler holding the events.
    request_uid:
        UID of the request whose worker processes should be killed.
    """
    for _, event_payload in client.get_events(request_uid):
        # NOTE(review): assumes each event payload maps "pid" and "worker"
        # to the process id and worker address — published by the worker side.
        target_pid = event_payload["pid"]
        target_worker = event_payload["worker"]
        try:
            # Run os.kill remotely inside the nanny so the worker process
            # itself (not just a task) receives the signal.
            client.run(
                os.kill,
                target_pid,
                signal.SIGTERM,
                workers=[target_worker],
                nanny=True,
            )
            logger.info(
                "killed job on worker", job_id=request_uid, pid=target_pid, worker_ip=target_worker
            )
        except (KeyError, NameError):
            # Best effort: the worker may already be gone by the time we try.
            logger.warning(
                "worker not found while killing a job", job_id=request_uid, pid=target_pid, worker_ip=target_worker
            )


def cancel_jobs_on_scheduler(client: distributed.Client, job_ids: list[str]) -> None:
"""Cancel jobs on the dask scheduler.

Expand Down Expand Up @@ -424,6 +449,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
# if the request is not in the futures, it means that the request has been lost by the broker
# try to cancel the job directly on the scheduler
cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
kill_job_on_worker(self.client, request.request_uid)
session = self.manage_dismissed_request(request, session)
session.commit()

Expand Down
Loading