Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit d3d9ca1

Browse files
authored
Cancel the processing of key query requests when they time out. (#13680)
1 parent c2fe48a commit d3d9ca1

File tree

18 files changed

+110
-20
lines changed

18 files changed

+110
-20
lines changed

changelog.d/13680.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Cancel the processing of key query requests when they time out.

synapse/api/auth.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
trace,
3939
)
4040
from synapse.types import Requester, create_requester
41+
from synapse.util.cancellation import cancellable
4142

4243
if TYPE_CHECKING:
4344
from synapse.server import HomeServer
@@ -118,6 +119,7 @@ async def check_user_in_room(
118119
errcode=Codes.NOT_JOINED,
119120
)
120121

122+
@cancellable
121123
async def get_user_by_req(
122124
self,
123125
request: SynapseRequest,
@@ -166,6 +168,7 @@ async def get_user_by_req(
166168
parent_span.set_tag("appservice_id", requester.app_service.id)
167169
return requester
168170

171+
@cancellable
169172
async def _wrapped_get_user_by_req(
170173
self,
171174
request: SynapseRequest,
@@ -281,6 +284,7 @@ async def validate_appservice_can_control_user_id(
281284
403, "Application service has not registered this user (%s)" % user_id
282285
)
283286

287+
@cancellable
284288
async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
285289
"""
286290
Given a request, reads the request parameters to determine:
@@ -523,6 +527,7 @@ def has_access_token(request: Request) -> bool:
523527
return bool(query_params) or bool(auth_headers)
524528

525529
@staticmethod
530+
@cancellable
526531
def get_access_token_from_request(request: Request) -> str:
527532
"""Extracts the access_token from the request.
528533

synapse/handlers/device.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from synapse.util import stringutils
5353
from synapse.util.async_helpers import Linearizer
5454
from synapse.util.caches.expiringcache import ExpiringCache
55+
from synapse.util.cancellation import cancellable
5556
from synapse.util.metrics import measure_func
5657
from synapse.util.retryutils import NotRetryingDestination
5758

@@ -124,6 +125,7 @@ async def get_device(self, user_id: str, device_id: str) -> JsonDict:
124125

125126
return device
126127

128+
@cancellable
127129
async def get_device_changes_in_shared_rooms(
128130
self, user_id: str, room_ids: Collection[str], from_token: StreamToken
129131
) -> Collection[str]:
@@ -163,6 +165,7 @@ async def get_device_changes_in_shared_rooms(
163165

164166
@trace
165167
@measure_func("device.get_user_ids_changed")
168+
@cancellable
166169
async def get_user_ids_changed(
167170
self, user_id: str, from_token: StreamToken
168171
) -> JsonDict:

synapse/handlers/e2e_keys.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
get_verify_key_from_cross_signing_key,
3838
)
3939
from synapse.util import json_decoder, unwrapFirstError
40-
from synapse.util.async_helpers import Linearizer
40+
from synapse.util.async_helpers import Linearizer, delay_cancellation
41+
from synapse.util.cancellation import cancellable
4142
from synapse.util.retryutils import NotRetryingDestination
4243

4344
if TYPE_CHECKING:
@@ -91,6 +92,7 @@ def __init__(self, hs: "HomeServer"):
9192
)
9293

9394
@trace
95+
@cancellable
9496
async def query_devices(
9597
self,
9698
query_body: JsonDict,
@@ -208,22 +210,26 @@ async def query_devices(
208210
r[user_id] = remote_queries[user_id]
209211

210212
# Now fetch any devices that we don't have in our cache
213+
# TODO It might make sense to propagate cancellations into the
214+
# deferreds which are querying remote homeservers.
211215
await make_deferred_yieldable(
212-
defer.gatherResults(
213-
[
214-
run_in_background(
215-
self._query_devices_for_destination,
216-
results,
217-
cross_signing_keys,
218-
failures,
219-
destination,
220-
queries,
221-
timeout,
222-
)
223-
for destination, queries in remote_queries_not_in_cache.items()
224-
],
225-
consumeErrors=True,
226-
).addErrback(unwrapFirstError)
216+
delay_cancellation(
217+
defer.gatherResults(
218+
[
219+
run_in_background(
220+
self._query_devices_for_destination,
221+
results,
222+
cross_signing_keys,
223+
failures,
224+
destination,
225+
queries,
226+
timeout,
227+
)
228+
for destination, queries in remote_queries_not_in_cache.items()
229+
],
230+
consumeErrors=True,
231+
).addErrback(unwrapFirstError)
232+
)
227233
)
228234

229235
ret = {"device_keys": results, "failures": failures}
@@ -347,6 +353,7 @@ async def _query_devices_for_destination(
347353

348354
return
349355

356+
@cancellable
350357
async def get_cross_signing_keys_from_cache(
351358
self, query: Iterable[str], from_user_id: Optional[str]
352359
) -> Dict[str, Dict[str, dict]]:
@@ -393,6 +400,7 @@ async def get_cross_signing_keys_from_cache(
393400
}
394401

395402
@trace
403+
@cancellable
396404
async def query_local_devices(
397405
self, query: Mapping[str, Optional[List[str]]]
398406
) -> Dict[str, Dict[str, dict]]:

synapse/rest/client/keys.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
)
2828
from synapse.http.site import SynapseRequest
2929
from synapse.logging.opentracing import log_kv, set_tag
30+
from synapse.rest.client._base import client_patterns, interactive_auth_handler
3031
from synapse.types import JsonDict, StreamToken
31-
32-
from ._base import client_patterns, interactive_auth_handler
32+
from synapse.util.cancellation import cancellable
3333

3434
if TYPE_CHECKING:
3535
from synapse.server import HomeServer
@@ -156,6 +156,7 @@ def __init__(self, hs: "HomeServer"):
156156
self.auth = hs.get_auth()
157157
self.e2e_keys_handler = hs.get_e2e_keys_handler()
158158

159+
@cancellable
159160
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
160161
requester = await self.auth.get_user_by_req(request, allow_guest=True)
161162
user_id = requester.user.to_string()
@@ -199,6 +200,7 @@ def __init__(self, hs: "HomeServer"):
199200
self.device_handler = hs.get_device_handler()
200201
self.store = hs.get_datastores().main
201202

203+
@cancellable
202204
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
203205
requester = await self.auth.get_user_by_req(request, allow_guest=True)
204206

synapse/storage/controllers/state.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
PartialStateEventsTracker,
3737
)
3838
from synapse.types import MutableStateMap, StateMap
39+
from synapse.util.cancellation import cancellable
3940

4041
if TYPE_CHECKING:
4142
from synapse.server import HomeServer
@@ -229,6 +230,7 @@ async def get_state_for_events(
229230

230231
@trace
231232
@tag_args
233+
@cancellable
232234
async def get_state_ids_for_events(
233235
self,
234236
event_ids: Collection[str],
@@ -350,6 +352,7 @@ def get_state_for_groups(
350352

351353
@trace
352354
@tag_args
355+
@cancellable
353356
async def get_state_group_for_events(
354357
self,
355358
event_ids: Collection[str],
@@ -398,6 +401,7 @@ async def store_state_group(
398401
event_id, room_id, prev_group, delta_ids, current_state_ids
399402
)
400403

404+
@cancellable
401405
async def get_current_state_ids(
402406
self,
403407
room_id: str,

synapse/storage/databases/main/devices.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from synapse.util.caches.descriptors import cached, cachedList
5454
from synapse.util.caches.lrucache import LruCache
5555
from synapse.util.caches.stream_change_cache import StreamChangeCache
56+
from synapse.util.cancellation import cancellable
5657
from synapse.util.iterutils import batch_iter
5758
from synapse.util.stringutils import shortstr
5859

@@ -668,6 +669,7 @@ def get_device_stream_token(self) -> int:
668669
...
669670

670671
@trace
672+
@cancellable
671673
async def get_user_devices_from_cache(
672674
self, query_list: List[Tuple[str, Optional[str]]]
673675
) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]:
@@ -743,6 +745,7 @@ def get_cached_device_list_changes(
743745

744746
return self._device_list_stream_cache.get_all_entities_changed(from_key)
745747

748+
@cancellable
746749
async def get_users_whose_devices_changed(
747750
self,
748751
from_key: int,
@@ -1221,6 +1224,7 @@ async def _get_min_device_lists_changes_in_room(self) -> int:
12211224
desc="get_min_device_lists_changes_in_room",
12221225
)
12231226

1227+
@cancellable
12241228
async def get_device_list_changes_in_rooms(
12251229
self, room_ids: Collection[str], from_id: int
12261230
) -> Optional[Set[str]]:

synapse/storage/databases/main/end_to_end_keys.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from synapse.types import JsonDict
5151
from synapse.util import json_encoder
5252
from synapse.util.caches.descriptors import cached, cachedList
53+
from synapse.util.cancellation import cancellable
5354
from synapse.util.iterutils import batch_iter
5455

5556
if TYPE_CHECKING:
@@ -135,6 +136,7 @@ async def get_e2e_device_keys_for_federation_query(
135136
return now_stream_id, []
136137

137138
@trace
139+
@cancellable
138140
async def get_e2e_device_keys_for_cs_api(
139141
self, query_list: List[Tuple[str, Optional[str]]]
140142
) -> Dict[str, Dict[str, JsonDict]]:
@@ -197,6 +199,7 @@ async def get_e2e_device_keys_and_signatures(
197199
...
198200

199201
@trace
202+
@cancellable
200203
async def get_e2e_device_keys_and_signatures(
201204
self,
202205
query_list: Collection[Tuple[str, Optional[str]]],
@@ -887,6 +890,7 @@ def _get_e2e_cross_signing_signatures_txn(
887890

888891
return keys
889892

893+
@cancellable
890894
async def get_e2e_cross_signing_keys_bulk(
891895
self, user_ids: List[str], from_user_id: Optional[str] = None
892896
) -> Dict[str, Optional[Dict[str, JsonDict]]]:
@@ -902,7 +906,6 @@ async def get_e2e_cross_signing_keys_bulk(
902906
keys were not found, either their user ID will not be in the dict,
903907
or their user ID will map to None.
904908
"""
905-
906909
result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
907910

908911
if from_user_id:

synapse/storage/databases/main/event_federation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from synapse.util import json_encoder
4949
from synapse.util.caches.descriptors import cached
5050
from synapse.util.caches.lrucache import LruCache
51+
from synapse.util.cancellation import cancellable
5152
from synapse.util.iterutils import batch_iter
5253

5354
if TYPE_CHECKING:
@@ -976,6 +977,7 @@ def _get_min_depth_interaction(
976977

977978
return int(min_depth) if min_depth is not None else None
978979

980+
@cancellable
979981
async def get_forward_extremities_for_room_at_stream_ordering(
980982
self, room_id: str, stream_ordering: int
981983
) -> List[str]:

synapse/storage/databases/main/events_worker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
8282
from synapse.util.caches.descriptors import cached, cachedList
8383
from synapse.util.caches.lrucache import AsyncLruCache
84+
from synapse.util.cancellation import cancellable
8485
from synapse.util.iterutils import batch_iter
8586
from synapse.util.metrics import Measure
8687

@@ -339,6 +340,7 @@ async def get_event(
339340
) -> Optional[EventBase]:
340341
...
341342

343+
@cancellable
342344
async def get_event(
343345
self,
344346
event_id: str,
@@ -433,6 +435,7 @@ async def get_events(
433435

434436
@trace
435437
@tag_args
438+
@cancellable
436439
async def get_events_as_list(
437440
self,
438441
event_ids: Collection[str],
@@ -584,6 +587,7 @@ async def get_events_as_list(
584587

585588
return events
586589

590+
@cancellable
587591
async def _get_events_from_cache_or_db(
588592
self, event_ids: Iterable[str], allow_rejected: bool = False
589593
) -> Dict[str, EventCacheEntry]:

synapse/storage/databases/main/roommember.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from synapse.util.async_helpers import Linearizer
5656
from synapse.util.caches import intern_string
5757
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
58+
from synapse.util.cancellation import cancellable
5859
from synapse.util.iterutils import batch_iter
5960
from synapse.util.metrics import Measure
6061

@@ -770,6 +771,7 @@ def _get_users_server_still_shares_room_with_txn(
770771
_get_users_server_still_shares_room_with_txn,
771772
)
772773

774+
@cancellable
773775
async def get_rooms_for_user(
774776
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
775777
) -> FrozenSet[str]:

synapse/storage/databases/main/state.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from synapse.types import JsonDict, JsonMapping, StateMap
3737
from synapse.util.caches import intern_string
3838
from synapse.util.caches.descriptors import cached, cachedList
39+
from synapse.util.cancellation import cancellable
3940
from synapse.util.iterutils import batch_iter
4041

4142
if TYPE_CHECKING:
@@ -281,6 +282,7 @@ def _get_current_state_ids_txn(txn: LoggingTransaction) -> StateMap[str]:
281282
)
282283

283284
# FIXME: how should this be cached?
285+
@cancellable
284286
async def get_partial_filtered_current_state_ids(
285287
self, room_id: str, state_filter: Optional[StateFilter] = None
286288
) -> StateMap[str]:

synapse/storage/databases/main/stream.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
from synapse.types import PersistedEventPosition, RoomStreamToken
7373
from synapse.util.caches.descriptors import cached
7474
from synapse.util.caches.stream_change_cache import StreamChangeCache
75+
from synapse.util.cancellation import cancellable
7576

7677
if TYPE_CHECKING:
7778
from synapse.server import HomeServer
@@ -597,6 +598,7 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
597598

598599
return ret, key
599600

601+
@cancellable
600602
async def get_membership_changes_for_user(
601603
self,
602604
user_id: str,

synapse/storage/databases/state/store.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from synapse.types import MutableStateMap, StateKey, StateMap
3232
from synapse.util.caches.descriptors import cached
3333
from synapse.util.caches.dictionary_cache import DictionaryCache
34+
from synapse.util.cancellation import cancellable
3435

3536
if TYPE_CHECKING:
3637
from synapse.server import HomeServer
@@ -156,6 +157,7 @@ def _get_state_group_delta_txn(txn: LoggingTransaction) -> _GetStateGroupDelta:
156157
"get_state_group_delta", _get_state_group_delta_txn
157158
)
158159

160+
@cancellable
159161
async def _get_state_groups_from_groups(
160162
self, groups: List[int], state_filter: StateFilter
161163
) -> Dict[int, StateMap[str]]:
@@ -235,6 +237,7 @@ def _get_state_for_group_using_cache(
235237

236238
return state_filter.filter_state(state_dict_ids), not missing_types
237239

240+
@cancellable
238241
async def _get_state_for_groups(
239242
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
240243
) -> Dict[int, MutableStateMap[str]]:

0 commit comments

Comments
 (0)