Skip to content

Commit 4a20af2

Browse files
committed
infer disk usage from database
1 parent 3a3f023 commit 4a20af2

File tree

3 files changed

+94
-78
lines changed

3 files changed

+94
-78
lines changed

cacholote/cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
9797
return _decode_and_update(session, cache_entry, settings)
9898
except decode.DecodeError as ex:
9999
warnings.warn(str(ex), UserWarning)
100-
clean._delete_cache_entry(session, cache_entry)
100+
clean._delete_cache_entries(session, cache_entry)
101101

102102
result = func(*args, **kwargs)
103103
cache_entry = database.CacheEntry(

cacholote/clean.py

Lines changed: 89 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import posixpath
2121
from typing import Any, Callable, Literal, Optional
2222

23+
import fsspec
2324
import pydantic
2425
import sqlalchemy as sa
2526
import sqlalchemy.orm
@@ -35,7 +36,9 @@
3536
)
3637

3738

38-
def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, str]:
39+
def _get_files_from_cache_entry(
40+
cache_entry: database.CacheEntry, key: str = "file:size"
41+
) -> dict[str, Any]:
3942
result = cache_entry.result
4043
if not isinstance(result, (list, tuple, set)):
4144
result = [result]
@@ -48,27 +51,54 @@ def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, s
4851
and obj["callable"] in FILE_RESULT_CALLABLES
4952
):
5053
fs, urlpath = extra_encoders._get_fs_and_urlpath(*obj["args"][:2])
51-
files[fs.unstrip_protocol(urlpath)] = obj["args"][0]["type"]
54+
files[fs.unstrip_protocol(urlpath)] = obj["args"][0][key]
5255
return files
5356

5457

55-
def _delete_cache_entry(
56-
session: sa.orm.Session, cache_entry: database.CacheEntry
58+
def _remove_files(
59+
fs: fsspec.AbstractFileSystem,
60+
files: list[str],
61+
max_tries: int = 10,
62+
**kwargs: Any,
5763
) -> None:
58-
fs, _ = utils.get_cache_files_fs_dirname()
59-
files_to_delete = _get_files_from_cache_entry(cache_entry)
60-
logger = config.get().logger
64+
assert max_tries >= 1
65+
if not files:
66+
return
67+
68+
config.get().logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
69+
70+
n_tries = 0
71+
while files:
72+
n_tries += 1
73+
try:
74+
fs.rm(files, **kwargs)
75+
return
76+
except FileNotFoundError:
77+
# Another concurrent process might have deleted files
78+
if n_tries >= max_tries:
79+
raise
80+
files = [file for file in files if fs.exists(file)]
6181

62-
# First, delete database entry
63-
logger.info("deleting cache entry", cache_entry=cache_entry)
64-
session.delete(cache_entry)
82+
83+
def _delete_cache_entries(
84+
session: sa.orm.Session, *cache_entries: database.CacheEntry
85+
) -> None:
86+
fs, _ = utils.get_cache_files_fs_dirname()
87+
files_to_delete = []
88+
dirs_to_delete = []
89+
for cache_entry in cache_entries:
90+
session.delete(cache_entry)
91+
92+
files = _get_files_from_cache_entry(cache_entry, key="type")
93+
for file, file_type in files.items():
94+
if file_type == "application/vnd+zarr":
95+
dirs_to_delete.append(file)
96+
else:
97+
files_to_delete.append(file)
6598
database._commit_or_rollback(session)
6699

67-
# Then, delete files
68-
for urlpath, file_type in files_to_delete.items():
69-
if fs.exists(urlpath):
70-
logger.info("deleting cache file", urlpath=urlpath)
71-
fs.rm(urlpath, recursive=file_type == "application/vnd+zarr")
100+
_remove_files(fs, files_to_delete, recursive=False)
101+
_remove_files(fs, dirs_to_delete, recursive=True)
72102

73103

74104
def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) -> None:
@@ -88,19 +118,24 @@ def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) ->
88118
for cache_entry in session.scalars(
89119
sa.select(database.CacheEntry).filter(database.CacheEntry.key == hexdigest)
90120
):
91-
_delete_cache_entry(session, cache_entry)
121+
_delete_cache_entries(session, cache_entry)
92122

93123

94124
class _Cleaner:
95-
def __init__(self, depth: int) -> None:
125+
def __init__(self, depth: int, use_database: bool) -> None:
96126
self.logger = config.get().logger
97127
self.fs, self.dirname = utils.get_cache_files_fs_dirname()
98128

99129
self.urldir = self.fs.unstrip_protocol(self.dirname)
100130

101131
self.logger.info("getting disk usage")
102132
self.file_sizes: dict[str, int] = collections.defaultdict(int)
103-
for path, size in self.fs.du(self.dirname, total=False).items():
133+
du = (
134+
self.get_known_files()
135+
if use_database
136+
else self.fs.du(self.dirname, total=False)
137+
)
138+
for path, size in du.items():
104139
# Group dirs
105140
urlpath = self.fs.unstrip_protocol(path)
106141
parts = urlpath.replace(self.urldir, "", 1).strip("/").split("/")
@@ -120,6 +155,15 @@ def log_disk_usage(self) -> None:
120155
def stop_cleaning(self, maxsize: int) -> bool:
121156
return self.disk_usage <= maxsize
122157

158+
def get_known_files(self) -> dict[str, int]:
159+
known_files: dict[str, int] = {}
160+
with config.get().instantiated_sessionmaker() as session:
161+
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
162+
known_files.update(
163+
_get_files_from_cache_entry(cache_entry, key="file:size")
164+
)
165+
return known_files
166+
123167
def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
124168
self.logger.info("getting unknown files")
125169

@@ -152,10 +196,7 @@ def delete_unknown_files(
152196
unknown_files = self.get_unknown_files(lock_validity_period)
153197
for urlpath in unknown_files:
154198
self.pop_file_size(urlpath)
155-
self.remove_files(
156-
list(unknown_files),
157-
recursive=recursive,
158-
)
199+
_remove_files(self.fs, list(unknown_files), recursive=recursive)
159200
self.log_disk_usage()
160201

161202
@staticmethod
@@ -207,30 +248,6 @@ def _get_method_sorters(
207248
sorters.append(database.CacheEntry.expiration)
208249
return sorters
209250

210-
def remove_files(
211-
self,
212-
files: list[str],
213-
max_tries: int = 10,
214-
**kwargs: Any,
215-
) -> None:
216-
assert max_tries >= 1
217-
if not files:
218-
return
219-
220-
self.logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
221-
222-
n_tries = 0
223-
while files:
224-
n_tries += 1
225-
try:
226-
self.fs.rm(files, **kwargs)
227-
return
228-
except FileNotFoundError:
229-
# Another concurrent process might have deleted files
230-
if n_tries >= max_tries:
231-
raise
232-
files = [file for file in files if self.fs.exists(file)]
233-
234251
def delete_cache_files(
235252
self,
236253
maxsize: int,
@@ -244,37 +261,27 @@ def delete_cache_files(
244261
if self.stop_cleaning(maxsize):
245262
return
246263

247-
files_to_delete = []
248-
dirs_to_delete = []
264+
entries_to_delete = []
249265
self.logger.info("getting cache entries to delete")
250-
n_entries_to_delete = 0
251266
with config.get().instantiated_sessionmaker() as session:
252267
for cache_entry in session.scalars(
253268
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
254269
):
255270
files = _get_files_from_cache_entry(cache_entry)
256271
if any(file.startswith(self.urldir) for file in files):
257-
n_entries_to_delete += 1
258-
session.delete(cache_entry)
259-
260-
for file, file_type in files.items():
272+
entries_to_delete.append(cache_entry)
273+
for file in files:
261274
self.pop_file_size(file)
262-
if file_type == "application/vnd+zarr":
263-
dirs_to_delete.append(file)
264-
else:
265-
files_to_delete.append(file)
266275

267276
if self.stop_cleaning(maxsize):
268277
break
269278

270-
if n_entries_to_delete:
279+
if entries_to_delete:
271280
self.logger.info(
272-
"deleting cache entries", n_entries_to_delete=n_entries_to_delete
281+
"deleting cache entries", n_entries_to_delete=len(entries_to_delete)
273282
)
274-
database._commit_or_rollback(session)
283+
_delete_cache_entries(session, *entries_to_delete)
275284

276-
self.remove_files(files_to_delete, recursive=False)
277-
self.remove_files(dirs_to_delete, recursive=True)
278285
self.log_disk_usage()
279286

280287
if not self.stop_cleaning(maxsize):
@@ -296,6 +303,7 @@ def clean_cache_files(
296303
tags_to_clean: list[str | None] | None = None,
297304
tags_to_keep: list[str | None] | None = None,
298305
depth: int = 1,
306+
use_database: bool = False,
299307
) -> None:
300308
"""Clean cache files.
301309
@@ -318,8 +326,15 @@ def clean_cache_files(
318326
tags_to_clean and tags_to_keep are mutually exclusive.
319327
depth: int, default: 1
320328
depth for grouping cache files
329+
use_database: bool, default: False
330+
Whether to infer disk usage from the cacholote database
321331
"""
322-
cleaner = _Cleaner(depth=depth)
332+
if use_database and delete_unknown_files:
333+
raise ValueError(
334+
"'use_database' and 'delete_unknown_files' are mutually exclusive"
335+
)
336+
337+
cleaner = _Cleaner(depth=depth, use_database=use_database)
323338

324339
if delete_unknown_files:
325340
cleaner.delete_unknown_files(lock_validity_period, recursive)
@@ -352,15 +367,15 @@ def clean_invalid_cache_entries(
352367
for cache_entry in session.scalars(
353368
sa.select(database.CacheEntry).filter(*filters)
354369
):
355-
_delete_cache_entry(session, cache_entry)
370+
_delete_cache_entries(session, cache_entry)
356371

357372
if try_decode:
358373
with config.get().instantiated_sessionmaker() as session:
359374
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
360375
try:
361376
decode.loads(cache_entry._result_as_string)
362377
except decode.DecodeError:
363-
_delete_cache_entry(session, cache_entry)
378+
_delete_cache_entries(session, cache_entry)
364379

365380

366381
def expire_cache_entries(
@@ -379,15 +394,14 @@ def expire_cache_entries(
379394
if after is not None:
380395
filters.append(database.CacheEntry.created_at > after)
381396

382-
count = 0
383397
with config.get().instantiated_sessionmaker() as session:
384-
for cache_entry in session.scalars(
385-
sa.select(database.CacheEntry).filter(*filters)
386-
):
387-
count += 1
388-
if delete:
389-
session.delete(cache_entry)
390-
else:
398+
cache_entries = list(
399+
session.scalars(sa.select(database.CacheEntry).filter(*filters))
400+
)
401+
if delete:
402+
_delete_cache_entries(session, *cache_entries)
403+
else:
404+
for cache_entry in cache_entries:
391405
cache_entry.expiration = now
392-
database._commit_or_rollback(session)
393-
return count
406+
database._commit_or_rollback(session)
407+
return len(cache_entries)

tests/test_60_clean.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ def cached_now() -> datetime.datetime:
4141
@pytest.mark.parametrize("method", ["LRU", "LFU"])
4242
@pytest.mark.parametrize("set_cache", ["file", "cads"], indirect=True)
4343
@pytest.mark.parametrize("folder,depth", [("", 1), ("", 2), ("foo", 2)])
44+
@pytest.mark.parametrize("use_database", [True, False])
4445
def test_clean_cache_files(
4546
tmp_path: pathlib.Path,
4647
set_cache: str,
4748
method: Literal["LRU", "LFU"],
4849
folder: str,
4950
depth: int,
51+
use_database: bool,
5052
) -> None:
5153
con = config.get().engine.raw_connection()
5254
cur = con.cursor()
@@ -66,12 +68,12 @@ def test_clean_cache_files(
6668
assert set(fs.ls(dirname)) == {lru_path, lfu_path}
6769

6870
# Do not clean
69-
clean.clean_cache_files(2, method=method, depth=depth)
71+
clean.clean_cache_files(2, method=method, depth=depth, use_database=use_database)
7072
cur.execute("SELECT COUNT(*) FROM cache_entries", ())
7173
assert cur.fetchone() == (fs.du(dirname),) == (2,)
7274

7375
# Delete one file
74-
clean.clean_cache_files(1, method=method, depth=depth)
76+
clean.clean_cache_files(1, method=method, depth=depth, use_database=use_database)
7577
cur.execute("SELECT COUNT(*) FROM cache_entries", ())
7678
assert cur.fetchone() == (fs.du(dirname),) == (1,)
7779
assert not fs.exists(lru_path if method == "LRU" else lfu_path)

0 commit comments

Comments (0)