12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
import logging
15
- from typing import Any , Callable , Dict , Generic , Optional , TypeVar
15
+ from typing import Any , Awaitable , Callable , Dict , Generic , Optional , TypeVar
16
+
17
+ import attr
16
18
17
19
from twisted .internet import defer
18
20
23
25
24
26
logger = logging .getLogger (__name__ )
25
27
26
- T = TypeVar ("T" )
28
+ # the type of the key in the cache
29
+ KV = TypeVar ("KV" )
30
+
31
+ # the type of the result from the operation
32
+ RV = TypeVar ("RV" )
33
+
27
34
35
+ @attr .s (auto_attribs = True )
36
+ class ResponseCacheContext (Generic [KV ]):
37
+ """Information about a missed ResponseCache hit
28
38
29
- class ResponseCache (Generic [T ]):
39
+ This object can be passed into the callback for additional feedback
40
+ """
41
+
42
+ cache_key : KV
43
+ """The cache key that caused the cache miss
44
+
45
+ This should be considered read-only.
46
+
47
+ TODO: in attrs 20.1, make it frozen with an on_setattr.
48
+ """
49
+
50
+ should_cache : bool = True
51
+ """Whether the result should be cached once the request completes.
52
+
53
+ This can be modified by the callback if it decides its result should not be cached.
54
+ """
55
+
56
+
57
+ class ResponseCache (Generic [KV ]):
30
58
"""
31
59
This caches a deferred response. Until the deferred completes it will be
32
60
returned from the cache. This means that if the client retries the request
@@ -35,8 +63,10 @@ class ResponseCache(Generic[T]):
35
63
"""
36
64
37
65
def __init__ (self , clock : Clock , name : str , timeout_ms : float = 0 ):
38
- # Requests that haven't finished yet.
39
- self .pending_result_cache = {} # type: Dict[T, ObservableDeferred]
66
+ # This is poorly-named: it includes both complete and incomplete results.
67
+ # We keep complete results rather than switching to absolute values because
68
+ # that makes it easier to cache Failure results.
69
+ self .pending_result_cache = {} # type: Dict[KV, ObservableDeferred]
40
70
41
71
self .clock = clock
42
72
self .timeout_sec = timeout_ms / 1000.0
@@ -50,16 +80,13 @@ def size(self) -> int:
50
80
def __len__ (self ) -> int :
51
81
return self .size ()
52
82
53
- def get (self , key : T ) -> Optional [defer .Deferred ]:
83
+ def get (self , key : KV ) -> Optional [defer .Deferred ]:
54
84
"""Look up the given key.
55
85
56
- Can return either a new Deferred (which also doesn't follow the synapse
57
- logcontext rules), or, if the request has completed, the actual
58
- result. You will probably want to make_deferred_yieldable the result.
86
+ Returns a new Deferred (which also doesn't follow the synapse
87
+ logcontext rules). You will probably want to make_deferred_yieldable the result.
59
88
60
- If there is no entry for the key, returns None. It is worth noting that
61
- this means there is no way to distinguish a completed result of None
62
- from an absent cache entry.
89
+ If there is no entry for the key, returns None.
63
90
64
91
Args:
65
92
key: key to get/set in the cache
@@ -76,42 +103,56 @@ def get(self, key: T) -> Optional[defer.Deferred]:
76
103
self ._metrics .inc_misses ()
77
104
return None
78
105
79
- def set (self , key : T , deferred : defer .Deferred ) -> defer .Deferred :
106
+ def _set (
107
+ self , context : ResponseCacheContext [KV ], deferred : defer .Deferred
108
+ ) -> defer .Deferred :
80
109
"""Set the entry for the given key to the given deferred.
81
110
82
111
*deferred* should run its callbacks in the sentinel logcontext (ie,
83
112
you should wrap normal synapse deferreds with
84
113
synapse.logging.context.run_in_background).
85
114
86
- Can return either a new Deferred (which also doesn't follow the synapse
87
- logcontext rules), or, if *deferred* was already complete, the actual
88
- result. You will probably want to make_deferred_yieldable the result.
115
+ Returns a new Deferred (which also doesn't follow the synapse logcontext rules).
116
+ You will probably want to make_deferred_yieldable the result.
89
117
90
118
Args:
91
- key: key to get/set in the cache
119
+ context: Information about the cache miss
92
120
deferred: The deferred which resolves to the result.
93
121
94
122
Returns:
95
123
A new deferred which resolves to the actual result.
96
124
"""
97
125
result = ObservableDeferred (deferred , consumeErrors = True )
126
+ key = context .cache_key
98
127
self .pending_result_cache [key ] = result
99
128
100
- def remove (r ):
101
- if self .timeout_sec :
129
+ def on_complete (r ):
130
+ # if this cache has a non-zero timeout, and the callback has not cleared
131
+ # the should_cache bit, we leave it in the cache for now and schedule
132
+ # its removal later.
133
+ if self .timeout_sec and context .should_cache :
102
134
self .clock .call_later (
103
135
self .timeout_sec , self .pending_result_cache .pop , key , None
104
136
)
105
137
else :
138
+ # otherwise, remove the result immediately.
106
139
self .pending_result_cache .pop (key , None )
107
140
return r
108
141
109
- result .addBoth (remove )
142
+ # make sure we do this *after* adding the entry to pending_result_cache,
143
+ # in case the result is already complete (in which case flipping the order would
144
+ # leave us with a stuck entry in the cache).
145
+ result .addBoth (on_complete )
110
146
return result .observe ()
111
147
112
- def wrap (
113
- self , key : T , callback : Callable [..., Any ], * args : Any , ** kwargs : Any
114
- ) -> defer .Deferred :
148
+ async def wrap (
149
+ self ,
150
+ key : KV ,
151
+ callback : Callable [..., Awaitable [RV ]],
152
+ * args : Any ,
153
+ cache_context : bool = False ,
154
+ ** kwargs : Any ,
155
+ ) -> RV :
115
156
"""Wrap together a *get* and *set* call, taking care of logcontexts
116
157
117
158
First looks up the key in the cache, and if it is present makes it
@@ -140,22 +181,28 @@ async def handle_request(request):
140
181
141
182
*args: positional parameters to pass to the callback, if it is used
142
183
184
+ cache_context: if set, the callback will be given a `cache_context` kw arg,
185
+ which will be a ResponseCacheContext object.
186
+
143
187
**kwargs: named parameters to pass to the callback, if it is used
144
188
145
189
Returns:
146
- Deferred which resolves to the result
190
+ The result of the callback (from the cache, or otherwise)
147
191
"""
148
192
result = self .get (key )
149
193
if not result :
150
194
logger .debug (
151
195
"[%s]: no cached result for [%s], calculating new one" , self ._name , key
152
196
)
197
+ context = ResponseCacheContext (cache_key = key )
198
+ if cache_context :
199
+ kwargs ["cache_context" ] = context
153
200
d = run_in_background (callback , * args , ** kwargs )
154
- result = self .set ( key , d )
201
+ result = self ._set ( context , d )
155
202
elif not isinstance (result , defer .Deferred ) or result .called :
156
203
logger .info ("[%s]: using completed cached result for [%s]" , self ._name , key )
157
204
else :
158
205
logger .info (
159
206
"[%s]: using incomplete cached result for [%s]" , self ._name , key
160
207
)
161
- return make_deferred_yieldable (result )
208
+ return await make_deferred_yieldable (result )
0 commit comments