Skip to content

Commit f3461b1

Browse files
Quality of Service status refactor (#153)
* Refactor requeue_request and set_successful_request functions to remove unnecessary status checks * Refactor Broker class to replace number_of_workers property with set_number_of_workers method and update references to use environment.number_of_workers * prevent too often reload of qos * Increase broker_get_number_of_workers_cache_time from 10 to 60 seconds * debug message * fix error * Reset queue in Broker class after refreshing QoS rules * Refactor BrokerConfig class for improved readability of memory error log and workers gap configuration * qa
1 parent f1b3522 commit f3461b1

File tree

3 files changed

+203
-134
lines changed

3 files changed

+203
-134
lines changed

cads_broker/config.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
class BrokerConfig(pydantic_settings.BaseSettings):
2929
broker_priority_algorithm: str = "legacy"
3030
broker_priority_interval_hours: int = 24
31-
broker_get_number_of_workers_cache_time: int = 10
31+
broker_get_number_of_workers_cache_time: int = 60
3232
broker_qos_rules_cache_time: int = 10
3333
broker_get_tasks_from_scheduler_cache_time: int = 1
3434
broker_rules_path: str = "/src/rules.qos"
@@ -42,7 +42,12 @@ class BrokerConfig(pydantic_settings.BaseSettings):
4242
broker_max_dismissed_requests: int = 100
4343
broker_cancel_stuck_requests_cache_ttl: int = 60
4444
broker_stuck_requests_limit_minutes: int = 15
45-
broker_memory_error_user_visible_log: str = "Worker has been killed due to memory usage."
45+
broker_memory_error_user_visible_log: str = (
46+
"Worker has been killed due to memory usage."
47+
)
48+
broker_workers_gap: int = (
49+
10 # max discrepancy of workers number before qos rules are reloaded
50+
)
4651

4752

4853
class SqlalchemySettings(pydantic_settings.BaseSettings):

cads_broker/database.py

+13-18
Original file line numberDiff line numberDiff line change
@@ -652,21 +652,18 @@ def get_qos_status_from_request(
652652
def requeue_request(
653653
request: SystemRequest,
654654
session: sa.orm.Session,
655-
) -> SystemRequest | None:
656-
if request.status == "running":
657-
# ugly implementation because sqlalchemy doesn't allow to directly update JSONB
658-
# FIXME: use a specific column for resubmit_number
659-
metadata = dict(request.request_metadata)
660-
metadata.update(
661-
{"resubmit_number": request.request_metadata.get("resubmit_number", 0) + 1}
662-
)
663-
request.request_metadata = metadata
664-
request.status = "accepted"
665-
session.commit()
666-
logger.info("requeueing request", **logger_kwargs(request=request))
667-
return request
668-
else:
669-
return None
655+
) -> SystemRequest:
656+
# ugly implementation because sqlalchemy doesn't allow to directly update JSONB
657+
# FIXME: use a specific column for resubmit_number
658+
metadata = dict(request.request_metadata)
659+
metadata.update(
660+
{"resubmit_number": request.request_metadata.get("resubmit_number", 0) + 1}
661+
)
662+
request.request_metadata = metadata
663+
request.status = "accepted"
664+
session.commit()
665+
logger.info("requeueing request", **logger_kwargs(request=request))
666+
return request
670667

671668

672669
def set_request_cache_id(request_uid: str, cache_id: int, session: sa.orm.Session):
@@ -680,11 +677,9 @@ def set_request_cache_id(request_uid: str, cache_id: int, session: sa.orm.Sessio
680677
def set_successful_request(
681678
request_uid: str,
682679
session: sa.orm.Session,
683-
) -> SystemRequest | None:
680+
) -> SystemRequest:
684681
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
685682
request = session.scalars(statement).one()
686-
if request.status == "successful":
687-
return None
688683
request.status = "successful"
689684
request.finished_at = sa.func.now()
690685
session.commit()

0 commit comments

Comments
 (0)