Skip to content

Quality of Service status refactor #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 7, 2025
9 changes: 7 additions & 2 deletions cads_broker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class BrokerConfig(pydantic_settings.BaseSettings):
broker_priority_algorithm: str = "legacy"
broker_priority_interval_hours: int = 24
broker_get_number_of_workers_cache_time: int = 10
broker_get_number_of_workers_cache_time: int = 60
broker_qos_rules_cache_time: int = 10
broker_get_tasks_from_scheduler_cache_time: int = 1
broker_rules_path: str = "/src/rules.qos"
Expand All @@ -42,7 +42,12 @@ class BrokerConfig(pydantic_settings.BaseSettings):
broker_max_dismissed_requests: int = 100
broker_cancel_stuck_requests_cache_ttl: int = 60
broker_stuck_requests_limit_minutes: int = 15
broker_memory_error_user_visible_log: str = "Worker has been killed due to memory usage."
broker_memory_error_user_visible_log: str = (
"Worker has been killed due to memory usage."
)
broker_workers_gap: int = (
10 # max discrepancy in the number of workers before the QoS rules are reloaded
)


class SqlalchemySettings(pydantic_settings.BaseSettings):
Expand Down
31 changes: 13 additions & 18 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,21 +652,18 @@ def get_qos_status_from_request(
def requeue_request(
request: SystemRequest,
session: sa.orm.Session,
) -> SystemRequest | None:
if request.status == "running":
# ugly implementation because SQLAlchemy doesn't allow updating JSONB directly
# FIXME: use a specific column for resubmit_number
metadata = dict(request.request_metadata)
metadata.update(
{"resubmit_number": request.request_metadata.get("resubmit_number", 0) + 1}
)
request.request_metadata = metadata
request.status = "accepted"
session.commit()
logger.info("requeueing request", **logger_kwargs(request=request))
return request
else:
return None
) -> SystemRequest:
# ugly implementation because SQLAlchemy doesn't allow updating JSONB directly
# FIXME: use a specific column for resubmit_number
metadata = dict(request.request_metadata)
metadata.update(
{"resubmit_number": request.request_metadata.get("resubmit_number", 0) + 1}
)
request.request_metadata = metadata
request.status = "accepted"
session.commit()
logger.info("requeueing request", **logger_kwargs(request=request))
return request


def set_request_cache_id(request_uid: str, cache_id: int, session: sa.orm.Session):
Expand All @@ -680,11 +677,9 @@ def set_request_cache_id(request_uid: str, cache_id: int, session: sa.orm.Sessio
def set_successful_request(
request_uid: str,
session: sa.orm.Session,
) -> SystemRequest | None:
) -> SystemRequest:
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
request = session.scalars(statement).one()
if request.status == "successful":
return None
request.status = "successful"
request.finished_at = sa.func.now()
session.commit()
Expand Down
Loading
Loading