Skip to content

Commit 0df66ea

Browse files
committed
use creation time as reference for remove_old_requests
1 parent 7eab9e2 commit 0df66ea

File tree

4 files changed

+72
-112
lines changed

4 files changed

+72
-112
lines changed

cads_broker/database.py

+20-29
Original file line numberDiff line numberDiff line change
@@ -505,27 +505,24 @@ def generate_adaptor_properties_hash(
505505
).hexdigest()
506506

507507

508-
def get_adaptor_properties(
509-
adaptor_properties_hash: str,
510-
session: sa.orm.Session,
511-
) -> AdaptorProperties | None:
512-
try:
513-
statement = sa.select(AdaptorProperties.hash).where(
514-
AdaptorProperties.hash == adaptor_properties_hash
515-
)
516-
return session.execute(statement).one()
517-
except sqlalchemy.orm.exc.NoResultFound:
518-
return None
519-
520-
521-
def add_adaptor_properties(
508+
def ensure_adaptor_properties(
522509
hash: str,
523510
config: dict[str, Any],
524511
form: dict[str, Any],
525512
session: sa.orm.Session,
526-
):
527-
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
528-
session.add(adaptor_properties)
513+
) -> None:
514+
"""Create adaptor properties (if not exists) or update its timestamp."""
515+
try:
516+
statement = (
517+
AdaptorProperties.__table__.update()
518+
.returning(AdaptorProperties.hash)
519+
.where(AdaptorProperties.__table__.c.hash == hash)
520+
.values(timestamp=datetime.datetime.now())
521+
)
522+
session.execute(statement).one()
523+
except sqlalchemy.orm.exc.NoResultFound:
524+
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
525+
session.add(adaptor_properties)
529526

530527

531528
def add_event(
@@ -557,18 +554,12 @@ def create_request(
557554
request_uid: str | None = None,
558555
) -> dict[str, Any]:
559556
"""Create a request."""
560-
if (
561-
get_adaptor_properties(
562-
adaptor_properties_hash=adaptor_properties_hash, session=session
563-
)
564-
is None
565-
):
566-
add_adaptor_properties(
567-
hash=adaptor_properties_hash,
568-
config=adaptor_config,
569-
form=adaptor_form,
570-
session=session,
571-
)
557+
ensure_adaptor_properties(
558+
hash=adaptor_properties_hash,
559+
config=adaptor_config,
560+
form=adaptor_form,
561+
session=session,
562+
)
572563
metadata["resources"] = resources
573564
metadata["qos_tags"] = qos_tags
574565
request = SystemRequest(

cads_broker/entry_points.py

+13-30
Original file line numberDiff line numberDiff line change
@@ -26,57 +26,40 @@ def remove_old_requests(
2626
dbsettings = config.ensure_settings(config.dbsettings)
2727
connection_string = dbsettings.connection_string
2828
engine = sa.create_engine(connection_string)
29+
time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
2930
# clean system requests and (via cascading delete) events
3031
with engine.begin() as conn:
3132
database.logger.info("deleting old system_requests and events...")
3233
stmt = sa.delete(database.SystemRequest).where(
33-
database.SystemRequest.finished_at
34-
<= (sa.func.now() - datetime.timedelta(days=older_than_days))
34+
database.SystemRequest.created_at <= time_delta
3535
)
3636
result = conn.execute(stmt)
3737
conn.commit()
3838
num_requests_deleted = result.rowcount
39+
database.logger.info(
40+
f"{num_requests_deleted} old system requests "
41+
f"successfully removed from the broker database."
42+
)
3943
# clean adaptor_properties
4044
with engine.begin() as conn:
4145
try:
42-
database.logger.info(
43-
"deleting old adaptor_properties (trying in a block)..."
44-
)
46+
database.logger.info("deleting old adaptor_properties...")
4547
stmt_ap_delete = sa.delete(database.AdaptorProperties).where(
46-
database.AdaptorProperties.timestamp
47-
<= (sa.func.now() - datetime.timedelta(days=older_than_days))
48+
database.AdaptorProperties.timestamp <= time_delta
4849
)
4950
result = conn.execute(stmt_ap_delete)
5051
conn.commit()
5152
num_ap_deleted = result.rowcount
5253
database.logger.info(
53-
f"{num_requests_deleted + num_ap_deleted} old records "
54+
f"{num_ap_deleted} old adaptor properties "
5455
f"successfully removed from the broker database."
5556
)
5657
return
5758
except sa.exc.IntegrityError:
58-
# some requests still use some old adaptor_properties: do not return and continue
59-
pass
60-
database.logger.info("deleting old adaptor_properties...")
61-
num_ap_deleted = 0
62-
session_obj = sa.orm.sessionmaker(engine)
63-
with session_obj.begin() as session:
64-
stmt = sa.select(database.AdaptorProperties).where(
65-
database.AdaptorProperties.timestamp
66-
<= (sa.func.now() - datetime.timedelta(days=older_than_days))
67-
)
68-
for record in session.scalars(stmt):
69-
try:
70-
with session.begin_nested():
71-
session.delete(record)
72-
num_ap_deleted += 1
73-
except sa.exc.IntegrityError:
74-
pass
75-
database.logger.info(
76-
f"{num_requests_deleted + num_ap_deleted} old records "
77-
f"successfully removed from the broker database."
78-
)
79-
return
59+
database.logger.error(
60+
"cannot remove some old records from table adaptor_properties."
61+
)
62+
raise
8063

8164

8265
@app.command()

tests/test_02_database.py

+25
Original file line numberDiff line numberDiff line change
@@ -784,8 +784,33 @@ def test_create_request(session_obj: sa.orm.sessionmaker) -> None:
784784
db.SystemRequest.request_uid == request_dict["request_uid"]
785785
)
786786
request = session.scalars(statement).one()
787+
adaptor_properties = request.adaptor_properties
788+
assert adaptor_properties.hash == "adaptor_properties_hash"
789+
initial_timestamp = adaptor_properties.timestamp
787790
assert request.request_uid == request_dict["request_uid"]
788791
assert request.user_uid == request_dict["user_uid"]
792+
# create again a new request using the same adaptor properties: timestamp updated
793+
with session_obj() as session:
794+
request_dict = db.create_request(
795+
user_uid="abc456",
796+
setup_code="",
797+
entry_point="sum",
798+
request={},
799+
metadata={},
800+
process_id="submit-workflow",
801+
session=session,
802+
portal="c3s",
803+
adaptor_config={"dummy_config": {"foo": "bar"}},
804+
adaptor_form={},
805+
adaptor_properties_hash="adaptor_properties_hash",
806+
)
807+
statement = sa.select(db.SystemRequest).where(
808+
db.SystemRequest.request_uid == request_dict["request_uid"]
809+
)
810+
request = session.scalars(statement).one()
811+
adaptor_properties = request.adaptor_properties
812+
assert adaptor_properties.hash == "adaptor_properties_hash"
813+
adaptor_properties.timestamp > initial_timestamp
789814

790815

791816
def test_get_request(session_obj: sa.orm.sessionmaker) -> None:

tests/test_90_entry_points.py

+14-53
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,12 @@ def mock_config(
4848
form: dict[str, Any] = {},
4949
timestamp: datetime.datetime | None = None,
5050
):
51-
if timestamp is None:
52-
adaptor_properties = database.AdaptorProperties(
53-
hash=hash,
54-
config=config,
55-
form=form,
56-
)
57-
else:
58-
adaptor_properties = database.AdaptorProperties(
59-
hash=hash,
60-
config=config,
61-
form=form,
62-
timestamp=timestamp,
63-
)
51+
adaptor_properties = database.AdaptorProperties(
52+
hash=hash,
53+
config=config,
54+
form=form,
55+
timestamp=timestamp,
56+
)
6457
return adaptor_properties
6558

6659

@@ -102,14 +95,14 @@ def test_init_db(postgresql: Connection[str], mocker) -> None:
10295
def prepare_db(
10396
session_obj,
10497
num_old_props_used_by_old,
105-
num_old_props_used_by_recent,
10698
num_recent_props_used_by_recent,
10799
recent_days=3,
108100
old_days=370,
109101
):
110102
now = sa.func.now()
111103
old_date = now - datetime.timedelta(days=old_days)
112104
recent_date = now - datetime.timedelta(days=recent_days)
105+
113106
with session_obj() as session:
114107
# initialize
115108
session.query(database.SystemRequest).delete()
@@ -121,35 +114,25 @@ def prepare_db(
121114
adaptor_properties = mock_config(hash=old_hash, timestamp=old_date)
122115
session.add(adaptor_properties)
123116
request = mock_system_request(
124-
adaptor_properties_hash=old_hash, finished_at=old_date
125-
)
126-
session.add(request)
127-
event = database.Events(request_uid=request.request_uid)
128-
session.add(event)
129-
# add old properties with recent requests
130-
for x in range(num_old_props_used_by_recent):
131-
old_hash2 = f"old_hash_{x}_with_new_request"
132-
adaptor_properties = mock_config(hash=old_hash2, timestamp=old_date)
133-
session.add(adaptor_properties)
134-
request = mock_system_request(
135-
adaptor_properties_hash=old_hash2, finished_at=recent_date
117+
adaptor_properties_hash=old_hash, created_at=old_date
136118
)
137119
session.add(request)
138120
event = database.Events(request_uid=request.request_uid)
139121
session.add(event)
122+
session.commit()
140123
# add recent properties with recent requests
141124
for x in range(num_recent_props_used_by_recent):
142125
new_hash = f"new_hash_{x}"
143126
adaptor_properties = mock_config(hash=new_hash, timestamp=recent_date)
144127
session.add(adaptor_properties)
145128
request = mock_system_request(
146-
adaptor_properties_hash=new_hash, finished_at=recent_date
129+
adaptor_properties_hash=new_hash, created_at=recent_date
147130
)
148131
session.add(request)
149132
event = database.Events(request_uid=request.request_uid)
150133
session.add(event)
134+
session.commit()
151135
# not existing case of recent properties with old requests
152-
session.commit()
153136

154137

155138
def test_remove_old_records(session_obj: sa.orm.sessionmaker):
@@ -159,7 +142,6 @@ def test_remove_old_records(session_obj: sa.orm.sessionmaker):
159142
prepare_db(
160143
session_obj,
161144
num_old_props_used_by_old=0,
162-
num_old_props_used_by_recent=6,
163145
num_recent_props_used_by_recent=5,
164146
)
165147
result = runner.invoke(
@@ -171,9 +153,9 @@ def test_remove_old_records(session_obj: sa.orm.sessionmaker):
171153
all_requests = session.query(database.SystemRequest).all()
172154
all_events = session.query(database.Events).all()
173155
all_props = session.query(database.AdaptorProperties).all()
174-
assert len(all_requests) == 11
175-
assert len(all_events) == 11
176-
assert len(all_props) == 11
156+
assert len(all_requests) == 5
157+
assert len(all_events) == 5
158+
assert len(all_props) == 5
177159

178160
# test remove all (most recent is 3 days old)
179161
result = runner.invoke(
@@ -199,7 +181,6 @@ def test_remove_old_records(session_obj: sa.orm.sessionmaker):
199181
prepare_db(
200182
session_obj,
201183
num_old_props_used_by_old=10,
202-
num_old_props_used_by_recent=0,
203184
num_recent_props_used_by_recent=5,
204185
)
205186
result = runner.invoke(
@@ -214,23 +195,3 @@ def test_remove_old_records(session_obj: sa.orm.sessionmaker):
214195
assert len(all_requests) == 5
215196
assert len(all_events) == 5
216197
assert len(all_props) == 5
217-
218-
# test remove only some requests (some old props have recent requests)
219-
prepare_db(
220-
session_obj,
221-
num_old_props_used_by_old=10,
222-
num_old_props_used_by_recent=3,
223-
num_recent_props_used_by_recent=5,
224-
)
225-
result = runner.invoke(
226-
entry_points.app,
227-
["remove-old-requests", "--connection-string", connection_string],
228-
)
229-
assert result.exit_code == 0
230-
with session_obj() as session:
231-
all_requests = session.query(database.SystemRequest).all()
232-
all_events = session.query(database.Events).all()
233-
all_props = session.query(database.AdaptorProperties).all()
234-
assert len(all_requests) == 8
235-
assert len(all_events) == 8
236-
assert len(all_props) == 8

0 commit comments

Comments
 (0)