Skip to content

Commit 8359c93

Browse files
committed
allow getting disk usage from database
1 parent 43d9e61 commit 8359c93

File tree

3 files changed

+155
-103
lines changed

3 files changed

+155
-103
lines changed

cacholote/cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
8282
return _decode_and_update(session, cache_entry, settings)
8383
except decode.DecodeError as ex:
8484
warnings.warn(str(ex), UserWarning)
85-
clean._delete_cache_entry(session, cache_entry)
85+
clean._delete_cache_entries(session, cache_entry)
8686

8787
result = func(*args, **kwargs)
8888
cache_entry = database.CacheEntry(

cacholote/clean.py

Lines changed: 102 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import posixpath
2121
from typing import Any, Callable, Literal, Optional
2222

23+
import fsspec
2324
import pydantic
2425
import sqlalchemy as sa
2526
import sqlalchemy.orm
@@ -35,7 +36,9 @@
3536
)
3637

3738

38-
def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, str]:
39+
def _get_files_from_cache_entry(
40+
cache_entry: database.CacheEntry, key: str | None
41+
) -> dict[str, Any]:
3942
result = cache_entry.result
4043
if not isinstance(result, (list, tuple, set)):
4144
result = [result]
@@ -48,27 +51,57 @@ def _get_files_from_cache_entry(cache_entry: database.CacheEntry) -> dict[str, s
4851
and obj["callable"] in FILE_RESULT_CALLABLES
4952
):
5053
fs, urlpath = extra_encoders._get_fs_and_urlpath(*obj["args"][:2])
51-
files[fs.unstrip_protocol(urlpath)] = obj["args"][0]["type"]
54+
value = obj["args"][0]
55+
if key is not None:
56+
value = value[key]
57+
files[fs.unstrip_protocol(urlpath)] = value
5258
return files
5359

5460

55-
def _delete_cache_entry(
56-
session: sa.orm.Session, cache_entry: database.CacheEntry
61+
def _remove_files(
62+
fs: fsspec.AbstractFileSystem,
63+
files: list[str],
64+
max_tries: int = 10,
65+
**kwargs: Any,
5766
) -> None:
58-
fs, _ = utils.get_cache_files_fs_dirname()
59-
files_to_delete = _get_files_from_cache_entry(cache_entry)
60-
logger = config.get().logger
67+
assert max_tries >= 1
68+
if not files:
69+
return
70+
71+
config.get().logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
72+
73+
n_tries = 0
74+
while files:
75+
n_tries += 1
76+
try:
77+
fs.rm(files, **kwargs)
78+
return
79+
except FileNotFoundError:
80+
# Another concurrent process might have deleted files
81+
if n_tries >= max_tries:
82+
raise
83+
files = [file for file in files if fs.exists(file)]
6184

62-
# First, delete database entry
63-
logger.info("deleting cache entry", cache_entry=cache_entry)
64-
session.delete(cache_entry)
85+
86+
def _delete_cache_entries(
87+
session: sa.orm.Session, *cache_entries: database.CacheEntry
88+
) -> None:
89+
fs, _ = utils.get_cache_files_fs_dirname()
90+
files_to_delete = []
91+
dirs_to_delete = []
92+
for cache_entry in cache_entries:
93+
session.delete(cache_entry)
94+
95+
files = _get_files_from_cache_entry(cache_entry, key="type")
96+
for file, file_type in files.items():
97+
if file_type == "application/vnd+zarr":
98+
dirs_to_delete.append(file)
99+
else:
100+
files_to_delete.append(file)
65101
database._commit_or_rollback(session)
66102

67-
# Then, delete files
68-
for urlpath, file_type in files_to_delete.items():
69-
if fs.exists(urlpath):
70-
logger.info("deleting cache file", urlpath=urlpath)
71-
fs.rm(urlpath, recursive=file_type == "application/vnd+zarr")
103+
_remove_files(fs, files_to_delete, recursive=False)
104+
_remove_files(fs, dirs_to_delete, recursive=True)
72105

73106

74107
def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) -> None:
@@ -88,25 +121,25 @@ def delete(func_to_del: str | Callable[..., Any], *args: Any, **kwargs: Any) ->
88121
for cache_entry in session.scalars(
89122
sa.select(database.CacheEntry).filter(database.CacheEntry.key == hexdigest)
90123
):
91-
_delete_cache_entry(session, cache_entry)
124+
_delete_cache_entries(session, cache_entry)
92125

93126

94127
class _Cleaner:
95-
def __init__(self) -> None:
128+
def __init__(self, depth: int, use_database: bool) -> None:
96129
self.logger = config.get().logger
97130
self.fs, self.dirname = utils.get_cache_files_fs_dirname()
98131

99-
urldir = self.fs.unstrip_protocol(self.dirname)
132+
self.urldir = self.fs.unstrip_protocol(self.dirname)
100133

101134
self.logger.info("getting disk usage")
102135
self.file_sizes: dict[str, int] = collections.defaultdict(int)
103-
for path, size in self.fs.du(self.dirname, total=False).items():
136+
du = self.known_files if use_database else self.fs.du(self.dirname, total=False)
137+
for path, size in du.items():
104138
# Group dirs
105139
urlpath = self.fs.unstrip_protocol(path)
106-
basename, *_ = urlpath.replace(urldir, "", 1).strip("/").split("/")
107-
if basename:
108-
self.file_sizes[posixpath.join(urldir, basename)] += size
109-
140+
parts = urlpath.replace(self.urldir, "", 1).strip("/").split("/")
141+
if parts:
142+
self.file_sizes[posixpath.join(self.urldir, *parts[:depth])] += size
110143
self.disk_usage = sum(self.file_sizes.values())
111144
self.log_disk_usage()
112145

@@ -121,6 +154,16 @@ def log_disk_usage(self) -> None:
121154
def stop_cleaning(self, maxsize: int) -> bool:
122155
return self.disk_usage <= maxsize
123156

157+
@property
158+
def known_files(self) -> dict[str, int]:
159+
known_files: dict[str, int] = {}
160+
with config.get().instantiated_sessionmaker() as session:
161+
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
162+
known_files.update(
163+
_get_files_from_cache_entry(cache_entry, key="file:size")
164+
)
165+
return known_files
166+
124167
def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
125168
self.logger.info("getting unknown files")
126169

@@ -138,25 +181,15 @@ def get_unknown_files(self, lock_validity_period: float | None) -> set[str]:
138181
locked_files.add(urlpath)
139182
locked_files.add(urlpath.rsplit(".lock", 1)[0])
140183

141-
if unknown_files := (set(self.file_sizes) - locked_files):
142-
with config.get().instantiated_sessionmaker() as session:
143-
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
144-
for known_file in _get_files_from_cache_entry(cache_entry):
145-
unknown_files.discard(known_file)
146-
if not unknown_files:
147-
break
148-
return unknown_files
184+
return set(self.file_sizes) - locked_files - set(self.known_files)
149185

150186
def delete_unknown_files(
151187
self, lock_validity_period: float | None, recursive: bool
152188
) -> None:
153189
unknown_files = self.get_unknown_files(lock_validity_period)
154190
for urlpath in unknown_files:
155191
self.pop_file_size(urlpath)
156-
self.remove_files(
157-
list(unknown_files),
158-
recursive=recursive,
159-
)
192+
_remove_files(self.fs, list(unknown_files), recursive=recursive)
160193
self.log_disk_usage()
161194

162195
@staticmethod
@@ -208,30 +241,6 @@ def _get_method_sorters(
208241
sorters.append(database.CacheEntry.expiration)
209242
return sorters
210243

211-
def remove_files(
212-
self,
213-
files: list[str],
214-
max_tries: int = 10,
215-
**kwargs: Any,
216-
) -> None:
217-
assert max_tries >= 1
218-
if not files:
219-
return
220-
221-
self.logger.info("deleting files", n_files_to_delete=len(files), **kwargs)
222-
223-
n_tries = 0
224-
while files:
225-
n_tries += 1
226-
try:
227-
self.fs.rm(files, **kwargs)
228-
return
229-
except FileNotFoundError:
230-
# Another concurrent process might have deleted files
231-
if n_tries >= max_tries:
232-
raise
233-
files = [file for file in files if self.fs.exists(file)]
234-
235244
def delete_cache_files(
236245
self,
237246
maxsize: int,
@@ -245,37 +254,27 @@ def delete_cache_files(
245254
if self.stop_cleaning(maxsize):
246255
return
247256

248-
files_to_delete = []
249-
dirs_to_delete = []
257+
entries_to_delete = []
250258
self.logger.info("getting cache entries to delete")
251-
n_entries_to_delete = 0
252259
with config.get().instantiated_sessionmaker() as session:
253260
for cache_entry in session.scalars(
254261
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
255262
):
256-
files = _get_files_from_cache_entry(cache_entry)
257-
if files:
258-
n_entries_to_delete += 1
259-
session.delete(cache_entry)
260-
261-
for file, file_type in files.items():
262-
self.pop_file_size(file)
263-
if file_type == "application/vnd+zarr":
264-
dirs_to_delete.append(file)
265-
else:
266-
files_to_delete.append(file)
263+
files = _get_files_from_cache_entry(cache_entry, key="file:size")
264+
if any(file.startswith(self.urldir) for file in files):
265+
entries_to_delete.append(cache_entry)
266+
for file in files:
267+
self.pop_file_size(file)
267268

268269
if self.stop_cleaning(maxsize):
269270
break
270271

271-
if n_entries_to_delete:
272+
if entries_to_delete:
272273
self.logger.info(
273-
"deleting cache entries", n_entries_to_delete=n_entries_to_delete
274+
"deleting cache entries", n_entries_to_delete=len(entries_to_delete)
274275
)
275-
database._commit_or_rollback(session)
276+
_delete_cache_entries(session, *entries_to_delete)
276277

277-
self.remove_files(files_to_delete, recursive=False)
278-
self.remove_files(dirs_to_delete, recursive=True)
279278
self.log_disk_usage()
280279

281280
if not self.stop_cleaning(maxsize):
@@ -296,6 +295,8 @@ def clean_cache_files(
296295
lock_validity_period: float | None = None,
297296
tags_to_clean: list[str | None] | None = None,
298297
tags_to_keep: list[str | None] | None = None,
298+
depth: int = 1,
299+
use_database: bool = False,
299300
) -> None:
300301
"""Clean cache files.
301302
@@ -316,8 +317,17 @@ def clean_cache_files(
316317
Tags to clean/keep. If None, delete all cache entries.
317318
To delete/keep untagged entries, add None in the list (e.g., [None, 'tag1', ...]).
318319
tags_to_clean and tags_to_keep are mutually exclusive.
320+
depth: int, default: 1
321+
depth for grouping cache files
322+
use_database: bool, default: False
323+
Whether to infer disk usage from the cacholote database
319324
"""
320-
cleaner = _Cleaner()
325+
if use_database and delete_unknown_files:
326+
raise ValueError(
327+
"'use_database' and 'delete_unknown_files' are mutually exclusive"
328+
)
329+
330+
cleaner = _Cleaner(depth=depth, use_database=use_database)
321331

322332
if delete_unknown_files:
323333
cleaner.delete_unknown_files(lock_validity_period, recursive)
@@ -350,21 +360,22 @@ def clean_invalid_cache_entries(
350360
for cache_entry in session.scalars(
351361
sa.select(database.CacheEntry).filter(*filters)
352362
):
353-
_delete_cache_entry(session, cache_entry)
363+
_delete_cache_entries(session, cache_entry)
354364

355365
if try_decode:
356366
with config.get().instantiated_sessionmaker() as session:
357367
for cache_entry in session.scalars(sa.select(database.CacheEntry)):
358368
try:
359369
decode.loads(cache_entry._result_as_string)
360370
except decode.DecodeError:
361-
_delete_cache_entry(session, cache_entry)
371+
_delete_cache_entries(session, cache_entry)
362372

363373

364374
def expire_cache_entries(
365375
tags: list[str] | None = None,
366376
before: datetime.datetime | None = None,
367377
after: datetime.date | None = None,
378+
delete: bool = False,
368379
) -> int:
369380
now = utils.utcnow()
370381

@@ -376,12 +387,14 @@ def expire_cache_entries(
376387
if after is not None:
377388
filters.append(database.CacheEntry.created_at > after)
378389

379-
count = 0
380390
with config.get().instantiated_sessionmaker() as session:
381-
for cache_entry in session.scalars(
382-
sa.select(database.CacheEntry).filter(*filters)
383-
):
384-
count += 1
385-
cache_entry.expiration = now
386-
database._commit_or_rollback(session)
387-
return count
391+
cache_entries = list(
392+
session.scalars(sa.select(database.CacheEntry).filter(*filters))
393+
)
394+
if delete:
395+
_delete_cache_entries(session, *cache_entries)
396+
else:
397+
for cache_entry in cache_entries:
398+
cache_entry.expiration = now
399+
database._commit_or_rollback(session)
400+
return len(cache_entries)

0 commit comments

Comments
 (0)