Skip to content

Commit 905ef7a

Browse files
Include deleted jobs in processing time algorithm (#140)
* Refactor request status handling to ensure finished_at is set for deleted requests * add logs * qa * fix tests
1 parent 490e98f commit 905ef7a

File tree

4 files changed

+20
-13
lines changed

4 files changed

+20
-13
lines changed

cads_broker/database.py

-1
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ def get_users_queue_from_processing_time(
467467
interval_clause = sa.sql.and_(
468468
SystemRequest.finished_at >= interval_start,
469469
SystemRequest.finished_at < interval_stop,
470-
SystemRequest.status != "deleted",
471470
SystemRequest.started_at.is_not(None),
472471
)
473472
where_clause = sa.sql.or_(interval_clause, SystemRequest.status == "running")

cads_broker/dispatcher.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -384,11 +384,8 @@ def manage_dismissed_request(self, request, session):
384384
previous_status = dismission_metadata.get("previous_status", "accepted")
385385
if dismission_metadata.get("reason", "DismissedRequest") == "PermissionError":
386386
request.status = "failed"
387-
request.finished_at = datetime.datetime.now()
388387
else:
389388
request.status = "deleted"
390-
if request.finished_at is None:
391-
request.finished_at = datetime.datetime.now()
392389
if previous_status == "running":
393390
self.qos.notify_end_of_request(
394391
request, session, scheduler=self.internal_scheduler
@@ -398,6 +395,9 @@ def manage_dismissed_request(self, request, session):
398395
self.qos.notify_dismission_of_request(
399396
request, session, scheduler=self.internal_scheduler
400397
)
398+
# set finished_at if it is not set
399+
if request.finished_at is None:
400+
request.finished_at = datetime.datetime.now()
401401
logger.info("job has finished", **db.logger_kwargs(request=request))
402402
return session
403403

@@ -683,13 +683,9 @@ def processing_time_priority_algorithm(
683683
request, session=session_write, scheduler=self.internal_scheduler
684684
)
685685
if can_run and may_run and requests_counter < number_of_requests:
686-
logger.info(
687-
"user priority",
688-
user=user_uid,
689-
request_uid=request.request_uid,
690-
priority=user_cost,
686+
self.submit_request(
687+
request, priority=user_cost, session=session_write
691688
)
692-
self.submit_request(request, session=session_write)
693689
may_run = False
694690
requests_counter += 1
695691

@@ -722,7 +718,10 @@ def submit_requests(
722718
requests_counter += 1
723719

724720
def submit_request(
725-
self, request: db.SystemRequest, session: sa.orm.Session
721+
self,
722+
request: db.SystemRequest,
723+
session: sa.orm.Session,
724+
priority: int | None = None,
726725
) -> None:
727726
"""Submit the request to the dask scheduler and update the qos rules accordingly."""
728727
request = db.set_request_status(
@@ -749,6 +748,7 @@ def submit_request(
749748
self.futures[request.request_uid] = future
750749
logger.info(
751750
"submitted job to scheduler",
751+
priority=priority,
752752
**db.logger_kwargs(request=request),
753753
)
754754

tests/test_02_database.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,13 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker)
747747
started_at=None,
748748
finished_at=datetime.datetime.now() - datetime.timedelta(hours=10),
749749
)
750+
request_8 = mock_system_request(
751+
status="deleted",
752+
adaptor_properties_hash=adaptor_properties.hash,
753+
user_uid="user2",
754+
started_at=datetime.datetime.now() - datetime.timedelta(hours=15),
755+
finished_at=datetime.datetime.now() - datetime.timedelta(hours=10),
756+
)
750757
with session_obj() as session:
751758
session.add(adaptor_properties)
752759
session.add(request_1)
@@ -756,14 +763,15 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker)
756763
session.add(request_5)
757764
session.add(request_6)
758765
session.add(request_7)
766+
session.add(request_8)
759767
session.commit()
760768
with session_obj() as session:
761769
users_cost = db.get_users_queue_from_processing_time(
762770
session, interval_stop=datetime.datetime.now()
763771
)
764772
assert users_cost["user3"] == 0
765773
assert users_cost["user1"] == 15 * 60 * 60
766-
assert users_cost["user2"] == 30 * 60 * 60
774+
assert users_cost["user2"] == (10 + 20 + 5) * 60 * 60
767775

768776

769777
def test_get_request_result(session_obj: sa.orm.sessionmaker) -> None:

tests/test_20_dispatcher.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def mock_get_users_queue_from_processing_time():
147147

148148
submitted_requests = []
149149

150-
def mock_submit_request(self, request, session):
150+
def mock_submit_request(self, request, priority, session):
151151
submitted_requests.append(request.request_uid)
152152
for candidate in candidates:
153153
if candidate.request_uid == request.request_uid:

0 commit comments

Comments
 (0)