Skip to content

Commit c8ac301

Browse files
authored
Merge pull request #149 from ecmwf-projects/COPDS-2359-request-cleaner
request cleaner with bulk size for system requests
2 parents d9ad9a1 + e9f975a commit c8ac301

File tree

2 files changed

+74
-12
lines changed

2 files changed

+74
-12
lines changed

cads_broker/entry_points.py

+26-11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import enum
55
import os
66
import random
7+
import time
78
import uuid
89
from pathlib import Path
910
from typing import Any, Optional
@@ -65,7 +66,10 @@ def add_dummy_requests(
6566

6667
@app.command()
6768
def requests_cleaner(
68-
connection_string: Optional[str] = None, older_than_days: Optional[int] = 365
69+
connection_string: Optional[str] = None,
70+
older_than_days: Optional[int] = 365,
71+
delete_bulk_size: Optional[int] = None,
72+
delete_sleep_time: Optional[int] = None,
6973
) -> None:
7074
"""Remove records from the system_requests table older than `older_than_days`."""
7175
if not connection_string:
@@ -74,18 +78,29 @@ def requests_cleaner(
7478
engine = sa.create_engine(connection_string)
7579
time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
7680
# clean system requests and (via cascading delete) events
77-
with engine.begin() as conn:
78-
database.logger.info("deleting old system_requests and events...")
81+
database.logger.info("deleting old system_requests and events...")
82+
curr_deleted = 1
83+
subquery = sa.select(database.SystemRequest.request_uid).where(
84+
database.SystemRequest.created_at <= time_delta
85+
)
86+
with engine.connect() as conn:
87+
if delete_bulk_size is not None:
88+
# delete in sized bulks to give time to db replicas for synch
89+
subquery = subquery.limit(delete_bulk_size)
7990
stmt = sa.delete(database.SystemRequest).where(
80-
database.SystemRequest.created_at <= time_delta
81-
)
82-
result = conn.execute(stmt)
83-
conn.commit()
84-
num_requests_deleted = result.rowcount
85-
database.logger.info(
86-
f"{num_requests_deleted} old system requests "
87-
f"successfully removed from the broker database."
91+
database.SystemRequest.request_uid.in_(subquery)
8892
)
93+
while curr_deleted:
94+
with conn.begin():
95+
result = conn.execute(stmt)
96+
conn.commit()
97+
curr_deleted = result.rowcount
98+
database.logger.info(
99+
f"{curr_deleted} old system requests "
100+
f"successfully removed from the broker database."
101+
)
102+
if delete_sleep_time is not None:
103+
time.sleep(delete_sleep_time)
89104
# clean adaptor_properties
90105
with engine.begin() as conn:
91106
try:

tests/test_90_entry_points.py

+48-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import datetime
2+
import json
3+
import logging
24
import uuid
35
from typing import Any
46

57
import cacholote
8+
import pytest
69
import sqlalchemy as sa
710
from psycopg import Connection
811
from typer.testing import CliRunner
@@ -140,7 +143,9 @@ def prepare_db(
140143
# not existing case of recent properties with old requests
141144

142145

143-
def test_requests_cleaner(session_obj: sa.orm.sessionmaker):
146+
def test_requests_cleaner(
147+
session_obj: sa.orm.sessionmaker, caplog: pytest.LogCaptureFixture
148+
):
144149
connection_string = session_obj.kw["bind"].url
145150

146151
# test remove nothing, older_than_days=365 by default
@@ -200,3 +205,45 @@ def test_requests_cleaner(session_obj: sa.orm.sessionmaker):
200205
assert len(all_requests) == 5
201206
assert len(all_events) == 5
202207
assert len(all_props) == 5
208+
209+
# test remove 10 requests in bulk size of 3 (all old props have old requests)
210+
prepare_db(
211+
session_obj,
212+
num_old_props_used_by_old=10,
213+
num_recent_props_used_by_recent=5,
214+
)
215+
caplog.clear()
216+
caplog.set_level(logging.INFO)
217+
result = runner.invoke(
218+
entry_points.app,
219+
[
220+
"requests-cleaner",
221+
"--connection-string",
222+
connection_string,
223+
"--delete-bulk-size",
224+
3,
225+
"--delete-sleep-time",
226+
1,
227+
],
228+
)
229+
assert result.exit_code == 0
230+
with session_obj() as session:
231+
all_requests = session.query(database.SystemRequest).all()
232+
all_events = session.query(database.Events).all()
233+
all_props = session.query(database.AdaptorProperties).all()
234+
assert len(all_requests) == 5
235+
assert len(all_events) == 5
236+
assert len(all_props) == 5
237+
with caplog.at_level(logging.ERROR):
238+
log_msgs = [json.loads(r.msg)["event"] for r in caplog.records]
239+
assert log_msgs == [
240+
"deleting old system_requests and events...",
241+
"3 old system requests successfully removed from the broker database.",
242+
"3 old system requests successfully removed from the broker database.",
243+
"3 old system requests successfully removed from the broker database.",
244+
"1 old system requests successfully removed from the broker database.",
245+
"0 old system requests successfully removed from the broker database.",
246+
"deleting old adaptor_properties...",
247+
"10 old adaptor properties successfully removed from the broker database.",
248+
]
249+
caplog.clear()

0 commit comments

Comments
 (0)