Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 4182bb8

Browse files
committed
move DeferredCache into its own module
1 parent 9f87da0 commit 4182bb8

File tree

10 files changed

+367
-332
lines changed

10 files changed

+367
-332
lines changed

synapse/replication/slave/storage/client_ips.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from synapse.storage.database import DatabasePool
1717
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
18-
from synapse.util.caches.descriptors import DeferredCache
18+
from synapse.util.caches.deferred_cache import DeferredCache
1919

2020
from ._base import BaseSlavedStore
2121

synapse/storage/databases/main/client_ips.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from synapse.metrics.background_process_metrics import wrap_as_background_process
2020
from synapse.storage._base import SQLBaseStore
2121
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
22-
from synapse.util.caches.descriptors import DeferredCache
22+
from synapse.util.caches.deferred_cache import DeferredCache
2323

2424
logger = logging.getLogger(__name__)
2525

synapse/storage/databases/main/devices.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
)
3535
from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
3636
from synapse.util import json_decoder, json_encoder
37-
from synapse.util.caches.descriptors import DeferredCache, cached, cachedList
37+
from synapse.util.caches.deferred_cache import DeferredCache
38+
from synapse.util.caches.descriptors import cached, cachedList
3839
from synapse.util.iterutils import batch_iter
3940
from synapse.util.stringutils import shortstr
4041

synapse/storage/databases/main/events_worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
from synapse.storage.engines import PostgresEngine
4343
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
4444
from synapse.types import Collection, get_domain_from_id
45-
from synapse.util.caches.descriptors import DeferredCache, cached
45+
from synapse.util.caches.deferred_cache import DeferredCache
46+
from synapse.util.caches.descriptors import cached
4647
from synapse.util.iterutils import batch_iter
4748
from synapse.util.metrics import Measure
4849

synapse/util/caches/deferred_cache.py

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2015, 2016 OpenMarket Ltd
3+
# Copyright 2018 New Vector Ltd
4+
# Copyright 2020 The Matrix.org Foundation C.I.C.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
import enum
19+
import threading
20+
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast
21+
22+
from prometheus_client import Gauge
23+
24+
from twisted.internet import defer
25+
26+
from synapse.util.async_helpers import ObservableDeferred
27+
from synapse.util.caches import register_cache
28+
from synapse.util.caches.lrucache import LruCache
29+
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
30+
31+
cache_pending_metric = Gauge(
32+
"synapse_util_caches_cache_pending",
33+
"Number of lookups currently pending for this cache",
34+
["name"],
35+
)
36+
37+
38+
KT = TypeVar("KT")
39+
VT = TypeVar("VT")
40+
41+
42+
class _Sentinel(enum.Enum):
43+
# defining a sentinel in this way allows mypy to correctly handle the
44+
# type of a dictionary lookup.
45+
sentinel = object()
46+
47+
48+
class DeferredCache(Generic[KT, VT]):
49+
"""Wraps an LruCache, adding support for Deferred results.
50+
51+
It expects that each entry added with set() will be a Deferred; likewise get()
52+
may return an ObservableDeferred.
53+
"""
54+
55+
__slots__ = (
56+
"cache",
57+
"name",
58+
"keylen",
59+
"thread",
60+
"metrics",
61+
"_pending_deferred_cache",
62+
)
63+
64+
def __init__(
65+
self,
66+
name: str,
67+
max_entries: int = 1000,
68+
keylen: int = 1,
69+
tree: bool = False,
70+
iterable: bool = False,
71+
apply_cache_factor_from_config: bool = True,
72+
):
73+
"""
74+
Args:
75+
name: The name of the cache
76+
max_entries: Maximum amount of entries that the cache will hold
77+
keylen: The length of the tuple used as the cache key. Ignored unless
78+
`tree` is True.
79+
tree: Use a TreeCache instead of a dict as the underlying cache type
80+
iterable: If True, count each item in the cached object as an entry,
81+
rather than each cached object
82+
apply_cache_factor_from_config: Whether cache factors specified in the
83+
config file affect `max_entries`
84+
"""
85+
cache_type = TreeCache if tree else dict
86+
87+
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
88+
self._pending_deferred_cache = (
89+
cache_type()
90+
) # type: MutableMapping[KT, CacheEntry]
91+
92+
# cache is used for completed results and maps to the result itself, rather than
93+
# a Deferred.
94+
self.cache = LruCache(
95+
max_size=max_entries,
96+
keylen=keylen,
97+
cache_type=cache_type,
98+
size_callback=(lambda d: len(d)) if iterable else None,
99+
evicted_callback=self._on_evicted,
100+
apply_cache_factor_from_config=apply_cache_factor_from_config,
101+
)
102+
103+
self.name = name
104+
self.keylen = keylen
105+
self.thread = None # type: Optional[threading.Thread]
106+
self.metrics = register_cache(
107+
"cache",
108+
name,
109+
self.cache,
110+
collect_callback=self._metrics_collection_callback,
111+
)
112+
113+
@property
114+
def max_entries(self):
115+
return self.cache.max_size
116+
117+
def _on_evicted(self, evicted_count):
118+
self.metrics.inc_evictions(evicted_count)
119+
120+
def _metrics_collection_callback(self):
121+
cache_pending_metric.labels(self.name).set(len(self._pending_deferred_cache))
122+
123+
def check_thread(self):
124+
expected_thread = self.thread
125+
if expected_thread is None:
126+
self.thread = threading.current_thread()
127+
else:
128+
if expected_thread is not threading.current_thread():
129+
raise ValueError(
130+
"Cache objects can only be accessed from the main thread"
131+
)
132+
133+
def get(
134+
self,
135+
key: KT,
136+
default=_Sentinel.sentinel,
137+
callback: Optional[Callable[[], None]] = None,
138+
update_metrics: bool = True,
139+
):
140+
"""Looks the key up in the caches.
141+
142+
Args:
143+
key(tuple)
144+
default: What is returned if key is not in the caches. If not
145+
specified then function throws KeyError instead
146+
callback(fn): Gets called when the entry in the cache is invalidated
147+
update_metrics (bool): whether to update the cache hit rate metrics
148+
149+
Returns:
150+
Either an ObservableDeferred or the result itself
151+
"""
152+
callbacks = [callback] if callback else []
153+
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
154+
if val is not _Sentinel.sentinel:
155+
val.callbacks.update(callbacks)
156+
if update_metrics:
157+
self.metrics.inc_hits()
158+
return val.deferred
159+
160+
val = self.cache.get(key, _Sentinel.sentinel, callbacks=callbacks)
161+
if val is not _Sentinel.sentinel:
162+
self.metrics.inc_hits()
163+
return val
164+
165+
if update_metrics:
166+
self.metrics.inc_misses()
167+
168+
if default is _Sentinel.sentinel:
169+
raise KeyError()
170+
else:
171+
return default
172+
173+
def set(
174+
self,
175+
key: KT,
176+
value: defer.Deferred,
177+
callback: Optional[Callable[[], None]] = None,
178+
) -> ObservableDeferred:
179+
if not isinstance(value, defer.Deferred):
180+
raise TypeError("not a Deferred")
181+
182+
callbacks = [callback] if callback else []
183+
self.check_thread()
184+
observable = ObservableDeferred(value, consumeErrors=True)
185+
observer = observable.observe()
186+
entry = CacheEntry(deferred=observable, callbacks=callbacks)
187+
188+
existing_entry = self._pending_deferred_cache.pop(key, None)
189+
if existing_entry:
190+
existing_entry.invalidate()
191+
192+
self._pending_deferred_cache[key] = entry
193+
194+
def compare_and_pop():
195+
"""Check if our entry is still the one in _pending_deferred_cache, and
196+
if so, pop it.
197+
198+
Returns true if the entries matched.
199+
"""
200+
existing_entry = self._pending_deferred_cache.pop(key, None)
201+
if existing_entry is entry:
202+
return True
203+
204+
# oops, the _pending_deferred_cache has been updated since
205+
# we started our query, so we are out of date.
206+
#
207+
# Better put back whatever we took out. (We do it this way
208+
# round, rather than peeking into the _pending_deferred_cache
209+
# and then removing on a match, to make the common case faster)
210+
if existing_entry is not None:
211+
self._pending_deferred_cache[key] = existing_entry
212+
213+
return False
214+
215+
def cb(result):
216+
if compare_and_pop():
217+
self.cache.set(key, result, entry.callbacks)
218+
else:
219+
# we're not going to put this entry into the cache, so need
220+
# to make sure that the invalidation callbacks are called.
221+
# That was probably done when _pending_deferred_cache was
222+
# updated, but it's possible that `set` was called without
223+
# `invalidate` being previously called, in which case it may
224+
# not have been. Either way, let's double-check now.
225+
entry.invalidate()
226+
227+
def eb(_fail):
228+
compare_and_pop()
229+
entry.invalidate()
230+
231+
# once the deferred completes, we can move the entry from the
232+
# _pending_deferred_cache to the real cache.
233+
#
234+
observer.addCallbacks(cb, eb)
235+
return observable
236+
237+
def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None):
238+
callbacks = [callback] if callback else []
239+
self.cache.set(key, value, callbacks=callbacks)
240+
241+
def invalidate(self, key):
242+
self.check_thread()
243+
self.cache.pop(key, None)
244+
245+
# if we have a pending lookup for this key, remove it from the
246+
# _pending_deferred_cache, which will (a) stop it being returned
247+
# for future queries and (b) stop it being persisted as a proper entry
248+
# in self.cache.
249+
entry = self._pending_deferred_cache.pop(key, None)
250+
251+
# run the invalidation callbacks now, rather than waiting for the
252+
# deferred to resolve.
253+
if entry:
254+
entry.invalidate()
255+
256+
def invalidate_many(self, key: KT):
257+
self.check_thread()
258+
if not isinstance(key, tuple):
259+
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
260+
self.cache.del_multi(key)
261+
262+
# if we have a pending lookup for this key, remove it from the
263+
# _pending_deferred_cache, as above
264+
entry_dict = self._pending_deferred_cache.pop(cast(KT, key), None)
265+
if entry_dict is not None:
266+
for entry in iterate_tree_cache_entry(entry_dict):
267+
entry.invalidate()
268+
269+
def invalidate_all(self):
270+
self.check_thread()
271+
self.cache.clear()
272+
for entry in self._pending_deferred_cache.values():
273+
entry.invalidate()
274+
self._pending_deferred_cache.clear()
275+
276+
277+
class CacheEntry:
278+
__slots__ = ["deferred", "callbacks", "invalidated"]
279+
280+
def __init__(
281+
self, deferred: ObservableDeferred, callbacks: Iterable[Callable[[], None]]
282+
):
283+
self.deferred = deferred
284+
self.callbacks = set(callbacks)
285+
self.invalidated = False
286+
287+
def invalidate(self):
288+
if not self.invalidated:
289+
self.invalidated = True
290+
for callback in self.callbacks:
291+
callback()
292+
self.callbacks.clear()

0 commit comments

Comments
 (0)