Skip to content

Commit e5e249a

Browse files
Reduce scheduler memory (#113)
* try to reduce scheduler memory * add cache * fix * add performance logger to submit_requests * add perf logger * add perf logger * add perflogger * block adding new queued requests if the table is too big * revert * try to improve performance * truncate instead of delete * fix truncate * fix * improve performance of delete_request_qos_status * remove wrong queue update * fix * adding cascading rules * style * remove comments * qa --------- Co-authored-by: Alessio Siniscalchi <[email protected]>
1 parent 62330e1 commit e5e249a

File tree

3 files changed

+89
-18
lines changed

3 files changed

+89
-18
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Add ON DELETE CASCADE rules to the system_request_qos_rule foreign keys.
2+
3+
Revision ID: ca178571bdc5
4+
Revises: 8deb52d20c05
5+
Create Date: 2024-06-10 17:36:06.551396
6+
7+
"""
8+
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "ca178571bdc5"
13+
down_revision = "8deb52d20c05"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade() -> None:
    """Recreate both system_request_qos_rule foreign keys with ON DELETE CASCADE.

    The existing constraints are dropped and created again under the same
    names, this time with ``ondelete="CASCADE"``.
    """
    # type_ is optional on PostgreSQL but Alembic requires it on MySQL;
    # passing it keeps the migration explicit and portable.
    op.drop_constraint(
        "system_request_qos_rule_rule_uid_fkey",
        "system_request_qos_rule",
        type_="foreignkey",
    )
    op.drop_constraint(
        "system_request_qos_rule_request_uid_fkey",
        "system_request_qos_rule",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "system_request_qos_rule_rule_uid_fkey",
        "system_request_qos_rule",
        "qos_rules",
        ["rule_uid"],
        ["uid"],
        ondelete="CASCADE",
    )
    op.create_foreign_key(
        "system_request_qos_rule_request_uid_fkey",
        "system_request_qos_rule",
        "system_requests",
        ["request_uid"],
        ["request_uid"],
        ondelete="CASCADE",
    )
41+
42+
43+
def downgrade() -> None:
    """Recreate both system_request_qos_rule foreign keys without ON DELETE CASCADE.

    The constraints are dropped and created again under the same names with
    no ``ondelete`` rule, reverting what ``upgrade`` applied.
    """
    # type_ is optional on PostgreSQL but Alembic requires it on MySQL;
    # passing it keeps the migration explicit and portable.
    op.drop_constraint(
        "system_request_qos_rule_rule_uid_fkey",
        "system_request_qos_rule",
        type_="foreignkey",
    )
    op.drop_constraint(
        "system_request_qos_rule_request_uid_fkey",
        "system_request_qos_rule",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "system_request_qos_rule_rule_uid_fkey",
        "system_request_qos_rule",
        "qos_rules",
        ["rule_uid"],
        ["uid"],
    )
    op.create_foreign_key(
        "system_request_qos_rule_request_uid_fkey",
        "system_request_qos_rule",
        "system_requests",
        ["request_uid"],
        ["request_uid"],
    )

cads_broker/database.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@ class SystemRequestQoSRule(BaseModel):
7070

7171
request_uid = sa.Column(
7272
sa.dialects.postgresql.UUID(False),
73-
sa.ForeignKey("system_requests.request_uid"),
73+
sa.ForeignKey("system_requests.request_uid", ondelete="CASCADE"),
7474
primary_key=True,
7575
)
76-
rule_uid = sa.Column(sa.Text, sa.ForeignKey("qos_rules.uid"), primary_key=True)
76+
rule_uid = sa.Column(
77+
sa.Text, sa.ForeignKey("qos_rules.uid", ondelete="CASCADE"), primary_key=True
78+
)
7779

7880

7981
class Events(BaseModel):
@@ -467,9 +469,11 @@ def get_events_from_request(
467469

468470
def reset_qos_rules(session: sa.orm.Session, qos):
469471
"""Delete all QoS rules."""
470-
for rule in session.scalars(sa.select(QoSRule)):
471-
rule.system_requests = []
472-
session.delete(rule)
472+
session.execute(sa.text("truncate qos_rules cascade"))
473+
# for rule in session.scalars(sa.select(QoSRule)):
474+
# # rule.system_requests = []
475+
# session.delete(rule)
476+
473477
cached_rules: dict[str, Any] = {}
474478
for request in get_running_requests(session):
475479
# Recompute the limits
@@ -484,6 +488,11 @@ def reset_qos_rules(session: sa.orm.Session, qos):
484488
session.commit()
485489

486490

491+
def count_system_request_qos_rule(session: sa.orm.Session) -> int:
    """Count the number of rows in system_request_qos_rule.

    Parameters
    ----------
    session: open SQLAlchemy session used to run the query.

    Returns
    -------
    Number of rows currently stored in the table.
    """
    # Count directly on the table: Query.count() would wrap the select in a
    # subquery, and the rest of this module already uses sa.select().
    statement = sa.select(sa.func.count()).select_from(SystemRequestQoSRule)
    return session.execute(statement).scalar_one()
494+
495+
487496
def get_qos_rule(uid: str, session: sa.orm.Session):
488497
"""Get a QoS rule."""
489498
statement = sa.select(QoSRule).where(QoSRule.uid == uid)
@@ -557,11 +566,10 @@ def delete_request_qos_status(
557566
except sqlalchemy.orm.exc.NoResultFound:
558567
qos_rule = add_qos_rule(rule=rule, session=session)
559568
created_rules[qos_rule.uid] = qos_rule
560-
if qos_rule.uid in [r.uid for r in request.qos_rules]:
561-
request.qos_rules.remove(qos_rule)
562569
qos_rule.queued = len(rule.queued)
563570
qos_rule.running = rule.value
564-
return request, created_rules
571+
request.qos_rules = []
572+
return None, created_rules
565573

566574

567575
def add_request_qos_status(

cads_broker/dispatcher.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def get_tasks_from_scheduler(client: distributed.Client) -> Any:
7979
8080
This function is executed on the scheduler pod.
8181
"""
82+
8283
def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, Any]:
8384
tasks = {}
8485
for task_id, task in dask_scheduler.tasks.items():
@@ -177,6 +178,7 @@ def len(self) -> int:
177178
with self._lock:
178179
return len(self.queue_dict)
179180

181+
@cachetools.cachedmethod(lambda self: cachetools.TTLCache(maxsize=1024, ttl=60))
180182
def reset(self) -> None:
181183
with self._lock:
182184
self.queue_dict = dict()
@@ -235,8 +237,8 @@ def from_address(
235237
rules_hash=rules_hash,
236238
)
237239
with session_maker_write() as session:
238-
qos.reload_rules(session=session)
239-
db.reset_qos_rules(session, qos)
240+
perf_logger(qos.reload_rules)(session=session)
241+
perf_logger(db.reset_qos_rules)(session, qos)
240242
self = cls(
241243
client=client,
242244
session_maker_read=session_maker_read,
@@ -263,7 +265,7 @@ def set_request_error_status(
263265
264266
If the error reason is "KilledWorker":
265267
- if the worker has been killed by the Nanny for memory usage, it adds the event for the user
266-
- if the worker is killed for unknown reasons, it re-queues the request
268+
- if the worker is killed for unknown reasons, it re-queues the request
267269
if the requeue limit is not reached. This is configurable with the environment variable
268270
"""
269271
error_message = "".join(traceback.format_exception(exception))
@@ -327,10 +329,10 @@ def sync_database(self, session: sa.orm.Session) -> None:
327329
328330
- If the task is in the futures list it does nothing.
329331
- If the task is not in the futures list but it is in the scheduler:
330-
- If the task is in memory (it is successful but it has been lost by the broker),
332+
- If the task is in memory (it is successful but it has been lost by the broker),
331333
it is set to successful.
332334
- If the task is in error, it is set to failed.
333-
- If the task is not in the dask scheduler, it is re-queued.
335+
- If the task is not in the dask scheduler, it is re-queued.
334336
This behaviour can be changed with an environment variable.
335337
"""
336338
# the retrieve API sets the status to "dismissed", here the broker deletes the request
@@ -444,15 +446,15 @@ def sync_qos_rules(self, session_write) -> None:
444446
- the requests from the self.queue.
445447
If a request is updated the relative self.queue entry is updated too
446448
"""
447-
qos_rules = db.get_qos_rules(session=session_write)
449+
qos_rules = perf_logger(db.get_qos_rules)(session=session_write)
448450
if tasks_number := len(self.internal_scheduler.queue):
449451
logger.info("performance", tasks_number=tasks_number)
450452
for task in list(self.internal_scheduler.queue)[
451453
: int(os.getenv("BROKER_MAX_INTERNAL_SCHEDULER_TASKS", 500))
452454
]:
453455
# the internal scheduler is used to asynchronously add qos rules to database
454456
# it returns a new qos rule if a new qos rule is added to database
455-
request, new_qos_rules = task["function"](
457+
request, new_qos_rules = perf_logger(task["function"])(
456458
session=session_write,
457459
request=self.queue.get(task["kwargs"].get("request_uid")),
458460
rules_in_db=qos_rules,
@@ -527,6 +529,7 @@ def on_future_done(self, future: distributed.Future) -> str:
527529
future.release()
528530
return future.key
529531

532+
@perf_logger
530533
def submit_requests(
531534
self,
532535
session_write: sa.orm.Session,
@@ -568,10 +571,7 @@ def submit_request(
568571
request_uid=request.request_uid,
569572
user_uid=request.user_uid,
570573
hostname=os.getenv("CDS_PROJECT_URL"),
571-
**request.adaptor_properties.config,
572574
),
573-
form=request.adaptor_properties.form,
574-
request=request.request_body.get("request", {}),
575575
resources=request.request_metadata.get("resources", {}),
576576
metadata=request.request_metadata,
577577
)

0 commit comments

Comments
 (0)