Skip to content

Commit 7fa5a4b

Browse files
authored
allow getting disk usage from database (#131)
1 parent 3a3f023 commit 7fa5a4b

File tree

3 files changed

+96
-87
lines changed

cacholote/cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
9797
return _decode_and_update(session, cache_entry, settings)
9898
except decode.DecodeError as ex:
9999
warnings.warn(str(ex), UserWarning)
100-
clean._delete_cache_entry(session, cache_entry)
100+
clean._delete_cache_entries(session, cache_entry)
101101

102102
result = func(*args, **kwargs)
103103
cache_entry = database.CacheEntry(

cacholote/clean.py

Lines changed: 91 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import posixpath
2121
from typing import Any, Callable, Literal, Optional
2222

23+
import fsspec
2324
import pydantic
2425
import sqlalchemy as sa
2526
import sqlalchemy.orm
@@ -35,7 +36,9 @@
3536
)
3637

3738

38-
def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, str]:
39+
def _get_files_from_cache_entry(
40+
cache_entry: database.CacheEntry, key: str | None
41+
) -> dict[str, Any]:
3942
result = cache_entry.result
4043
if not isinstance(result, (list, tuple, set)):
4144
result = [result]
@@ -48,27 +51,57 @@ def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, s
4851
and obj["callable"] in FILE_RESULT_CALLABLES
4952
):
5053
fs, urlpath = extra_encoders._get_fs_and_urlpath(*obj["args"][:2])
51-
files[fs.unstrip_protocol(urlpath)] = obj["args"][0]["type"]
54+
value = obj["args"][0]
55+
if key is not None:
56+
value = value[key]
57+
files[fs.unstrip_protocol(urlpath)] = value
5258
return files
5359

5460

55-
def _delete_cache_entry(
56-
session: sa.orm.Session, cache_entry: database.CacheEntry
61+
def _remove_files(
62+
fs: fsspec.AbstractFileSystem,
63+
files: list[str],
64+
max_tries: int = 10,
65+
**kwargs: Any,
5766
) -> None:
58-
fs, _ = utils.get_cache_files_fs_dirname()
59-
files_to_delete = _get_files_from_cache_entry(cache_entry)
60-
logger = config.get().logger
67+
assert max_tries >= 1
68+
if not files:
69+
return
70+
71+
config.get().logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
72+
73+
n_tries = 0
74+
while files:
75+
n_tries += 1
76+
try:
77+
fs.rm(files, **kwargs)
78+
return
79+
except FileNotFoundError:
80+
# Another concurrent process might have deleted files
81+
if n_tries >= max_tries:
82+
raise
83+
files = [file for file in files if fs.exists(file)]
6184

62-
# First, delete database entry
63-
logger.info("deleting cache entry", cache_entry=cache_entry)
64-
session.delete(cache_entry)
85+
86+
def _delete_cache_entries(
87+
session: sa.orm.Session, *cache_entries: database.CacheEntry
88+
) -> None:
89+
fs, _ = utils.get_cache_files_fs_dirname()
90+
files_to_delete = []
91+
dirs_to_delete = []
92+
for cache_entry in cache_entries:
93+
session.delete(cache_entry)
94+
95+
files = _get_files_from_cache_entry(cache_entry, key="type")
96+
for file, file_type in files.items():
97+
if file_type == "application/vnd+zarr":
98+
dirs_to_delete.append(file)
99+
else:
100+
files_to_delete.append(file)
65101
database._commit_or_rollback(session)
66102

67-
# Then, delete files
68-
for urlpath, file_type in files_to_delete.items():
69-
if fs.exists(urlpath):
70-
logger.info("deleting cache file", urlpath=urlpath)
71-
fs.rm(urlpath, recursive=file_type == "application/vnd+zarr")
103+
_remove_files(fs, files_to_delete, recursive=False)
104+
_remove_files(fs, dirs_to_delete, recursive=True)
72105

73106

74107
def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) -> None:
@@ -88,19 +121,20 @@ def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) ->
88121
for cache_entry in session.scalars(
89122
sa.select(database.CacheEntry).filter(database.CacheEntry.key == hexdigest)
90123
):
91-
_delete_cache_entry(session, cache_entry)
124+
_delete_cache_entries(session, cache_entry)
92125

93126

94127
class _Cleaner:
95-
def __init__(self, depth: int) -> None:
128+
def __init__(self, depth: int, use_database: bool) -> None:
96129
self.logger = config.get().logger
97130
self.fs, self.dirname = utils.get_cache_files_fs_dirname()
98131

99132
self.urldir = self.fs.unstrip_protocol(self.dirname)
100133

101134
self.logger.info("getting disk usage")
102135
self.file_sizes: dict[str, int] = collections.defaultdict(int)
103-
for path, size in self.fs.du(self.dirname, total=False).items():
136+
du = self.known_files if use_database else self.fs.du(self.dirname, total=False)
137+
for path, size in du.items():
104138
# Group dirs
105139
urlpath = self.fs.unstrip_protocol(path)
106140
parts = urlpath.replace(self.urldir, "", 1).strip("/").split("/")
@@ -120,6 +154,16 @@ def log_disk_usage(self) -> None:
120154
def stop_cleaning(self, maxsize: int) -> bool:
121155
return self.disk_usage <= maxsize
122156

157+
@property
def known_files(self) -> dict[str, int]:
    """Collect the files referenced by every cache entry in the database.

    The database is queried on each access; nothing is cached on the
    instance.

    Returns
    -------
    dict[str, int]
        Mapping from file path, keyed as returned by
        ``_get_files_from_cache_entry``, to the value stored under the
        ``"file:size"`` key for that file. If several cache entries
        reference the same path, the entry yielded last by the query wins.
    """
    sizes: dict[str, int] = {}
    with config.get().instantiated_sessionmaker() as session:
        all_entries = session.scalars(sa.select(database.CacheEntry))
        for entry in all_entries:
            entry_files = _get_files_from_cache_entry(entry, key="file:size")
            sizes.update(entry_files)
    return sizes
166+
123167
def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
124168
self.logger.info("getting unknown files")
125169

@@ -137,25 +181,15 @@ def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
137181
locked_files.add(urlpath)
138182
locked_files.add(urlpath.rsplit(".lock", 1)[0])
139183

140-
if unknown_files := (set(self.file_sizes) - locked_files):
141-
with config.get().instantiated_sessionmaker() as session:
142-
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
143-
for known_file in _get_files_from_cache_entry(cache_entry):
144-
unknown_files.discard(known_file)
145-
if not unknown_files:
146-
break
147-
return unknown_files
184+
return set(self.file_sizes) - locked_files - set(self.known_files)
148185

149186
def delete_unknown_files(
150187
self, lock_validity_period: float | None, recursive: bool
151188
) -> None:
152189
unknown_files = self.get_unknown_files(lock_validity_period)
153190
for urlpath in unknown_files:
154191
self.pop_file_size(urlpath)
155-
self.remove_files(
156-
list(unknown_files),
157-
recursive=recursive,
158-
)
192+
_remove_files(self.fs, list(unknown_files), recursive=recursive)
159193
self.log_disk_usage()
160194

161195
@staticmethod
@@ -207,30 +241,6 @@ def _get_method_sorters(
207241
sorters.append(database.CacheEntry.expiration)
208242
return sorters
209243

210-
def remove_files(
211-
self,
212-
files: list[str],
213-
max_tries: int = 10,
214-
**kwargs: Any,
215-
) -> None:
216-
assert max_tries >= 1
217-
if not files:
218-
return
219-
220-
self.logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
221-
222-
n_tries = 0
223-
while files:
224-
n_tries += 1
225-
try:
226-
self.fs.rm(files, **kwargs)
227-
return
228-
except FileNotFoundError:
229-
# Another concurrent process might have deleted files
230-
if n_tries >= max_tries:
231-
raise
232-
files = [file for file in files if self.fs.exists(file)]
233-
234244
def delete_cache_files(
235245
self,
236246
maxsize: int,
@@ -244,37 +254,27 @@ def delete_cache_files(
244254
if self.stop_cleaning(maxsize):
245255
return
246256

247-
files_to_delete = []
248-
dirs_to_delete = []
257+
entries_to_delete = []
249258
self.logger.info("getting cache entries to delete")
250-
n_entries_to_delete = 0
251259
with config.get().instantiated_sessionmaker() as session:
252260
for cache_entry in session.scalars(
253261
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
254262
):
255-
files = _get_files_from_cache_entry(cache_entry)
263+
files = _get_files_from_cache_entry(cache_entry, key="file:size")
256264
if any(file.startswith(self.urldir) for file in files):
257-
n_entries_to_delete += 1
258-
session.delete(cache_entry)
259-
260-
for file, file_type in files.items():
265+
entries_to_delete.append(cache_entry)
266+
for file in files:
261267
self.pop_file_size(file)
262-
if file_type == "application/vnd+zarr":
263-
dirs_to_delete.append(file)
264-
else:
265-
files_to_delete.append(file)
266268

267269
if self.stop_cleaning(maxsize):
268270
break
269271

270-
if n_entries_to_delete:
272+
if entries_to_delete:
271273
self.logger.info(
272-
"deleting cache entries", n_entries_to_delete=n_entries_to_delete
274+
"deleting cache entries", n_entries_to_delete=len(entries_to_delete)
273275
)
274-
database._commit_or_rollback(session)
276+
_delete_cache_entries(session, *entries_to_delete)
275277

276-
self.remove_files(files_to_delete, recursive=False)
277-
self.remove_files(dirs_to_delete, recursive=True)
278278
self.log_disk_usage()
279279

280280
if not self.stop_cleaning(maxsize):
@@ -296,6 +296,7 @@ def clean_cache_files(
296296
tags_to_clean: list[str | None] | None = None,
297297
tags_to_keep: list[str | None] | None = None,
298298
depth: int = 1,
299+
use_database: bool = False,
299300
) -> None:
300301
"""Clean cache files.
301302
@@ -318,8 +319,15 @@ def clean_cache_files(
318319
tags_to_clean and tags_to_keep are mutually exclusive.
319320
depth: int, default: 1
320321
depth for grouping cache files
322+
use_database: bool, default: False
323+
Whether to infer disk usage from the cacholote database
321324
"""
322-
cleaner = _Cleaner(depth=depth)
325+
if use_database and delete_unknown_files:
326+
raise ValueError(
327+
"'use_database' and 'delete_unknown_files' are mutually exclusive"
328+
)
329+
330+
cleaner = _Cleaner(depth=depth, use_database=use_database)
323331

324332
if delete_unknown_files:
325333
cleaner.delete_unknown_files(lock_validity_period, recursive)
@@ -352,15 +360,15 @@ def clean_invalid_cache_entries(
352360
for cache_entry in session.scalars(
353361
sa.select(database.CacheEntry).filter(*filters)
354362
):
355-
_delete_cache_entry(session, cache_entry)
363+
_delete_cache_entries(session, cache_entry)
356364

357365
if try_decode:
358366
with config.get().instantiated_sessionmaker() as session:
359367
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
360368
try:
361369
decode.loads(cache_entry._result_as_string)
362370
except decode.DecodeError:
363-
_delete_cache_entry(session, cache_entry)
371+
_delete_cache_entries(session, cache_entry)
364372

365373

366374
def expire_cache_entries(
@@ -379,15 +387,14 @@ def expire_cache_entries(
379387
if after is not None:
380388
filters.append(database.CacheEntry.created_at > after)
381389

382-
count = 0
383390
with config.get().instantiated_sessionmaker() as session:
384-
for cache_entry in session.scalars(
385-
sa.select(database.CacheEntry).filter(*filters)
386-
):
387-
count += 1
388-
if delete:
389-
session.delete(cache_entry)
390-
else:
391+
cache_entries = list(
392+
session.scalars(sa.select(database.CacheEntry).filter(*filters))
393+
)
394+
if delete:
395+
_delete_cache_entries(session, *cache_entries)
396+
else:
397+
for cache_entry in cache_entries:
391398
cache_entry.expiration = now
392-
database._commit_or_rollback(session)
393-
return count
399+
database._commit_or_rollback(session)
400+
return len(cache_entries)

tests/test_60_clean.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ def cached_now() -> datetime.datetime:
4141
@pytest.mark.parametrize("method", ["LRU", "LFU"])
4242
@pytest.mark.parametrize("set_cache", ["file", "cads"], indirect=True)
4343
@pytest.mark.parametrize("folder,depth", [("", 1), ("", 2), ("foo", 2)])
44+
@pytest.mark.parametrize("use_database", [True, False])
4445
def test_clean_cache_files(
4546
tmp_path: pathlib.Path,
4647
set_cache: str,
4748
method: Literal["LRU", "LFU"],
4849
folder: str,
4950
depth: int,
51+
use_database: bool,
5052
) -> None:
5153
con = config.get().engine.raw_connection()
5254
cur = con.cursor()
@@ -66,12 +68,12 @@ def test_clean_cache_files(
6668
assert set(fs.ls(dirname)) == {lru_path, lfu_path}
6769

6870
# Do not clean
69-
clean.clean_cache_files(2, method=method, depth=depth)
71+
clean.clean_cache_files(2, method=method, depth=depth, use_database=use_database)
7072
cur.execute("SELECT COUNT(*) FROM cache_entries", ())
7173
assert cur.fetchone() == (fs.du(dirname),) == (2,)
7274

7375
# Delete one file
74-
clean.clean_cache_files(1, method=method, depth=depth)
76+
clean.clean_cache_files(1, method=method, depth=depth, use_database=use_database)
7577
cur.execute("SELECT COUNT(*) FROM cache_entries", ())
7678
assert cur.fetchone() == (fs.du(dirname),) == (1,)
7779
assert not fs.exists(lru_path if method == "LRU" else lfu_path)

0 commit comments

Comments
 (0)