|
6 | 6 |
|
7 | 7 | import sqlalchemy as sa
|
8 | 8 | import typer
|
| 9 | +from typing_extensions import Annotated |
9 | 10 |
|
10 | 11 | from cads_broker import config, database, dispatcher, object_storage
|
11 | 12 |
|
@@ -63,6 +64,55 @@ def requests_cleaner(
|
63 | 64 | raise
|
64 | 65 |
|
65 | 66 |
|
| 67 | +@app.command() |
| 68 | +def delete_running_requests( |
| 69 | + connection_string: Optional[str] = None, |
| 70 | + minutes: float = typer.Option(0.0), |
| 71 | + seconds: float = typer.Option(0.0), |
| 72 | + hours: float = typer.Option(0.0), |
| 73 | + days: float = typer.Option(0.0), |
| 74 | + skip_confirmation: Annotated[bool, typer.Option("--yes", "-y")] = False, |
| 75 | +) -> None: |
| 76 | + """Remove records from the system_requests table that are currently running. |
| 77 | +
|
| 78 | + Parameters |
| 79 | + ---------- |
| 80 | + connection_string: something like 'postgresql://user:password@netloc:port/dbname' |
| 81 | + """ |
| 82 | + if not connection_string: |
| 83 | + dbsettings = config.ensure_settings(config.dbsettings) |
| 84 | + connection_string = dbsettings.connection_string |
| 85 | + timestamp = datetime.datetime.now() - datetime.timedelta( |
| 86 | + minutes=minutes, seconds=seconds, hours=hours, days=days |
| 87 | + ) |
| 88 | + with database.ensure_session_obj(None)() as session: |
| 89 | + database.logger.info(f"deleting old system_requests before {timestamp}.") |
| 90 | + statement = ( |
| 91 | + sa.select(database.SystemRequest) |
| 92 | + .where(database.SystemRequest.status == "running") |
| 93 | + .where(database.SystemRequest.created_at < timestamp) |
| 94 | + ) |
| 95 | + requests = session.scalars(statement).all() |
| 96 | + number_of_requests = len(requests) |
| 97 | + if not skip_confirmation: |
| 98 | + typer.confirm( |
| 99 | + f"Deleting {number_of_requests} requests. Do you want to continue?", |
| 100 | + abort=False, |
| 101 | + default=True, |
| 102 | + ) |
| 103 | + else: |
| 104 | + database.logger.info(f"Deleting {number_of_requests} requests.") |
| 105 | + for request in requests: |
| 106 | + database.logger.info(f"deleting {request.request_id}...") |
| 107 | + database.set_request_status(request, "dismissed", session=session) |
| 108 | + |
| 109 | + session.commit() |
| 110 | + |
| 111 | + database.logger.info( |
| 112 | + f"{number_of_requests} requests successfully removed from the broker database." |
| 113 | + ) |
| 114 | + |
| 115 | + |
66 | 116 | @app.command()
|
67 | 117 | def info(connection_string: Optional[str] = None) -> None:
|
68 | 118 | """Test connection to the database located at URI `connection_string`.
|
|
0 commit comments