
Commit 800ba87

Refactor and convert Linearizer to async (#12357)
Refactor and convert `Linearizer` to async. This makes a `Linearizer` cancellation bug easier to fix.

Also refactor to use an async context manager, which eliminates an unlikely footgun where code that doesn't immediately use the context manager could forget to release the lock.

Signed-off-by: Sean Quah <[email protected]>
1 parent ab3fdcf commit 800ba87
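To make the footgun concrete, here is a hedged sketch of the calling pattern before and after this commit. It is an illustration only, not code from the diff; `linearizer`, `room_id` and `do_work` are placeholder names.

# Before: the awaited call itself acquires the lock and hands back a plain
# context manager. If the caller never enters it, the lock is never released.
cm = await linearizer.queue(room_id)   # lock already held at this point
with cm:                               # easy to forget, or to skip via an early return
    await do_work()

# After: acquisition and release are bound to a single statement, so the lock
# cannot be acquired without a guaranteed release.
async with linearizer.queue(room_id):
    await do_work()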

File tree

21 files changed (+104 additions, -115 deletions)


changelog.d/12357.misc

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Refactor `Linearizer`, convert methods to async and use an async context manager.
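All of the call sites below follow the same pattern, so a rough idea of the interface they now rely on may help. The following is a minimal asyncio-based sketch of a keyed linearizer whose `queue()` returns an async context manager. It is an assumption-laden illustration, not Synapse's actual `Linearizer`, which is Twisted-based and adds concurrency limits, metrics and cancellation handling.

import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import AsyncIterator, DefaultDict, Hashable


class KeyedLinearizer:
    """Illustrative only: serialises coroutines that share the same key."""

    def __init__(self) -> None:
        self._locks: DefaultDict[Hashable, asyncio.Lock] = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def queue(self, key: Hashable) -> AsyncIterator[None]:
        lock = self._locks[key]
        await lock.acquire()  # the lock is only taken when the caller enters the block
        try:
            yield
        finally:
            lock.release()  # always released, even if the body raises


async def main() -> None:
    linearizer = KeyedLinearizer()
    async with linearizer.queue(("example.org", "!room:example.org")):
        ...  # at most one task per key runs this section at a time


asyncio.run(main())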

synapse/federation/federation_server.py

Lines changed: 5 additions & 5 deletions
@@ -188,7 +188,7 @@ async def _handle_old_staged_events(self) -> None:
     async def on_backfill_request(
         self, origin: str, room_id: str, versions: List[str], limit: int
     ) -> Tuple[int, Dict[str, Any]]:
-        with (await self._server_linearizer.queue((origin, room_id))):
+        async with self._server_linearizer.queue((origin, room_id)):
             origin_host, _ = parse_server_name(origin)
             await self.check_server_matches_acl(origin_host, room_id)

@@ -218,7 +218,7 @@ async def on_timestamp_to_event_request(
             Tuple indicating the response status code and dictionary response
             body including `event_id`.
         """
-        with (await self._server_linearizer.queue((origin, room_id))):
+        async with self._server_linearizer.queue((origin, room_id)):
             origin_host, _ = parse_server_name(origin)
             await self.check_server_matches_acl(origin_host, room_id)

@@ -529,7 +529,7 @@ async def on_room_state_request(
         # in the cache so we could return it without waiting for the linearizer
         # - but that's non-trivial to get right, and anyway somewhat defeats
         # the point of the linearizer.
-        with (await self._server_linearizer.queue((origin, room_id))):
+        async with self._server_linearizer.queue((origin, room_id)):
             resp: JsonDict = dict(
                 await self._state_resp_cache.wrap(
                     (room_id, event_id),

@@ -883,7 +883,7 @@ async def _on_send_membership_event(
     async def on_event_auth(
         self, origin: str, room_id: str, event_id: str
     ) -> Tuple[int, Dict[str, Any]]:
-        with (await self._server_linearizer.queue((origin, room_id))):
+        async with self._server_linearizer.queue((origin, room_id)):
             origin_host, _ = parse_server_name(origin)
             await self.check_server_matches_acl(origin_host, room_id)

@@ -945,7 +945,7 @@ async def on_get_missing_events(
         latest_events: List[str],
         limit: int,
     ) -> Dict[str, list]:
-        with (await self._server_linearizer.queue((origin, room_id))):
+        async with self._server_linearizer.queue((origin, room_id)):
             origin_host, _ = parse_server_name(origin)
             await self.check_server_matches_acl(origin_host, room_id)

synapse/handlers/appservice.py

Lines changed: 2 additions & 4 deletions
@@ -330,10 +330,8 @@ async def _notify_interested_services_ephemeral(
                 continue

             # Since we read/update the stream position for this AS/stream
-            with (
-                await self._ephemeral_events_linearizer.queue(
-                    (service.id, stream_key)
-                )
+            async with self._ephemeral_events_linearizer.queue(
+                (service.id, stream_key)
             ):
                 if stream_key == "receipt_key":
                     events = await self._handle_receipts(service, new_token)

synapse/handlers/device.py

Lines changed: 1 addition & 1 deletion
@@ -833,7 +833,7 @@ async def incoming_device_list_update(
     async def _handle_device_updates(self, user_id: str) -> None:
         "Actually handle pending updates."

-        with (await self._remote_edu_linearizer.queue(user_id)):
+        async with self._remote_edu_linearizer.queue(user_id):
             pending_updates = self._pending_updates.pop(user_id, [])
             if not pending_updates:
                 # This can happen since we batch updates

synapse/handlers/e2e_keys.py

Lines changed: 2 additions & 2 deletions
@@ -118,7 +118,7 @@ async def query_devices(
             from_device_id: the device making the query. This is used to limit
                 the number of in-flight queries at a time.
         """
-        with await self._query_devices_linearizer.queue((from_user_id, from_device_id)):
+        async with self._query_devices_linearizer.queue((from_user_id, from_device_id)):
             device_keys_query: Dict[str, Iterable[str]] = query_body.get(
                 "device_keys", {}
             )

@@ -1386,7 +1386,7 @@ async def _handle_signing_key_updates(self, user_id: str) -> None:
         device_handler = self.e2e_keys_handler.device_handler
         device_list_updater = device_handler.device_list_updater

-        with (await self._remote_edu_linearizer.queue(user_id)):
+        async with self._remote_edu_linearizer.queue(user_id):
             pending_updates = self._pending_updates.pop(user_id, [])
             if not pending_updates:
                 # This can happen since we batch updates

synapse/handlers/e2e_room_keys.py

Lines changed: 7 additions & 7 deletions
@@ -83,7 +83,7 @@ async def get_room_keys(

         # we deliberately take the lock to get keys so that changing the version
         # works atomically
-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):
             # make sure the backup version exists
             try:
                 await self.store.get_e2e_room_keys_version_info(user_id, version)

@@ -126,7 +126,7 @@ async def delete_room_keys(
         """

         # lock for consistency with uploading
-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):
             # make sure the backup version exists
             try:
                 version_info = await self.store.get_e2e_room_keys_version_info(

@@ -187,7 +187,7 @@ async def upload_room_keys(
         # TODO: Validate the JSON to make sure it has the right keys.

         # XXX: perhaps we should use a finer grained lock here?
-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):

             # Check that the version we're trying to upload is the current version
             try:

@@ -332,7 +332,7 @@ async def create_version(self, user_id: str, version_info: JsonDict) -> str:
         # TODO: Validate the JSON to make sure it has the right keys.

         # lock everyone out until we've switched version
-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):
             new_version = await self.store.create_e2e_room_keys_version(
                 user_id, version_info
             )

@@ -359,7 +359,7 @@ async def get_version_info(
             }
         """

-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):
             try:
                 res = await self.store.get_e2e_room_keys_version_info(user_id, version)
             except StoreError as e:

@@ -383,7 +383,7 @@ async def delete_version(self, user_id: str, version: Optional[str] = None) -> N
             NotFoundError: if this backup version doesn't exist
         """

-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):
             try:
                 await self.store.delete_e2e_room_keys_version(user_id, version)
             except StoreError as e:

@@ -413,7 +413,7 @@ async def update_version(
             raise SynapseError(
                 400, "Version in body does not match", Codes.INVALID_PARAM
             )
-        with (await self._upload_linearizer.queue(user_id)):
+        async with self._upload_linearizer.queue(user_id):
             try:
                 old_info = await self.store.get_e2e_room_keys_version_info(
                     user_id, version

synapse/handlers/federation.py

Lines changed: 1 addition & 1 deletion
@@ -151,7 +151,7 @@ async def maybe_backfill(
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
         """
-        with (await self._room_backfill.queue(room_id)):
+        async with self._room_backfill.queue(room_id):
             return await self._maybe_backfill_inner(room_id, current_depth, limit)

     async def _maybe_backfill_inner(

synapse/handlers/federation_event.py

Lines changed: 1 addition & 1 deletion
@@ -224,7 +224,7 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
                 len(missing_prevs),
                 shortstr(missing_prevs),
             )
-            with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+            async with self._room_pdu_linearizer.queue(pdu.room_id):
                 logger.info(
                     "Acquired room lock to fetch %d missing prev_events",
                     len(missing_prevs),

synapse/handlers/message.py

Lines changed: 1 addition & 1 deletion
@@ -851,7 +851,7 @@ async def create_and_send_nonmember_event(
         # a situation where event persistence can't keep up, causing
         # extremities to pile up, which in turn leads to state resolution
         # taking longer.
-        with (await self.limiter.queue(event_dict["room_id"])):
+        async with self.limiter.queue(event_dict["room_id"]):
             if txn_id and requester.access_token_id:
                 existing_event_id = await self.store.get_event_id_from_transaction_id(
                     event_dict["room_id"],

synapse/handlers/presence.py

Lines changed: 2 additions & 2 deletions
@@ -1030,7 +1030,7 @@ async def update_external_syncs_row(
             is_syncing: Whether or not the user is now syncing
             sync_time_msec: Time in ms when the user was last syncing
         """
-        with (await self.external_sync_linearizer.queue(process_id)):
+        async with self.external_sync_linearizer.queue(process_id):
             prev_state = await self.current_state_for_user(user_id)

             process_presence = self.external_process_to_current_syncs.setdefault(

@@ -1071,7 +1071,7 @@ async def update_external_syncs_clear(self, process_id: str) -> None:

         Used when the process has stopped/disappeared.
         """
-        with (await self.external_sync_linearizer.queue(process_id)):
+        async with self.external_sync_linearizer.queue(process_id):
             process_presence = self.external_process_to_current_syncs.pop(
                 process_id, set()
             )

synapse/handlers/read_marker.py

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ async def received_client_read_marker(
             the read marker has changed.
         """

-        with await self.read_marker_linearizer.queue((room_id, user_id)):
+        async with self.read_marker_linearizer.queue((room_id, user_id)):
             existing_read_marker = await self.store.get_account_data_for_room_and_type(
                 user_id, room_id, "m.fully_read"
             )

synapse/handlers/room.py

Lines changed: 1 addition & 1 deletion
@@ -883,7 +883,7 @@ async def create_room(
         #
         # we also don't need to check the requester's shadow-ban here, as we
         # have already done so above (and potentially emptied invite_list).
-        with (await self.room_member_handler.member_linearizer.queue((room_id,))):
+        async with self.room_member_handler.member_linearizer.queue((room_id,)):
             content = {}
             is_direct = config.get("is_direct", None)
             if is_direct:

synapse/handlers/room_member.py

Lines changed: 2 additions & 2 deletions
@@ -515,8 +515,8 @@ async def update_membership(

         # We first linearise by the application service (to try to limit concurrent joins
         # by application services), and then by room ID.
-        with (await self.member_as_limiter.queue(as_id)):
-            with (await self.member_linearizer.queue(key)):
+        async with self.member_as_limiter.queue(as_id):
+            async with self.member_linearizer.queue(key):
                 result = await self.update_membership_locked(
                     requester,
                     target,
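The nested `async with` blocks above keep the acquisition order explicit: the per-application-service limiter is entered before the per-room linearizer, and they are released in reverse order. As an aside, not something this commit does, the same ordering can be expressed with `contextlib.AsyncExitStack`; the sketch below is self-contained and uses plain `asyncio.Lock` objects as stand-ins for the two keyed locks.

import asyncio
from contextlib import AsyncExitStack

# Illustrative stand-ins for the application-service limiter and the room lock.
as_limiter = asyncio.Lock()
room_lock = asyncio.Lock()


async def update_membership_example() -> None:
    async with AsyncExitStack() as stack:
        # Locks are entered in order and released in reverse order on exit.
        await stack.enter_async_context(as_limiter)
        await stack.enter_async_context(room_lock)
        ...  # the membership update would run here


asyncio.run(update_membership_example())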

synapse/handlers/sso.py

Lines changed: 1 addition & 1 deletion
@@ -430,7 +430,7 @@ async def complete_sso_login_request(
         # grab a lock while we try to find a mapping for this user. This seems...
         # optimistic, especially for implementations that end up redirecting to
         # interstitial pages.
-        with await self._mapping_lock.queue(auth_provider_id):
+        async with self._mapping_lock.queue(auth_provider_id):
             # first of all, check if we already have a mapping for this user
             user_id = await self.get_sso_user_by_remote_user_id(
                 auth_provider_id,

synapse/push/bulk_push_rule_evaluator.py

Lines changed: 1 addition & 1 deletion
@@ -397,7 +397,7 @@ async def get_rules(
             self.room_push_rule_cache_metrics.inc_hits()
             return self.data.rules_by_user

-        with (await self.linearizer.queue(self.room_id)):
+        async with self.linearizer.queue(self.room_id):
             if state_group and self.data.state_group == state_group:
                 logger.debug("Using cached rules for %r", self.room_id)
                 self.room_push_rule_cache_metrics.inc_hits()

synapse/replication/tcp/client.py

Lines changed: 1 addition & 1 deletion
@@ -451,7 +451,7 @@ async def _save_and_send_ack(self) -> None:
         # service for robustness? Or could we replace it with an assertion that
         # we're not being re-entered?

-        with (await self._fed_position_linearizer.queue(None)):
+        async with self._fed_position_linearizer.queue(None):
             # We persist and ack the same position, so we take a copy of it
             # here as otherwise it can get modified from underneath us.
             current_position = self.federation_position

synapse/rest/media/v1/media_repository.py

Lines changed: 3 additions & 3 deletions
@@ -258,7 +258,7 @@ async def get_remote_media(
         # We linearize here to ensure that we don't try and download remote
         # media multiple times concurrently
         key = (server_name, media_id)
-        with (await self.remote_media_linearizer.queue(key)):
+        async with self.remote_media_linearizer.queue(key):
             responder, media_info = await self._get_remote_media_impl(
                 server_name, media_id
             )

@@ -294,7 +294,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
         # We linearize here to ensure that we don't try and download remote
         # media multiple times concurrently
         key = (server_name, media_id)
-        with (await self.remote_media_linearizer.queue(key)):
+        async with self.remote_media_linearizer.queue(key):
             responder, media_info = await self._get_remote_media_impl(
                 server_name, media_id
             )

@@ -850,7 +850,7 @@ async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]:

             # TODO: Should we delete from the backup store

-            with (await self.remote_media_linearizer.queue(key)):
+            async with self.remote_media_linearizer.queue(key):
                 full_path = self.filepaths.remote_media_filepath(origin, file_id)
                 try:
                     os.remove(full_path)

synapse/state/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -573,7 +573,7 @@ async def resolve_state_groups(
         """
         group_names = frozenset(state_groups_ids.keys())

-        with (await self.resolve_linearizer.queue(group_names)):
+        async with self.resolve_linearizer.queue(group_names):
             cache = self._state_cache.get(group_names, None)
             if cache:
                 return cache

synapse/storage/databases/main/roommember.py

Lines changed: 1 addition & 1 deletion
@@ -888,7 +888,7 @@ async def _get_joined_hosts(
             return frozenset(cache.hosts_to_joined_users)

         # Since we'll mutate the cache we need to lock.
-        with (await self._joined_host_linearizer.queue(room_id)):
+        async with self._joined_host_linearizer.queue(room_id):
             if state_entry.state_group == cache.state_group:
                 # Same state group, so nothing to do. We've already checked for
                 # this above, but the cache may have changed while waiting on
