Skip to content

Commit 53e62b6

Browse files
committed
Improved and cleaner cost query
1 parent 5e7b14f commit 53e62b6

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

cads_broker/database.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -532,12 +532,26 @@ def decrement_qos_rule_running(
532532

533533
def get_users_queue_from_processing_time(
534534
session: sa.orm.Session,
535-
) -> list[tuple[str, int]]:
535+
interval_stop: datetime.datetime,
536+
interval: datetime.timedelta = datetime.timedelta(hours=24),
537+
) -> list[tuple[str, float]]:
536538
"""Build the queue of the users from the processing time."""
537-
statement = sa.text("select user_uid, sum(EXTRACT(EPOCH FROM (coalesce(finished_at, now()) -" \
538-
" coalesce(started_at, coalesce(finished_at, now()))))::integer) as cost" \
539-
" from system_requests where finished_at > (now() - interval '24h') " \
540-
" or status='running' or status='accepted' group by user_uid order by cost")
539+
interval_start = interval_stop - interval
540+
request_processing_time = sa.sql.func.least(
541+
SystemRequest.finished_at, interval_stop
542+
) - sa.sql.func.greatest(SystemRequest.started_at, interval_start)
543+
user_cumulative_processing_time = sa.sql.func.sum(request_processing_time)
544+
user_cost = sa.sql.func.extract("epoch", user_cumulative_processing_time)
545+
interval_clause = sa.sql.and_(
546+
SystemRequest.finished_at >= interval_start,
547+
SystemRequest.finished_at < interval_stop,
548+
)
549+
where_clause = sa.sql.or_(
550+
interval_clause, SystemRequest.status in ["running", "accepted"]
551+
)
552+
553+
statement = sa.sql.select(SystemRequest.user_uid, user_cost).where(where_clause)
554+
541555
return session.execute(statement).all()
542556

543557

cads_broker/dispatcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -579,9 +579,12 @@ def submit_requests(
579579
for request in candidates:
580580
user_requests.setdefault(request.user_uid, []).append(request)
581581
# FIXME: this is a temporary solution to prioritize subrequests from the high priority user
582+
interval_stop = datetime.datetime.now()
582583
users_queue = [
583-
(HIGH_PRIORITY_USER_UID, 0)
584-
] + db.get_users_queue_from_processing_time(session=session_write)
584+
(HIGH_PRIORITY_USER_UID, 0.0)
585+
] + db.get_users_queue_from_processing_time(
586+
interval_stop, session=session_write
587+
)
585588
requests_counter = 0
586589
for user_uid, _ in users_queue:
587590
if user_uid not in user_requests:

0 commit comments

Comments
 (0)