Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 71dfcd5

Browse files
committed
Add bulk lookup API to DeferredCache.
1 parent e6a9ae0 commit 71dfcd5

File tree

2 files changed

+102
-32
lines changed

2 files changed

+102
-32
lines changed

synapse/util/caches/deferred_cache.py

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818
import enum
1919
import threading
2020
from typing import (
21-
Any,
2221
Callable,
2322
Collection,
2423
Dict,
2524
Generic,
26-
Iterable,
2725
MutableMapping,
2826
Optional,
2927
Set,
3028
Sized,
29+
Tuple,
3130
TypeVar,
3231
Union,
3332
cast,
@@ -38,10 +37,9 @@
3837
from twisted.internet import defer
3938
from twisted.python.failure import Failure
4039

41-
from synapse.logging.context import PreserveLoggingContext
4240
from synapse.util.async_helpers import ObservableDeferred
4341
from synapse.util.caches.lrucache import LruCache
44-
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
42+
from synapse.util.caches.treecache import TreeCache
4543

4644
cache_pending_metric = Gauge(
4745
"synapse_util_caches_cache_pending",
@@ -99,7 +97,7 @@ def __init__(
9997

10098
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
10199
self._pending_deferred_cache: Union[
102-
TreeCache, "MutableMapping[KT, CacheEntry]"
100+
TreeCache, "MutableMapping[KT, CacheEntry[KT, VT]]"
103101
] = cache_type()
104102

105103
def metrics_cb() -> None:
@@ -183,6 +181,73 @@ def get(
183181
else:
184182
return defer.succeed(val2)
185183

184+
def get_bulk(
185+
self,
186+
keys: Collection[KT],
187+
callback: Optional[Callable[[], None]] = None,
188+
) -> Tuple[Dict[KT, VT], Optional["defer.Deferred[Dict[KT, VT]]"], Collection[KT]]:
189+
"""Bulk lookup of items in the cache.
190+
191+
Returns:
192+
A 3-tuple of:
193+
1. a dict of key/value of items already cached;
194+
2. a deferred that resolves to a dict of key/value of items
195+
we're already fetching; and
196+
3. a collection of keys that don't appear in the previous two.
197+
"""
198+
199+
# The cached results
200+
cached = {}
201+
202+
# List of pending deferreds
203+
pending = []
204+
205+
# Dict that gets filled out when the pending deferreds complete
206+
pending_results = {}
207+
208+
# List of keys that aren't in either cache
209+
missing = []
210+
211+
callbacks = (callback,) if callback else ()
212+
213+
for key in keys:
214+
# Check if its in the main cache.
215+
immediate_value = self.cache.get(
216+
key,
217+
_Sentinel.sentinel,
218+
callbacks=callbacks,
219+
)
220+
if immediate_value is not _Sentinel.sentinel:
221+
cached[key] = immediate_value
222+
continue
223+
224+
# Check if its in the pending cache
225+
pending_value = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
226+
if pending_value is not _Sentinel.sentinel:
227+
pending_value.add_callback(key, callback)
228+
229+
def completed_cb(value: VT, key: KT) -> VT:
230+
pending_results[key] = value
231+
return value
232+
233+
# Add a callback to fill out `pending_results` when that completes
234+
d = pending_value.deferred(key).addCallback(completed_cb, key)
235+
pending.append(d)
236+
continue
237+
238+
# Not in either cache
239+
missing.append(key)
240+
241+
# If we've got pending deferreds, squash them into a single one that
242+
# returns `pending_results`.
243+
pending_deferred = None
244+
if pending:
245+
pending_deferred = defer.gatherResults(
246+
pending, consumeErrors=True
247+
).addCallback(lambda _: pending_results)
248+
249+
return (cached, pending_deferred, missing)
250+
186251
def get_immediate(
187252
self, key: KT, default: T, update_metrics: bool = True
188253
) -> Union[VT, T]:

synapse/util/caches/descriptors.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
Generic,
2626
Hashable,
2727
Iterable,
28+
List,
2829
Mapping,
2930
Optional,
3031
Sequence,
@@ -435,60 +436,64 @@ def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
435436
keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
436437
list_args = arg_dict[self.list_name]
437438

438-
results = {}
439-
440-
def update_results_dict(res: Any, arg: Hashable) -> None:
441-
results[arg] = res
442-
443-
# list of deferreds to wait for
444-
cached_defers = []
445-
446-
missing = set()
447-
448439
# If the cache takes a single arg then that is used as the key,
449440
# otherwise a tuple is used.
450441
if num_args == 1:
451442

452443
def arg_to_cache_key(arg: Hashable) -> Hashable:
453444
return arg
454445

446+
def cache_key_to_arg(key: tuple) -> Hashable:
447+
return key
448+
455449
else:
456450
keylist = list(keyargs)
457451

458452
def arg_to_cache_key(arg: Hashable) -> Hashable:
459453
keylist[self.list_pos] = arg
460454
return tuple(keylist)
461455

462-
for arg in list_args:
463-
try:
464-
res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
465-
if not res.called:
466-
res.addCallback(update_results_dict, arg)
467-
cached_defers.append(res)
468-
else:
469-
results[arg] = res.result
470-
except KeyError:
471-
missing.add(arg)
456+
def cache_key_to_arg(key: tuple) -> Hashable:
457+
return key[self.list_pos]
458+
459+
cache_keys = [arg_to_cache_key(arg) for arg in list_args]
460+
immediate_results, pending_deferred, missing = cache.get_bulk(
461+
cache_keys, callback=invalidate_callback
462+
)
463+
464+
results = {cache_key_to_arg(key): v for key, v in immediate_results.items()}
465+
466+
cached_defers: List["defer.Deferred[Any]"] = []
467+
if pending_deferred:
468+
469+
def update_results(r: Dict) -> None:
470+
for k, v in r.items():
471+
results[cache_key_to_arg(k)] = v
472+
473+
pending_deferred.addCallback(update_results)
474+
cached_defers.append(pending_deferred)
472475

473476
if missing:
474-
cache_keys = [arg_to_cache_key(key) for key in missing]
475-
cache_entry = cache.set_bulk(cache_keys, callback=invalidate_callback)
477+
cache_entry = cache.set_bulk(missing, invalidate_callback)
476478

477479
def complete_all(res: Dict[Hashable, Any]) -> None:
478480
missing_results = {}
479481
for key in missing:
480-
val = res.get(key, None)
482+
arg = cache_key_to_arg(key)
483+
val = res.get(arg, None)
481484

482-
results[key] = val
483-
missing_results[arg_to_cache_key(key)] = val
485+
results[arg] = val
486+
missing_results[key] = val
484487

485488
cache_entry.complete_bulk(cache, missing_results)
486489

487490
def errback_all(f: Failure) -> None:
488-
cache_entry.error_bulk(cache, cache_keys, f)
491+
cache_entry.error_bulk(cache, missing, f)
489492

490493
args_to_call = dict(arg_dict)
491-
args_to_call[self.list_name] = missing
494+
args_to_call[self.list_name] = {
495+
cache_key_to_arg(key) for key in missing
496+
}
492497

493498
# dispatch the call, and attach the two handlers
494499
missing_d = defer.maybeDeferred(

0 commit comments

Comments
 (0)