Skip to content

Commit ec1eaf3

Browse files
Track killed worker for memory usage (#108)
* track killed worker log event from nanny
* add configurability to requeue
* request not found
* rename requeue env var
* change env variable
* check all qos status
* qa
1 parent cb76da3 commit ec1eaf3

File tree

2 files changed

+68
-35
lines changed

2 files changed

+68
-35
lines changed

cads_broker/dispatcher.py

+65-33
Original file line numberDiff line numberDiff line change
@@ -268,14 +268,30 @@ def sync_database(self, session: sa.orm.Session) -> None:
268268
# if it doesn't find the request: re-queue it
269269
else:
270270
# FIXME: check if request status has changed
271-
logger.info(
272-
"request not found: re-queueing", job_id={request.request_uid}
273-
)
274-
db.requeue_request(request_uid=request.request_uid, session=session)
275-
self.queue.add(request.request_uid, request)
276-
self.qos.notify_end_of_request(
277-
request, session, scheduler=self.internal_scheduler
278-
)
271+
if os.getenv(
272+
"BROKER_REQUEUE_ON_LOST_REQUESTS", False
273+
) and request.request_metadata.get("resubmit", 0) < os.getenv(
274+
"BROKER_REQUEUE_LIMIT", 3
275+
):
276+
logger.info(
277+
"request not found: re-queueing", job_id={request.request_uid}
278+
)
279+
db.requeue_request(request_uid=request.request_uid, session=session)
280+
self.queue.add(request.request_uid, request)
281+
self.qos.notify_end_of_request(
282+
request, session, scheduler=self.internal_scheduler
283+
)
284+
else:
285+
db.set_request_status(
286+
request_uid=request.request_uid,
287+
status="failed",
288+
error_message="Request not found in dask scheduler",
289+
error_reason="not_found",
290+
session=session,
291+
)
292+
self.qos.notify_end_of_request(
293+
request, session, scheduler=self.internal_scheduler
294+
)
279295

280296
@perf_logger
281297
def sync_qos_rules(self, session_write) -> None:
@@ -298,10 +314,6 @@ def sync_qos_rules(self, session_write) -> None:
298314
def on_future_done(self, future: distributed.Future) -> None:
299315
job_status = DASK_STATUS_TO_STATUS.get(future.status, "accepted")
300316
logger_kwargs: dict[str, Any] = {}
301-
log = list(self.client.get_events(f"{future.key}/log"))
302-
user_visible_log = list(
303-
self.client.get_events(f"{future.key}/user_visible_log")
304-
)
305317
with self.session_maker_write() as session:
306318
if future.status == "finished":
307319
result = future.result()
@@ -310,27 +322,53 @@ def on_future_done(self, future: distributed.Future) -> None:
310322
job_status,
311323
cache_id=result,
312324
session=session,
313-
log=log,
314-
user_visible_log=user_visible_log,
315325
)
316326
elif future.status == "error":
317327
exception = future.exception()
318328
error_message = "".join(traceback.format_exception(exception))
319329
error_reason = exception.__class__.__name__
320-
if error_reason == "distributed.scheduler.KilledWorker" and os.getenv(
321-
"BROKER_REQUEUE_ON_KILLED_WORKER", False
322-
):
323-
logger.info("worker killed: re-queueing", job_id=future.key)
324-
db.requeue_request(request_uid=future.key, session=session)
325-
self.queue.add(request.request_uid, request)
330+
request = db.get_request(future.key, session=session)
331+
requeue = os.getenv("BROKER_REQUEUE_ON_KILLED_WORKER_REQUESTS", False)
332+
if error_reason == "KilledWorker":
333+
worker_restart_events = self.client.get_events(
334+
"worker-restart-memory"
335+
)
336+
# get info on worker and pid of the killed request
337+
_, worker_pid_event = self.client.get_events(future.key)[0]
338+
if worker_restart_events:
339+
for event in worker_restart_events:
340+
_, job = event
341+
if (
342+
job["worker"] == worker_pid_event["worker"]
343+
and job["pid"] == worker_pid_event["pid"]
344+
):
345+
db.add_event(
346+
event_type="killed_worker",
347+
request_uid=future.key,
348+
message="Worker has been killed by the Nanny due to memory usage. "
349+
f"{job['worker']=}, {job['pid']=}, {job['rss']=}",
350+
session=session,
351+
)
352+
request = db.set_request_status(
353+
future.key,
354+
"failed",
355+
error_message=error_message,
356+
error_reason=error_reason,
357+
session=session,
358+
)
359+
requeue = False
360+
if requeue and request.request_metadata.get(
361+
"resubmit", 0
362+
) < os.getenv("BROKER_REQUEUE_LIMIT", 3):
363+
logger.info("worker killed: re-queueing", job_id=future.key)
364+
db.requeue_request(request_uid=future.key, session=session)
365+
self.queue.add(future.key, request)
326366
else:
327367
request = db.set_request_status(
328368
future.key,
329369
job_status,
330370
error_message=error_message,
331371
error_reason=error_reason,
332-
log=log,
333-
user_visible_log=user_visible_log,
334372
session=session,
335373
)
336374
else:
@@ -340,9 +378,8 @@ def on_future_done(self, future: distributed.Future) -> None:
340378
job_status,
341379
session=session,
342380
resubmit=True,
343-
log=log,
344-
user_visible_log=user_visible_log,
345381
)
382+
self.queue.add(future.key, request)
346383
logger.warning(
347384
"unknown dask status, re-queing",
348385
job_status={future.status},
@@ -367,20 +404,17 @@ def submit_requests(
367404
) -> None:
368405
queue = sorted(
369406
candidates,
370-
key=lambda candidate: self.qos.priority(
371-
candidate, session_write
372-
),
407+
key=lambda candidate: self.qos.priority(candidate, session_write),
373408
reverse=True,
374409
)
375410
requests_counter = 0
376411
for request in queue:
377412
if self.qos.can_run(
378413
request, session=session_write, scheduler=self.internal_scheduler
379414
):
380-
self.submit_request(request, session=session_write)
415+
if requests_counter <= int(number_of_requests * WORKERS_MULTIPLIER):
416+
self.submit_request(request, session=session_write)
381417
requests_counter += 1
382-
if requests_counter == int(number_of_requests * WORKERS_MULTIPLIER):
383-
break
384418

385419
def submit_request(
386420
self, request: db.SystemRequest, session: sa.orm.Session
@@ -421,9 +455,7 @@ def run(self) -> None:
421455
with self.session_maker_read() as session_read:
422456
if (rules_hash := get_rules_hash(self.qos.path)) != self.qos.rules_hash:
423457
logger.info("reloading qos rules")
424-
self.qos.reload_rules(
425-
session=session_read
426-
)
458+
self.qos.reload_rules(session=session_read)
427459
self.qos.rules_hash = rules_hash
428460
self.qos.environment.set_session(session_read)
429461
# expire_on_commit=False is used to detach the accepted requests without an error

cads_broker/entry_points.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def delete_requests(
126126
days: float = 0,
127127
skip_confirmation: Annotated[bool, typer.Option("--yes", "-y")] = False,
128128
) -> None:
129-
"""Set the status of records in the system_requests table to 'dismissed' if they are in the specified status.
129+
"""Set the status of records in the system_requests table to 'dismissed'.
130130
131131
Parameters
132132
----------
@@ -148,7 +148,8 @@ def delete_requests(
148148
number_of_requests = session.execute(statement).rowcount
149149
if not skip_confirmation:
150150
if not typer.confirm(
151-
f"Setting status to 'dismissed' for {number_of_requests} {status} requests. Do you want to continue?",
151+
f"Setting status to 'dismissed' for {number_of_requests} {status} requests. "
152+
"Do you want to continue?",
152153
abort=True,
153154
default=True,
154155
):

0 commit comments

Comments (0)