Skip to content

Commit ad7b2bc

Browse files
Fix long running requests and QoS status (#112)
* dummy load test * try to fix * remove unused request_uid * try to fix * logging * logging * fix * revert and fix * fix * fix * try to fix * fix * limit number of tasks done in the queue * add optimization * fix * fix * fix * add env variable * qa * simplify * fix * fix * fix * revert * drop try except * test * fix * debug * debug * debug * try to fix * fix * try longer requests * debug * can't get the future from the scheduler * move results to worker * print only if elapsed > 1 * decrease logging * fix broker doesn't write the result anymore * fix * remove forced elapsed * fix request_body * tidy * add debug logs * logging * debug * reset futures if needed * fix delay between db status and qos status * fix too many running requests * logging * logging * logging * fix running and queued status on db * fix queued * fix * fix * try to fix * logging * logging * try to fix * add logging * try to fix * try to fix slow futures * temporary drop multithreading * fix * add logging * release future * move future release * typing * simplify * add done callback * fix * avoid threading issues * revert * swap orders of updates * fix number of workers * fix number of workers * fix * remove add_done_callback * try to fix memory issues with scheduler * revert * add reload rules * fix * revert to entrypoint
1 parent d6de023 commit ad7b2bc

File tree

6 files changed

+235
-131
lines changed

6 files changed

+235
-131
lines changed

cads_broker/Environment.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ def wrapped(self, *args, **kwargs):
2323

2424

2525
class Environment:
26-
def __init__(self):
27-
self.number_of_workers = 0
26+
def __init__(self, number_of_workers=0):
27+
self.number_of_workers = number_of_workers
2828
self.session = None
2929
self.lock = threading.RLock()
3030
self._enabled = {}

cads_broker/database.py

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -474,14 +474,13 @@ def reset_qos_rules(session: sa.orm.Session, qos):
474474
for request in get_running_requests(session):
475475
# Recompute the limits
476476
limits = qos.limits_for(request, session)
477-
cached_rules.update(
478-
delete_request_qos_status(
479-
request_uid=request.request_uid,
480-
rules=limits,
481-
session=session,
482-
rules_in_db=cached_rules,
483-
)
477+
_, rules = delete_request_qos_status(
478+
request_uid=request.request_uid,
479+
rules=limits,
480+
session=session,
481+
rules_in_db=cached_rules,
484482
)
483+
cached_rules.update(rules)
485484
session.commit()
486485

487486

@@ -523,7 +522,7 @@ def add_qos_rule(
523522
def decrement_qos_rule_running(
524523
rules: list, session: sa.orm.Session, rules_in_db: dict[str, QoSRule] = {}, **kwargs
525524
):
526-
"""Increment the running counter of a QoS rule."""
525+
"""Decrement the running counter of a QoS rule."""
527526
for rule in rules:
528527
if (rule_uid := str(rule.__hash__())) in rules_in_db:
529528
qos_rule = rules_in_db[rule_uid]
@@ -534,7 +533,8 @@ def decrement_qos_rule_running(
534533
# this happens when a request is finished after a broker restart.
535534
# the rule is not in the database anymore because it has been reset.
536535
continue
537-
qos_rule.running = max(0, qos_rule.running - 1)
536+
qos_rule.running = rule.value
537+
return None, None
538538

539539

540540
def delete_request_qos_status(
@@ -556,11 +556,11 @@ def delete_request_qos_status(
556556
except sqlalchemy.orm.exc.NoResultFound:
557557
qos_rule = add_qos_rule(rule=rule, session=session)
558558
created_rules[qos_rule.uid] = qos_rule
559-
if qos_rule in request.qos_rules:
559+
if qos_rule.uid in [r.uid for r in request.qos_rules]:
560560
request.qos_rules.remove(qos_rule)
561-
qos_rule.queued = max(0, qos_rule.queued - 1)
562-
qos_rule.running += 1
563-
return created_rules
561+
qos_rule.queued = len(rule.queued)
562+
qos_rule.running = rule.value
563+
return request, created_rules
564564

565565

566566
def add_request_qos_status(
@@ -569,20 +569,22 @@ def add_request_qos_status(
569569
session: sa.orm.Session,
570570
rules_in_db: dict[str, QoSRule] = {},
571571
**kwargs,
572-
):
572+
) -> tuple[SystemRequest | None, dict[str, QoSRule]]:
573573
created_rules: dict = {}
574+
new_request = None
574575
if request is None:
575-
return {}
576+
return new_request, {}
576577
for rule in rules:
577578
if (rule_uid := str(rule.__hash__())) in rules_in_db:
578579
qos_rule = rules_in_db[rule_uid]
579580
else:
580581
qos_rule = add_qos_rule(rule=rule, session=session)
581582
created_rules[qos_rule.uid] = qos_rule
582-
if qos_rule not in request.qos_rules:
583-
qos_rule.queued += 1
584-
request.qos_rules.append(qos_rule)
585-
return created_rules
583+
if qos_rule.uid not in [r.uid for r in request.qos_rules]:
584+
qos_rule.queued = len(rule.queued)
585+
new_request = get_request(request.request_uid, session)
586+
new_request.qos_rules.append(qos_rule)
587+
return new_request, created_rules
586588

587589

588590
def get_qos_status_from_request(
@@ -606,11 +608,9 @@ def get_qos_status_from_request(
606608

607609

608610
def requeue_request(
609-
request_uid: str,
611+
request: SystemRequest,
610612
session: sa.orm.Session,
611-
):
612-
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
613-
request = session.scalars(statement).one()
613+
) -> SystemRequest | None:
614614
if request.status == "running":
615615
# ugly implementation because sqlalchemy doesn't allow directly updating JSONB
616616
# FIXME: use a specific column for resubmit_number
@@ -624,7 +624,29 @@ def requeue_request(
624624
logger.info("requeueing request", **logger_kwargs(request=request))
625625
return request
626626
else:
627-
return
627+
return None
628+
629+
630+
def set_request_cache_id(request_uid: str, cache_id: int, session: sa.orm.Session):
631+
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
632+
request = session.scalars(statement).one()
633+
request.cache_id = cache_id
634+
session.commit()
635+
return request
636+
637+
638+
def set_successful_request(
639+
request_uid: str,
640+
session: sa.orm.Session,
641+
) -> SystemRequest | None:
642+
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
643+
request = session.scalars(statement).one()
644+
if request.status == "successful":
645+
return None
646+
request.status = "successful"
647+
request.finished_at = sa.func.now()
648+
session.commit()
649+
return request
628650

629651

630652
def set_request_status(
@@ -634,8 +656,6 @@ def set_request_status(
634656
cache_id: int | None = None,
635657
error_message: str | None = None,
636658
error_reason: str | None = None,
637-
log: list[tuple[int, str]] = [],
638-
user_visible_log: list[tuple[int, str]] = [],
639659
resubmit: bool | None = None,
640660
) -> SystemRequest:
641661
"""Set the status of a request."""
@@ -651,16 +671,15 @@ def set_request_status(
651671
request.request_metadata = metadata
652672
if status == "successful":
653673
request.finished_at = sa.func.now()
654-
request.cache_id = cache_id
655674
elif status == "failed":
656675
request.finished_at = sa.func.now()
657676
request.response_error = {"message": error_message, "reason": error_reason}
658677
elif status == "running":
659678
request.started_at = sa.func.now()
660679
request.qos_status = {}
680+
if cache_id is not None:
681+
request.cache_id = cache_id
661682
# FIXME: logs can't be live updated
662-
request.response_log = json.dumps(log)
663-
request.response_user_visible_log = json.dumps(user_visible_log)
664683
request.status = status
665684
session.commit()
666685
return request

0 commit comments

Comments
 (0)