
Commit 86b53d1

Merge branch 'main' into download-buckets

2 parents 279e691 + c8ac301

6 files changed: +135 −14 lines

cads_broker/config.py

1 addition, 0 deletions:

```diff
@@ -39,6 +39,7 @@ class BrokerConfig(pydantic_settings.BaseSettings):
     broker_requeue_limit: int = 3
     broker_max_internal_scheduler_tasks: int = 500
     broker_max_accepted_requests: int = 2000
+    broker_max_dismissed_requests: int = 100
     broker_cancel_stuck_requests_cache_ttl: int = 60
     broker_stuck_requests_limit_minutes: int = 15
```
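The new `broker_max_dismissed_requests` field caps how many dismissed requests the broker handles per sync pass (see `dispatcher.py` below). Since `BrokerConfig` extends `pydantic_settings.BaseSettings`, the value should be overridable from the environment; a minimal sketch, assuming pydantic-settings' default case-insensitive field-name-to-env-var mapping (no custom `env_prefix`) and that the remaining fields all have defaults:

```python
# Sketch: overriding the new cap from the environment. Assumes the default
# pydantic-settings env-var mapping and defaulted remaining fields.
import os

from cads_broker.config import BrokerConfig

os.environ["BROKER_MAX_DISMISSED_REQUESTS"] = "250"
print(BrokerConfig().broker_max_dismissed_requests)  # 250
```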

cads_broker/dispatcher.py

2 additions, 1 deletion:

```diff
@@ -440,7 +440,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
         # the retrieve API sets the status to "dismissed",
         # here the broker fixes the QoS and queue status accordingly
         dismissed_requests = db.get_dismissed_requests(
-            session, limit=CONFIG.broker_max_accepted_requests
+            session, limit=CONFIG.broker_max_dismissed_requests
        )
         for request in dismissed_requests:
             if future := self.futures.pop(request.request_uid, None):
@@ -450,6 +450,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
             # try to cancel the job directly on the scheduler
             cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
             kill_job_on_worker(self.client, request.request_uid)
+            kill_job_on_worker(self.client, request.request_uid)
             session = self.manage_dismissed_request(request, session)
             session.commit()
```
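Besides pointing the query limit at the new setting, the loop keeps using `dict.pop` with a default plus the walrus operator to remove and test a tracked future in one step; a standalone illustration of that idiom:

```python
# Standalone illustration of the pop-and-test idiom used above:
# dict.pop(key, None) removes the entry when present, and the walrus
# operator binds the popped value so the body runs only for tracked keys.
futures = {"req-a": "future-a", "req-b": "future-b"}

if future := futures.pop("req-a", None):
    print(f"cancelling {future}")  # runs: "req-a" was tracked

if future := futures.pop("req-c", None):
    print("never reached")  # "req-c" was never tracked

print(futures)  # {'req-b': 'future-b'}
```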

cads_broker/entry_points.py

82 additions, 11 deletions:

```diff
@@ -4,10 +4,12 @@
 import enum
 import os
 import random
+import time
 import uuid
 from pathlib import Path
 from typing import Any, List, Optional
 
+import prettytable
 import sqlalchemy as sa
 import typer
 from typing_extensions import Annotated
@@ -64,7 +66,10 @@ def add_dummy_requests(
 
 @app.command()
 def requests_cleaner(
-    connection_string: Optional[str] = None, older_than_days: Optional[int] = 365
+    connection_string: Optional[str] = None,
+    older_than_days: Optional[int] = 365,
+    delete_bulk_size: Optional[int] = None,
+    delete_sleep_time: Optional[int] = None,
 ) -> None:
     """Remove records from the system_requests table older than `older_than_days`."""
     if not connection_string:
@@ -73,18 +78,29 @@ def requests_cleaner(
     engine = sa.create_engine(connection_string)
     time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
     # clean system requests and (via cascading delete) events
-    with engine.begin() as conn:
-        database.logger.info("deleting old system_requests and events...")
+    database.logger.info("deleting old system_requests and events...")
+    curr_deleted = 1
+    subquery = sa.select(database.SystemRequest.request_uid).where(
+        database.SystemRequest.created_at <= time_delta
+    )
+    with engine.connect() as conn:
+        if delete_bulk_size is not None:
+            # delete in sized bulks to give time to db replicas for synch
+            subquery = subquery.limit(delete_bulk_size)
         stmt = sa.delete(database.SystemRequest).where(
-            database.SystemRequest.created_at <= time_delta
-        )
-        result = conn.execute(stmt)
-        conn.commit()
-        num_requests_deleted = result.rowcount
-        database.logger.info(
-            f"{num_requests_deleted} old system requests "
-            f"successfully removed from the broker database."
+            database.SystemRequest.request_uid.in_(subquery)
         )
+        while curr_deleted:
+            with conn.begin():
+                result = conn.execute(stmt)
+                conn.commit()
+            curr_deleted = result.rowcount
+            database.logger.info(
+                f"{curr_deleted} old system requests "
+                f"successfully removed from the broker database."
+            )
+            if delete_sleep_time is not None:
+                time.sleep(delete_sleep_time)
     # clean adaptor_properties
     with engine.begin() as conn:
         try:
```
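`requests_cleaner` now deletes in bounded bulks (each pass issues a `DELETE ... WHERE request_uid IN (SELECT ... LIMIT n)`) and can sleep between passes so database replicas have time to catch up. A sketch of driving the extended command programmatically, mirroring the updated test further down; the connection string is a placeholder:

```python
# Driving the extended command the same way the updated test does.
# The connection string below is a placeholder, not a real database.
from typer.testing import CliRunner

from cads_broker import entry_points

runner = CliRunner()
result = runner.invoke(
    entry_points.app,
    [
        "requests-cleaner",
        "--connection-string", "postgresql://user:password@dbhost/broker",
        "--older-than-days", "365",
        "--delete-bulk-size", "1000",  # at most 1000 rows per transaction
        "--delete-sleep-time", "5",  # pause 5 s between bulks for replica sync
    ],
)
assert result.exit_code == 0
```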
```diff
@@ -131,6 +147,61 @@ class RequestStatus(str, enum.Enum):
     accepted = "accepted"
 
 
+@app.command()
+def get_dynamic_priority(
+    request_uid: Optional[str] = None,
+    request_uids_file: Annotated[
+        Path, typer.Argument(exists=True, file_okay=True, dir_okay=False)
+    ]
+    | None = None,
+    interval: float = 24 * 60 * 60,
+    origin: Optional[str] = None,
+    resource_mul: float = -1.0,
+    last_completed_mul: float = 0.8,
+):
+    with database.ensure_session_obj(None)() as session:
+        users_resources = database.get_users_queue_from_processing_time(
+            session=session,
+            interval_stop=datetime.datetime.now(),
+            interval=datetime.timedelta(hours=interval / 60 / 60),
+            origin=origin,
+        )
+        if request_uid:
+            request_uids = [request_uid]
+        elif request_uids_file:
+            request_uids = request_uids_file.open().read().splitlines()
+        table = prettytable.PrettyTable(
+            [
+                "user_uid",
+                "request_uid",
+                "process_id",
+                "entry_point",
+                "user_resources_used",
+                "user_last_completed_request",
+                "priority",
+            ]
+        )
+        for request_uid in request_uids:
+            request = database.get_request(request_uid, session)
+            resources = users_resources[request.user_uid]
+            last_completed_request = database.user_last_completed_request(
+                session, request.user_uid, interval
+            )
+            table.add_row(
+                [
+                    request.user_uid,
+                    request_uid,
+                    request.process_id,
+                    request.entry_point,
+                    resources,
+                    last_completed_request,
+                    resource_mul * resources
+                    + last_completed_mul * last_completed_request,
+                ]
+            )
+        typer.echo(table)
+
+
 @app.command()
 def delete_requests(
     status: RequestStatus = RequestStatus.running,
```
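The new `get-dynamic-priority` command reports, per request, the user's consumed resources and a per-user "last completed request" figure, combined as `resource_mul * resources + last_completed_mul * last_completed_request` (defaults `-1.0` and `0.8`). A worked example of that formula with made-up inputs (the values and their units are assumptions, not repository data):

```python
# Worked example of the priority printed by get-dynamic-priority, using the
# default multipliers. Input values (and their units) are made up.
resource_mul = -1.0  # heavier recent resource usage lowers priority
last_completed_mul = 0.8  # a larger "last completed" figure raises it

user_resources_used = 1200.0
user_last_completed_request = 3600.0

priority = (
    resource_mul * user_resources_used
    + last_completed_mul * user_last_completed_request
)
print(priority)  # 1680.0 = -1200.0 + 2880.0
```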

cads_broker/factory.py

1 addition, 1 deletion:

```diff
@@ -27,7 +27,7 @@ def request_contains_all(context, key, values):
     return contains_all(request_values, values)
 
 
-def request_contains_any(context, column, key, values):
+def request_contains_any(context, key, values):
     request_values = context.request.request_body.get("request").get(key)
     return contains_any(request_values, values)
```
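The change drops the unused `column` parameter so `request_contains_any` matches the signature of `request_contains_all`. Neither helper's body appears in this diff; a plausible sketch of the semantics the wrappers rely on, purely an assumption rather than the repository's actual code:

```python
# Hypothetical sketch of the helpers the wrappers above delegate to; the
# real contains_all / contains_any definitions live elsewhere in factory.py.
def contains_all(request_values, values):
    # every expected value must appear in the request
    return all(v in (request_values or []) for v in values)


def contains_any(request_values, values):
    # at least one expected value must appear in the request
    return any(v in (request_values or []) for v in values)


print(contains_any(["era5", "land"], ["land", "ocean"]))  # True
print(contains_all(["era5", "land"], ["land", "ocean"]))  # False
```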

environment.yml

1 addition, 0 deletions:

```diff
@@ -15,6 +15,7 @@ dependencies:
   - dask-core
   - distributed
   - lz4 # dask distributed needs this to compress / de-compress
+  - prettytable
   - psycopg2
   - pydantic>2
   - pydantic-settings
```
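`prettytable` backs the table printed by the new `get-dynamic-priority` command; a minimal, self-contained usage sketch:

```python
# Minimal prettytable usage, in the style of get_dynamic_priority above.
import prettytable

table = prettytable.PrettyTable(["request_uid", "priority"])
table.add_row(["abc-123", 1680.0])
print(table)  # renders an ASCII grid with +---+ borders around header and rows
```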

tests/test_90_entry_points.py

48 additions, 1 deletion:

```diff
@@ -1,10 +1,13 @@
 import datetime
+import json
+import logging
 import os
 import unittest.mock
 import uuid
 from typing import Any
 
 import cacholote
+import pytest
 import sqlalchemy as sa
 from psycopg import Connection
 from typer.testing import CliRunner
@@ -147,7 +150,9 @@ def prepare_db(
 # not existing case of recent properties with old requests
 
 
-def test_requests_cleaner(session_obj: sa.orm.sessionmaker):
+def test_requests_cleaner(
+    session_obj: sa.orm.sessionmaker, caplog: pytest.LogCaptureFixture
+):
     connection_string = session_obj.kw["bind"].url
 
     # test remove nothing, older_than_days=365 by default
@@ -207,3 +212,45 @@ def test_requests_cleaner(session_obj: sa.orm.sessionmaker):
     assert len(all_requests) == 5
     assert len(all_events) == 5
     assert len(all_props) == 5
+
+    # test remove 10 requests in bulk size of 3 (all old props have old requests)
+    prepare_db(
+        session_obj,
+        num_old_props_used_by_old=10,
+        num_recent_props_used_by_recent=5,
+    )
+    caplog.clear()
+    caplog.set_level(logging.INFO)
+    result = runner.invoke(
+        entry_points.app,
+        [
+            "requests-cleaner",
+            "--connection-string",
+            connection_string,
+            "--delete-bulk-size",
+            3,
+            "--delete-sleep-time",
+            1,
+        ],
+    )
+    assert result.exit_code == 0
+    with session_obj() as session:
+        all_requests = session.query(database.SystemRequest).all()
+        all_events = session.query(database.Events).all()
+        all_props = session.query(database.AdaptorProperties).all()
+    assert len(all_requests) == 5
+    assert len(all_events) == 5
+    assert len(all_props) == 5
+    with caplog.at_level(logging.ERROR):
+        log_msgs = [json.loads(r.msg)["event"] for r in caplog.records]
+    assert log_msgs == [
+        "deleting old system_requests and events...",
+        "3 old system requests successfully removed from the broker database.",
+        "3 old system requests successfully removed from the broker database.",
+        "3 old system requests successfully removed from the broker database.",
+        "1 old system requests successfully removed from the broker database.",
+        "0 old system requests successfully removed from the broker database.",
+        "deleting old adaptor_properties...",
+        "10 old adaptor properties successfully removed from the broker database.",
+    ]
+    caplog.clear()
```
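The asserted log sequence follows from removing 10 old requests in bulks of 3: each pass deletes up to the bulk size, and the loop exits only after a pass removes zero rows. A quick sanity check of that arithmetic:

```python
# Sanity check of the asserted per-bulk counts: 10 old rows, bulk size 3,
# and the cleaner loops until a pass deletes zero rows.
remaining, bulk_size, counts = 10, 3, []
while True:
    deleted = min(bulk_size, remaining)
    counts.append(deleted)
    remaining -= deleted
    if deleted == 0:
        break
print(counts)  # [3, 3, 3, 1, 0]
```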
