Skip to content

Commit a086d50

Browse files
Batch queued requests (#134)
* add limit on get_accepted_requests
* test log
* comment out reset queue
* re-introduce check queue
* cache requests qos properties and manage permission error in 1 commit
* move commits
* fix permission error
* copy values of the dictionary
* add perf logger
* perf logger and optimisation
* fix
* logs
* add logs
* add response_error
* delete log messages
* optional commit for add_event
* session flush every 2000
* fix permission error when properties are not cached
* commit instead of flush
* introduce limit on dismissed requests management
* move check permission outside of properties
* fix permission error
* improve performance
* qa
* qa
1 parent f9d9b50 commit a086d50

File tree

7 files changed

+133
-76
lines changed

7 files changed

+133
-76
lines changed

cads_broker/config.py

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class BrokerConfig(pydantic_settings.BaseSettings):
3939
broker_requeue_on_lost_requests: bool = True
4040
broker_requeue_limit: int = 3
4141
broker_max_internal_scheduler_tasks: int = 500
42+
broker_max_accepted_requests: int = 2000
4243

4344

4445
class SqlalchemySettings(pydantic_settings.BaseSettings):

cads_broker/database.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -221,14 +221,17 @@ def get_running_requests(
221221
def get_accepted_requests(
    session: sa.orm.Session,
    last_created_at: datetime.datetime | None = None,
    limit: int | None = None,
):
    """Return the requests in status "accepted", ordered by creation time.

    When ``last_created_at`` is given, only requests created at or after that
    instant are returned (the comparison is ``>=``, so a request stamped
    exactly ``last_created_at`` is included).

    ``limit`` caps the number of rows returned; a falsy value (``None`` or
    ``0``) leaves the query uncapped.
    """
    conditions = [SystemRequest.status == "accepted"]
    if last_created_at:
        # keep the time filter first so the criteria are AND-ed in the same order
        conditions.insert(0, SystemRequest.created_at >= last_created_at)

    query = (
        sa.select(SystemRequest)
        .where(*conditions)
        .order_by(SystemRequest.created_at)
    )
    if limit:
        query = query.limit(limit)

    rows = session.scalars(query)
    return rows.all()
233236

234237

@@ -321,8 +324,12 @@ def count_users(status: str, entry_point: str, session: sa.orm.Session) -> int:
321324
)
322325

323326

324-
def get_dismissed_requests(
    session: sa.orm.Session, limit: int | None = None
) -> Iterable[SystemRequest]:
    """Return the requests whose status is "dismissed".

    ``limit`` caps the number of rows returned; a falsy value (``None`` or
    ``0``) leaves the query uncapped. It defaults to ``None`` so that the
    one-argument call form ``get_dismissed_requests(session)`` keeps working.
    """
    stmt_dismissed = sa.select(SystemRequest).where(SystemRequest.status == "dismissed")
    if limit:
        # NOTE: no ORDER BY is applied, so which rows make it into a capped
        # result is left to the database.
        stmt_dismissed = stmt_dismissed.limit(limit)
    return session.scalars(stmt_dismissed).fetchall()
327334

328335

@@ -708,10 +715,12 @@ def add_event(
708715
request_uid: str,
709716
message: str,
710717
session: sa.orm.Session,
718+
commit: bool = True,
711719
):
712720
event = Events(event_type=event_type, request_uid=request_uid, message=message)
713721
session.add(event)
714-
session.commit()
722+
if commit:
723+
session.commit()
715724

716725

717726
def dictify_request(request: SystemRequest) -> dict[str, Any]:

cads_broker/dispatcher.py

+42-5
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ def manage_dismissed_request(self, request, session):
335335
request_uid=request.request_uid,
336336
message=dismission_metadata.get("message", ""),
337337
session=session,
338+
commit=False,
338339
)
339340
previous_status = dismission_metadata.get("previous_status", "accepted")
340341
if dismission_metadata.get("reason", "DismissedRequest") == "PermissionError":
@@ -371,8 +372,10 @@ def sync_database(self, session: sa.orm.Session) -> None:
371372
"""
372373
# the retrieve API sets the status to "dismissed",
373374
# here the broker fixes the QoS and queue status accordingly
374-
dismissed_requests = db.get_dismissed_requests(session)
375-
for request in dismissed_requests:
375+
dismissed_requests = db.get_dismissed_requests(
376+
session, limit=CONFIG.broker_max_accepted_requests
377+
)
378+
for i, request in enumerate(dismissed_requests):
376379
if future := self.futures.pop(request.request_uid, None):
377380
future.cancel()
378381
session = self.manage_dismissed_request(request, session)
@@ -561,6 +564,34 @@ def on_future_done(self, future: distributed.Future) -> str:
561564
future.release()
562565
return future.key
563566

567+
@perf_logger
568+
def cache_requests_qos_properties(self, requests, session: sa.orm.Session) -> None:
569+
"""Cache the qos properties of the requests."""
570+
# copy list of requests to avoid RuntimeError: dictionary changed size during iteration
571+
for request in list(requests):
572+
try:
573+
self.qos._properties(request, check_permissions=True, session=session)
574+
except PermissionError as exception:
575+
db.add_event(
576+
event_type="user_visible_error",
577+
request_uid=request.request_uid,
578+
message=exception.args[0],
579+
session=session,
580+
)
581+
request = db.get_request(request.request_uid, session=session)
582+
request.status = "failed"
583+
request.finished_at = datetime.datetime.now()
584+
request.response_error = {
585+
"reason": "PermissionError",
586+
"message": exception.args[0],
587+
}
588+
self.queue.pop(request.request_uid, None)
589+
self.qos.notify_dismission_of_request(
590+
request, session, scheduler=self.internal_scheduler
591+
)
592+
logger.info("job has finished", **db.logger_kwargs(request=request))
593+
session.commit()
594+
564595
def processing_time_priority_algorithm(
565596
self,
566597
session_write: sa.orm.Session,
@@ -592,7 +623,9 @@ def processing_time_priority_algorithm(
592623
)
593624
for request in requests:
594625
# need to check the limits on each request to update the qos_rules table
595-
can_run = self.qos.can_run(request, session=session_write, scheduler=self.internal_scheduler)
626+
can_run = self.qos.can_run(
627+
request, session=session_write, scheduler=self.internal_scheduler
628+
)
596629
if can_run and may_run and requests_counter < number_of_requests:
597630
self.submit_request(request, session=session_write)
598631
may_run = False
@@ -673,6 +706,7 @@ def run(self) -> None:
673706
db.get_accepted_requests(
674707
session=session_write,
675708
last_created_at=self.queue.last_created_at,
709+
limit=CONFIG.broker_max_accepted_requests,
676710
)
677711
)
678712
self.sync_qos_rules(session_write)
@@ -688,10 +722,13 @@ def run(self) -> None:
688722
# if the internal queue is not in sync with the database, re-sync it
689723
logger.info(
690724
"re-syncing internal queue",
691-
internal_queue={queue_length},
692-
db_queue={db_queue},
725+
internal_queue=queue_length,
726+
db_queue=db_queue,
693727
)
694728
self.queue.reset()
729+
self.cache_requests_qos_properties(
730+
self.queue.values(), session_write
731+
)
695732

696733
running_requests = len(db.get_running_requests(session=session_read))
697734
queue_length = self.queue.len()

cads_broker/entry_points.py

+36-32
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import datetime
44
import os
55
import random
6-
import sqlite3
76
import uuid
87
from enum import Enum
98
from typing import Any, Optional
@@ -19,42 +18,47 @@
1918

2019
@app.command()
def add_dummy_requests(
    number_of_requests: int = 1000, number_of_users: int = 100, max_length: int = 60
) -> None:
    # Insert `number_of_requests` synthetic requests in status "accepted".
    #
    # number_of_requests: how many SystemRequest rows to create.
    # number_of_users: size of the pool of random user uids the rows are
    #     assigned to.
    # max_length: upper bound, in seconds, of the random "elapsed" duration
    #     written into each request body.
    #
    # NOTE(review): documented with comments rather than a docstring because
    # `app` presumably renders docstrings as CLI help text — confirm before
    # converting.
    owners = [str(uuid.uuid4()) for _ in range(number_of_users)]
    session_factory = database.ensure_session_obj(None)
    with session_factory() as session:
        # every dummy request references the adaptor properties hashed "test"
        database.ensure_adaptor_properties(
            hash="test", config={}, form={}, session=session
        )
        for index in range(number_of_requests):
            new_uid = str(uuid.uuid4())
            owner = random.choice(owners)
            # avoid using the same timestamp for all requests: spread them
            # one second apart, the first request being the oldest
            age = datetime.timedelta(seconds=number_of_requests - index)
            created_at = datetime.datetime.now() - age
            elapsed = datetime.timedelta(seconds=random.randint(0, max_length))
            body = {
                "setup_code": None,
                "request": {
                    "elapsed": str(elapsed),
                    "timestamp": str(datetime.datetime.now()),
                },
            }
            dummy = database.SystemRequest(
                request_uid=new_uid,
                process_id="test-adaptor-dummy",
                user_uid=owner,
                created_at=created_at,
                status="accepted",
                request_body=body,
                request_metadata={},
                origin="api",
                portal="c3s",
                adaptor_properties_hash="test",
                entry_point="cads_adaptors:DummyAdaptor",
            )
            session.add(dummy)

            print(f"Added request {index} with request_uid {dummy.request_uid}")

        # a single commit for the whole batch
        session.commit()
        print(f"Committed {number_of_requests} requests")
5862

5963

6064
@app.command()

cads_broker/qos/QoS.py

+27-25
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def can_run(self, request, session, scheduler):
122122
return not len(limits) and not len(permissions)
123123

124124
@locked
125-
def _properties(self, request, session):
125+
def _properties(self, request, session, check_permissions=False):
126126
"""Return the Properties object associated with a request.
127127
128128
If it does not exists it is created.
@@ -135,18 +135,8 @@ def _properties(self, request, session):
135135

136136
properties = Properties()
137137

138-
# First check permissions
139-
for rule in self.rules.permissions:
140-
if rule.match(request):
141-
properties.permissions.append(rule)
142-
if not rule.evaluate(request):
143-
database.set_dismissed_request(
144-
request_uid=request.request_uid,
145-
session=session,
146-
message=rule.info.evaluate(Context(request, self.environment)),
147-
reason="PermissionError",
148-
)
149-
break
138+
if check_permissions:
139+
self.check_permissions_for(request, properties)
150140

151141
# Add general limits
152142
for rule in self.rules.global_limits:
@@ -217,7 +207,7 @@ def _status(self, request, session, out):
217207
out(" {}".format(priority))
218208

219209
out("Permissions rules:")
220-
for permission in self.permissions_for(request, session):
210+
for permission in self.check_permissions_for(request, session):
221211
out(" {}".format(permission))
222212

223213
@locked
@@ -229,12 +219,23 @@ def limits_for(self, request, session):
229219
return self._properties(request, session).limits
230220

231221
@locked
def check_permissions_for(self, request, properties):
    """Collect the permission rules matching a request and enforce them.

    Every rule of ``self.rules.permissions`` that matches ``request`` is
    appended to ``properties.permissions``. As soon as a matching rule does
    not evaluate to true, ``properties`` is stored in
    ``self.requests_properties_cache`` under the request uid and a
    ``PermissionError`` carrying the rule's evaluated info is raised.

    Returns ``properties.permissions`` when no matching rule rejects the
    request.
    """
    for permission_rule in self.rules.permissions:
        if not permission_rule.match(request):
            continue
        properties.permissions.append(permission_rule)
        if permission_rule.evaluate(request):
            continue
        # Store in cache the properties as filled so far, before failing
        self.requests_properties_cache[request.request_uid] = properties
        denial = permission_rule.info.evaluate(Context(request, self.environment))
        raise PermissionError(denial)

    return properties.permissions
238239

239240
@locked
240241
def priorities_for(self, request, session):
@@ -303,15 +304,16 @@ def notify_dismission_of_request(self, request, session, scheduler):
303304
for limit in self.limits_for(request, session):
304305
limit.remove_from_queue(request.request_uid)
305306
limits_list.append(limit)
306-
scheduler.append(
307-
{
308-
"function": database.delete_request_qos_status,
309-
"kwargs": {
310-
"rules": limits_list,
311-
"request_uid": request.request_uid,
312-
},
313-
}
314-
)
307+
if limits_list:
308+
scheduler.append(
309+
{
310+
"function": database.delete_request_qos_status,
311+
"kwargs": {
312+
"rules": limits_list,
313+
"request_uid": request.request_uid,
314+
},
315+
}
316+
)
315317

316318
@locked
317319
def notify_start_of_request(self, request, session, scheduler):

tests/test_02_database.py

+14-7
Original file line numberDiff line numberDiff line change
@@ -100,18 +100,25 @@ def test_get_accepted_requests(session_obj: sa.orm.sessionmaker) -> None:
100100
successful_request = mock_system_request(
101101
status="running", adaptor_properties_hash=adaptor_properties.hash
102102
)
103-
accepted_request = mock_system_request(
103+
accepted_request1 = mock_system_request(
104104
status="accepted", adaptor_properties_hash=adaptor_properties.hash
105105
)
106-
accepted_request_uid = accepted_request.request_uid
106+
accepted_request2 = mock_system_request(
107+
status="accepted", adaptor_properties_hash=adaptor_properties.hash
108+
)
109+
accepted_request_uid = accepted_request1.request_uid
107110
with session_obj() as session:
108111
session.add(adaptor_properties)
109112
session.add(successful_request)
110-
session.add(accepted_request)
113+
session.add(accepted_request1)
114+
session.add(accepted_request2)
111115
session.commit()
112-
requests = db.get_accepted_requests(session=session)
113-
assert len(requests) == 1
114-
assert requests[0].request_uid == accepted_request_uid
116+
all_requests = db.get_accepted_requests(session=session)
117+
one_request = db.get_accepted_requests(session=session, limit=1)
118+
assert len(all_requests) == 2
119+
assert len(one_request) == 1
120+
assert min(all_requests, key=lambda x: x.created_at) == one_request[0]
121+
assert all_requests[0].request_uid == accepted_request_uid
115122

116123

117124
def test_count_finished_requests_per_user(session_obj: sa.orm.sessionmaker) -> None:
@@ -863,4 +870,4 @@ def test_count_running_users(session_obj: sa.orm.sessionmaker) -> None:
863870
)
864871
assert 1 == db.count_users(
865872
session=session, entry_point="foobar", status="running"
866-
)
873+
)

tests/test_20_dispatcher.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def test_processing_time_priority_algorithm(
135135
request1 = mock_system_request(user_uid="user1", request_uid="request1")
136136
request2 = mock_system_request(user_uid="user1", request_uid="request2")
137137

138-
139138
def mock_get_users_queue_from_processing_time():
140139
return {"user1": [request1, request2]}
141140

@@ -154,9 +153,7 @@ def mock_submit_request(self, request, session):
154153
if candidate.request_uid == request.request_uid:
155154
candidates.remove(candidate)
156155

157-
mocker.patch(
158-
"cads_broker.dispatcher.Broker.submit_request", mock_submit_request
159-
)
156+
mocker.patch("cads_broker.dispatcher.Broker.submit_request", mock_submit_request)
160157

161158
environment = Environment.Environment()
162159
qos = QoS.QoS(

0 commit comments

Comments
 (0)