Skip to content

Commit 6725fa3

Browse files
authored
Merge pull request #99 from ecmwf-projects/COPDS-1489-old-requests
COPDS-1489: remove old requests
2 parents 8f34be8 + 0df66ea commit 6725fa3

File tree

6 files changed

+313
-39
lines changed

6 files changed

+313
-39
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""cascading delete on events.
2+
3+
Revision ID: 6ee20703d353
4+
Revises: 8924bc485ad5
5+
Create Date: 2024-03-28 12:07:05.247016
6+
7+
"""
8+
import datetime
9+
10+
import sqlalchemy as sa
11+
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "6ee20703d353"
16+
down_revision = "8924bc485ad5"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
op.drop_column("system_requests", "request_id")
23+
op.create_primary_key("system_requests_pkey", "system_requests", ["request_uid"])
24+
op.drop_constraint("events_request_uid_fkey", "events")
25+
op.drop_index("ix_system_requests_request_uid", "system_requests_pkey")
26+
op.create_foreign_key(
27+
"events_request_uid_fkey",
28+
"events",
29+
"system_requests",
30+
["request_uid"],
31+
["request_uid"],
32+
ondelete="CASCADE",
33+
)
34+
op.add_column(
35+
"adaptor_properties",
36+
sa.Column("timestamp", sa.TIMESTAMP, default=sa.func.now()),
37+
)
38+
now_str = datetime.datetime.now().isoformat()
39+
op.execute(
40+
f"update adaptor_properties set timestamp='{now_str}' where timestamp is null"
41+
)
42+
43+
44+
def downgrade() -> None:
45+
# do only on empty table
46+
op.drop_constraint("events_request_uid_fkey", "events")
47+
op.drop_constraint("system_requests_pkey", "system_requests")
48+
op.add_column(
49+
"system_requests", sa.Column("request_id", sa.Integer, primary_key=True)
50+
)
51+
op.create_foreign_key(
52+
"events_request_uid_fkey",
53+
"events",
54+
"system_requests",
55+
["request_uid"],
56+
["request_uid"],
57+
)

cads_broker/database.py

+27-37
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class Events(BaseModel):
4646
event_type = sa.Column(sa.Text, index=True)
4747
request_uid = sa.Column(
4848
sa.dialects.postgresql.UUID(False),
49-
sa.ForeignKey("system_requests.request_uid"),
49+
sa.ForeignKey("system_requests.request_uid", ondelete="CASCADE"),
5050
index=True,
5151
)
5252
message = sa.Column(sa.Text)
@@ -61,19 +61,15 @@ class AdaptorProperties(BaseModel):
6161
hash = sa.Column(sa.Text, primary_key=True)
6262
config = sa.Column(JSONB)
6363
form = sa.Column(JSONB)
64+
timestamp = sa.Column(sa.TIMESTAMP, default=sa.func.now())
6465

6566

6667
class SystemRequest(BaseModel):
6768
"""System Request ORM model."""
6869

6970
__tablename__ = "system_requests"
7071

71-
request_id = sa.Column(sa.Integer, primary_key=True)
72-
request_uid = sa.Column(
73-
sa.dialects.postgresql.UUID(False),
74-
index=True,
75-
unique=True,
76-
)
72+
request_uid = sa.Column(sa.dialects.postgresql.UUID(False), primary_key=True)
7773
process_id = sa.Column(sa.Text, index=True)
7874
user_uid = sa.Column(sa.Text, index=True)
7975
status = sa.Column(status_enum)
@@ -102,11 +98,13 @@ class SystemRequest(BaseModel):
10298
),
10399
{},
104100
)
101+
# https://github.com/sqlalchemy/sqlalchemy/issues/11063#issuecomment-2008101926
102+
__mapper_args__ = {"eager_defaults": False}
105103

106104
# joined is temporary
107105
cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined")
108106
adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="select")
109-
events = sa.orm.relationship(Events, lazy="select")
107+
events = sa.orm.relationship(Events, lazy="select", passive_deletes=True)
110108

111109
@property
112110
def age(self):
@@ -515,27 +513,24 @@ def generate_adaptor_properties_hash(
515513
).hexdigest()
516514

517515

518-
def get_adaptor_properties(
519-
adaptor_properties_hash: str,
520-
session: sa.orm.Session,
521-
) -> AdaptorProperties | None:
522-
try:
523-
statement = sa.select(AdaptorProperties.hash).where(
524-
AdaptorProperties.hash == adaptor_properties_hash
525-
)
526-
return session.execute(statement).one()
527-
except sqlalchemy.orm.exc.NoResultFound:
528-
return None
529-
530-
531-
def add_adaptor_properties(
516+
def ensure_adaptor_properties(
532517
hash: str,
533518
config: dict[str, Any],
534519
form: dict[str, Any],
535520
session: sa.orm.Session,
536-
):
537-
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
538-
session.add(adaptor_properties)
521+
) -> None:
522+
"""Create adaptor properties (if not exists) or update its timestamp."""
523+
try:
524+
statement = (
525+
AdaptorProperties.__table__.update()
526+
.returning(AdaptorProperties.hash)
527+
.where(AdaptorProperties.__table__.c.hash == hash)
528+
.values(timestamp=datetime.datetime.now())
529+
)
530+
session.execute(statement).one()
531+
except sqlalchemy.orm.exc.NoResultFound:
532+
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
533+
session.add(adaptor_properties)
539534

540535

541536
def add_event(
@@ -567,18 +562,12 @@ def create_request(
567562
request_uid: str | None = None,
568563
) -> dict[str, Any]:
569564
"""Create a request."""
570-
if (
571-
get_adaptor_properties(
572-
adaptor_properties_hash=adaptor_properties_hash, session=session
573-
)
574-
is None
575-
):
576-
add_adaptor_properties(
577-
hash=adaptor_properties_hash,
578-
config=adaptor_config,
579-
form=adaptor_form,
580-
session=session,
581-
)
565+
ensure_adaptor_properties(
566+
hash=adaptor_properties_hash,
567+
config=adaptor_config,
568+
form=adaptor_form,
569+
session=session,
570+
)
582571
metadata["resources"] = resources
583572
metadata["qos_tags"] = qos_tags
584573
request = SystemRequest(
@@ -678,6 +667,7 @@ def init_database(connection_string: str, force: bool = False) -> sa.engine.Engi
678667
conn = engine.connect()
679668
if "system_requests" not in conn.execute(query).scalars().all():
680669
force = True
670+
conn.close()
681671
if force:
682672
# cleanup and create the schema
683673
BaseModel.metadata.drop_all(engine)

cads_broker/entry_points.py

+52
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Module for entry points."""
2+
import datetime
23
import os
34
from typing import Any, Optional
45

@@ -10,6 +11,57 @@
1011
app = typer.Typer()
1112

1213

14+
@app.command()
15+
def remove_old_requests(
16+
connection_string: Optional[str] = None, older_than_days: Optional[int] = 365
17+
) -> None:
18+
"""Remove records from the system_requests table older than `older_than_days`.
19+
20+
Parameters
21+
----------
22+
connection_string: something like 'postgresql://user:password@netloc:port/dbname'
23+
older_than_days: minimum age (in days) to consider a record to be removed
24+
"""
25+
if not connection_string:
26+
dbsettings = config.ensure_settings(config.dbsettings)
27+
connection_string = dbsettings.connection_string
28+
engine = sa.create_engine(connection_string)
29+
time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
30+
# clean system requests and (via cascading delete) events
31+
with engine.begin() as conn:
32+
database.logger.info("deleting old system_requests and events...")
33+
stmt = sa.delete(database.SystemRequest).where(
34+
database.SystemRequest.created_at <= time_delta
35+
)
36+
result = conn.execute(stmt)
37+
conn.commit()
38+
num_requests_deleted = result.rowcount
39+
database.logger.info(
40+
f"{num_requests_deleted} old system requests "
41+
f"successfully removed from the broker database."
42+
)
43+
# clean adaptor_properties
44+
with engine.begin() as conn:
45+
try:
46+
database.logger.info("deleting old adaptor_properties...")
47+
stmt_ap_delete = sa.delete(database.AdaptorProperties).where(
48+
database.AdaptorProperties.timestamp <= time_delta
49+
)
50+
result = conn.execute(stmt_ap_delete)
51+
conn.commit()
52+
num_ap_deleted = result.rowcount
53+
database.logger.info(
54+
f"{num_ap_deleted} old adaptor properties "
55+
f"successfully removed from the broker database."
56+
)
57+
return
58+
except sa.exc.IntegrityError:
59+
database.logger.error(
60+
"cannot remove some old records from table adaptor_properties."
61+
)
62+
raise
63+
64+
1365
@app.command()
1466
def info(connection_string: Optional[str] = None) -> None:
1567
"""Test connection to the database located at URI `connection_string`.

tests/test_02_database.py

+25
Original file line numberDiff line numberDiff line change
@@ -784,8 +784,33 @@ def test_create_request(session_obj: sa.orm.sessionmaker) -> None:
784784
db.SystemRequest.request_uid == request_dict["request_uid"]
785785
)
786786
request = session.scalars(statement).one()
787+
adaptor_properties = request.adaptor_properties
788+
assert adaptor_properties.hash == "adaptor_properties_hash"
789+
initial_timestamp = adaptor_properties.timestamp
787790
assert request.request_uid == request_dict["request_uid"]
788791
assert request.user_uid == request_dict["user_uid"]
792+
# create again a new request using the same adaptor properties: timestamp updated
793+
with session_obj() as session:
794+
request_dict = db.create_request(
795+
user_uid="abc456",
796+
setup_code="",
797+
entry_point="sum",
798+
request={},
799+
metadata={},
800+
process_id="submit-workflow",
801+
session=session,
802+
portal="c3s",
803+
adaptor_config={"dummy_config": {"foo": "bar"}},
804+
adaptor_form={},
805+
adaptor_properties_hash="adaptor_properties_hash",
806+
)
807+
statement = sa.select(db.SystemRequest).where(
808+
db.SystemRequest.request_uid == request_dict["request_uid"]
809+
)
810+
request = session.scalars(statement).one()
811+
adaptor_properties = request.adaptor_properties
812+
assert adaptor_properties.hash == "adaptor_properties_hash"
813+
adaptor_properties.timestamp > initial_timestamp
789814

790815

791816
def test_get_request(session_obj: sa.orm.sessionmaker) -> None:

tests/test_20_dispatcher.py

-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import datetime
2-
import random
32
import uuid
43
from typing import Any
54

@@ -32,7 +31,6 @@ def mock_system_request(
3231
adaptor_properties_hash: str = "adaptor_properties_hash",
3332
) -> db.SystemRequest:
3433
system_request = db.SystemRequest(
35-
request_id=random.randrange(1, 100),
3634
request_uid=request_uid or str(uuid.uuid4()),
3735
status=status,
3836
created_at=created_at,

0 commit comments

Comments
 (0)