Skip to content

Commit 9cc282f

Browse files
Change dismissed behaviour (#116)
* change dismissed requests behaviour * tests * try to fix * keep dismissed requests as dismissed instead of make them failing * qa
1 parent a620f17 commit 9cc282f

File tree

5 files changed

+42
-50
lines changed

5 files changed

+42
-50
lines changed

cads_broker/database.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -427,26 +427,9 @@ def count_users(status: str, entry_point: str, session: sa.orm.Session) -> int:
427427
)
428428

429429

430-
def update_dismissed_requests(session: sa.orm.Session) -> Iterable[str]:
431-
stmt_dismissed = (
432-
sa.update(SystemRequest)
433-
.where(SystemRequest.status == "dismissed")
434-
.returning(SystemRequest.request_uid)
435-
.values(status="failed", response_error={"reason": "dismissed request"})
436-
)
437-
dismissed_uids = session.scalars(stmt_dismissed).fetchall()
438-
session.execute( # type: ignore
439-
sa.insert(Events),
440-
map(
441-
lambda x: {
442-
"request_uid": x,
443-
"message": DISMISSED_MESSAGE,
444-
"event_type": "user_visible_error",
445-
},
446-
dismissed_uids,
447-
),
448-
)
449-
return dismissed_uids
430+
def get_dismissed_requests(session: sa.orm.Session) -> Iterable[SystemRequest]:
431+
stmt_dismissed = sa.select(SystemRequest).where(SystemRequest.status == "dismissed")
432+
return session.scalars(stmt_dismissed).fetchall()
450433

451434

452435
def get_events_from_request(

cads_broker/dispatcher.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ def values(self) -> Iterable[Any]:
170170
with self._lock:
171171
return self.queue_dict.values()
172172

173-
def pop(self, key: str) -> Any:
173+
def pop(self, key: str, default=None) -> Any:
174174
with self._lock:
175-
return self.queue_dict.pop(key, None)
175+
return self.queue_dict.pop(key, default)
176176

177177
def len(self) -> int:
178178
with self._lock:
@@ -336,16 +336,16 @@ def sync_database(self, session: sa.orm.Session) -> None:
336336
- If the task is not in the dask scheduler, it is re-queued.
337337
This behaviour can be changed with an environment variable.
338338
"""
339-
# the retrieve API sets the status to "dismissed", here the broker deletes the request
340-
# this is to better control the status of the QoS
341-
dismissed_uids = db.update_dismissed_requests(session)
342-
for uid in dismissed_uids:
343-
if future := self.futures.pop(uid, None):
339+
# the retrieve API sets the status to "dismissed",
340+
# here the broker fixes the QoS and queue status accordingly
341+
dismissed_requests = db.get_dismissed_requests(session)
342+
for request in dismissed_requests:
343+
if future := self.futures.pop(request.request_uid, None):
344344
future.cancel()
345-
if dismissed_uids:
346-
self.queue.reset()
347-
self.qos.reload_rules(session)
348-
db.reset_qos_rules(session, self.qos)
345+
self.qos.notify_end_of_request(
346+
request, session, scheduler=self.internal_scheduler
347+
)
348+
self.queue.pop(request.request_uid, None)
349349
session.commit()
350350

351351
statement = sa.select(db.SystemRequest).where(

cads_broker/entry_points.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ def add_dummy_requests(
5353
entry_point="cads_adaptors:DummyAdaptor",
5454
)
5555
session.add(request)
56-
if i % 100 == 0:
5756
session.commit()
5857
session.commit()
5958

tests/test_02_database.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515

1616
class MockRule:
17-
def __init__(self, name, conclusion, info, condition):
17+
def __init__(self, name, conclusion, info, condition, queued=[], running=0):
1818
self.name = name
1919
self.conclusion = conclusion
2020
self.info = info
2121
self.condition = condition
22+
self.queued = queued
23+
self.value = running
2224

2325
def evaluate(self, request):
2426
return 10
@@ -517,8 +519,12 @@ def test_add_qos_rule(session_obj: sa.orm.sessionmaker) -> None:
517519

518520

519521
def test_add_request_qos_status(session_obj: sa.orm.sessionmaker) -> None:
520-
rule1 = MockRule("name1", "conclusion1", "info1", "condition1")
521-
rule2 = MockRule("name2", "conclusion2", "info2", "condition2")
522+
rule1 = MockRule(
523+
"name1", "conclusion1", "info1", "condition1", queued=list(range(5))
524+
)
525+
rule2 = MockRule(
526+
"name2", "conclusion2", "info2", "condition2", queued=list(range(1))
527+
)
522528
adaptor_properties = mock_config()
523529
request = mock_system_request(adaptor_properties_hash=adaptor_properties.hash)
524530
request_uid = request.request_uid
@@ -535,16 +541,18 @@ def test_add_request_qos_status(session_obj: sa.orm.sessionmaker) -> None:
535541
with session_obj() as session:
536542
request = db.get_request(request_uid, session=session)
537543
assert db.get_qos_status_from_request(request) == {
538-
"name1": [
539-
{"info": "info1", "queued": 5 + 1, "running": 0, "conclusion": "10"}
540-
],
544+
"name1": [{"info": "info1", "queued": 5, "running": 0, "conclusion": "10"}],
541545
"name2": [{"info": "info2", "queued": 1, "running": 0, "conclusion": "10"}],
542546
}
543547

544548

545549
def test_delete_request_qos_status(session_obj: sa.orm.sessionmaker) -> None:
546-
rule1 = MockRule("name1", "conclusion1", "info1", "condition1")
547-
rule2 = MockRule("name2", "conclusion2", "info2", "condition2")
550+
rule1 = MockRule(
551+
"name1", "conclusion1", "info1", "condition1", queued=list(range(5)), running=2
552+
)
553+
rule2 = MockRule(
554+
"name2", "conclusion2", "info2", "condition2", queued=list(range(3)), running=2
555+
)
548556
adaptor_properties = mock_config()
549557
request = mock_system_request(adaptor_properties_hash=adaptor_properties.hash)
550558
request_uid = request.request_uid
@@ -574,14 +582,16 @@ def test_delete_request_qos_status(session_obj: sa.orm.sessionmaker) -> None:
574582
rule1 = db.get_qos_rule(str(rule1.__hash__()), session=session)
575583
rule2 = db.get_qos_rule(str(rule2.__hash__()), session=session)
576584
assert rule1.queued == rule1_queued
577-
assert rule1.running == rule1_running + 1
578585
assert rule2.queued == rule2_queued
579-
assert rule2.running == rule2_running + 1
580586

581587

582588
def test_decrement_qos_rule_running(session_obj: sa.orm.sessionmaker) -> None:
583-
rule1 = MockRule("name1", "conclusion1", "info1", "condition1")
584-
rule2 = MockRule("name2", "conclusion2", "info2", "condition2")
589+
rule1 = MockRule(
590+
"name1", "conclusion1", "info1", "condition1", queued=list(range(5)), running=2
591+
)
592+
rule2 = MockRule(
593+
"name2", "conclusion2", "info2", "condition2", queued=list(range(3)), running=4
594+
)
585595
rule1_queued = 5
586596
rule1_running = 2
587597
rule2_queued = 3
@@ -602,11 +612,11 @@ def test_decrement_qos_rule_running(session_obj: sa.orm.sessionmaker) -> None:
602612
with session_obj() as session:
603613
assert (
604614
db.get_qos_rule(str(rule1.__hash__()), session=session).running
605-
== rule1_running - 1
615+
== rule1_running
606616
)
607617
assert (
608618
db.get_qos_rule(str(rule2.__hash__()), session=session).running
609-
== rule2_running - 1
619+
== rule2_running
610620
)
611621

612622

tests/test_20_dispatcher.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ def test_broker_sync_database(
8989
session.commit()
9090

9191
def mock_get_tasks() -> dict[str, str]:
92-
return {in_dask_request_uid: "..."}
92+
return {in_dask_request_uid: {"state": "...", "exception": None}}
9393

9494
mocker.patch(
95-
"cads_broker.dispatcher.get_tasks",
95+
"cads_broker.dispatcher.get_tasks_from_scheduler",
9696
return_value=mock_get_tasks(),
9797
)
9898
broker.futures = {in_futures_request_uid: "..."}
@@ -114,8 +114,8 @@ def mock_get_tasks() -> dict[str, str]:
114114
db.SystemRequest.request_uid == lost_request_uid
115115
)
116116
output_request = session.scalars(statement).first()
117-
assert output_request.status == "failed"
118-
assert output_request.request_metadata.get("resubmit_number") is None
117+
assert output_request.status == "accepted"
118+
assert output_request.request_metadata.get("resubmit_number") == 1
119119

120120
# with pytest.raises(db.NoResultFound):
121121
# with session_obj() as session:

0 commit comments

Comments
 (0)