|
24 | 24 | from synapse.storage.types import Cursor
|
25 | 25 | from synapse.storage.util.sequence import build_sequence_generator
|
26 | 26 | from synapse.types import MutableStateMap, StateMap
|
| 27 | +from synapse.util.async_helpers import ObservableDeferred |
27 | 28 | from synapse.util.caches.descriptors import cached
|
28 | 29 | from synapse.util.caches.dictionary_cache import DictionaryCache
|
29 | 30 |
|
@@ -91,6 +92,12 @@ def __init__(self, database: DatabasePool, db_conn, hs):
|
91 | 92 | 500000,
|
92 | 93 | )
|
93 | 94 |
|
| 95 | + # Current ongoing get_state_for_groups in-flight requests |
| 96 | + # {group ID -> {StateFilter -> ObservableDeferred}} |
| 97 | + self._state_group_inflight_requests: Dict[ |
| 98 | + int, Dict[StateFilter, ObservableDeferred[StateMap[str]]] |
| 99 | + ] = {} |
| 100 | + |
94 | 101 | def get_max_state_group_txn(txn: Cursor):
|
95 | 102 | txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
|
96 | 103 | return txn.fetchone()[0]
|
@@ -249,6 +256,44 @@ async def _get_state_for_groups(
|
249 | 256 | if not incomplete_groups:
|
250 | 257 | return state
|
251 | 258 |
|
| 259 | + # try and rely on in-flight requests to complete this request without |
| 260 | + # needing to spawn additional queries. |
| 261 | + |
| 262 | + # (group ID, ObservableDeferred of request result) |
| 263 | + reusable_inflight_requests: List[ |
| 264 | + Tuple[int, ObservableDeferred[StateMap[str]]] |
| 265 | + ] = [] |
| 266 | + # group ID -> left over StateFilter to request |
| 267 | + requests_to_spawn: Dict[int, StateFilter] = {} |
| 268 | + |
| 269 | + for group in incomplete_groups: |
| 270 | + requests_in_flight_for_group = self._state_group_inflight_requests.get( |
| 271 | + group |
| 272 | + ) |
| 273 | + if requests_in_flight_for_group is None: |
| 274 | + continue |
| 275 | + |
| 276 | + state_filter_left_over = state_filter |
| 277 | + for ( |
| 278 | + request_state_filter, |
| 279 | + request_deferred, |
| 280 | + ) in requests_in_flight_for_group.items(): |
| 281 | + new_state_filter_left_over = state_filter_left_over.approx_difference( |
| 282 | + request_state_filter |
| 283 | + ) |
| 284 | + if new_state_filter_left_over != state_filter_left_over: |
| 285 | + # reusing this request narrows our StateFilter down a bit. |
| 286 | + reusable_inflight_requests.append((group, request_deferred)) |
| 287 | + state_filter_left_over = new_state_filter_left_over |
| 288 | + if state_filter_left_over == StateFilter.none(): |
| 289 | + # we have managed to collect enough of the in-flight requests |
| 290 | + # to cover our StateFilter and give us the state we need. |
| 291 | + break |
| 292 | + else: |
| 293 | + # we have some of the state filter left over, so need to spawn |
| 294 | + # a request |
| 295 | + requests_to_spawn[group] = state_filter_left_over |
| 296 | + |
252 | 297 | cache_sequence_nm = self._state_group_cache.sequence
|
253 | 298 | cache_sequence_m = self._state_group_members_cache.sequence
|
254 | 299 |
|
|
0 commit comments