
Commit 3201863

Resync state after partial-state join (#12394)
We work through all the events with partial state, updating the state at each of them. Once it's done, we recalculate the state for the whole room, and then mark the room as having complete state.
1 parent 86cf6a3 · commit 3201863
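In outline, the resync added here looks like the following sketch (a simplification of `FederationHandler._sync_partial_state_room` from the diff below; `store`, `storage` and `event_handler` stand in for the real Synapse collaborators, and error handling is omitted):

```python
# Simplified sketch of the background resync, not the verbatim implementation.
async def sync_partial_state_room(store, storage, event_handler, destination, room_id):
    while True:
        # Work through partial-state events in order of increasing stream ordering.
        batch = await store.get_partial_state_events_batch(room_id)
        if not batch:
            # Every event now has full state: recalculate the room's current
            # state, then try to drop the partial-state flag.
            await storage.persistence.update_current_state(room_id)
            if await store.clear_partial_state_room(room_id):
                return  # resync complete
            # We raced with new partial-state events; go round the loop again.
            continue

        # Recalculate the state at each event, fetching any missing state
        # from `destination` over federation.
        for event in await store.get_events_as_list(batch):
            await event_handler.update_state_for_partial_state_event(destination, event)
```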

File tree: 8 files changed, +289 −0

changelog.d/12394.misc
synapse/handlers/federation.py
synapse/handlers/federation_event.py
synapse/storage/databases/main/events.py
synapse/storage/databases/main/events_worker.py
synapse/storage/databases/main/room.py
synapse/storage/databases/main/state.py
synapse/storage/persist_events.py


changelog.d/12394.misc (+1)

@@ -0,0 +1 @@
+Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join.

synapse/handlers/federation.py (+75)

@@ -466,6 +466,8 @@ async def do_invite_join(
             )
 
             if ret.partial_state:
+                # TODO(faster_joins): roll this back if we don't manage to start the
+                #   background resync (eg process_remote_join fails)
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
             max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +480,18 @@ async def do_invite_join(
                 partial_state=ret.partial_state,
             )
 
+            if ret.partial_state:
+                # Kick off the process of asynchronously fetching the state for this
+                # room.
+                #
+                # TODO(faster_joins): pick this up again on restart
+                run_as_background_process(
+                    desc="sync_partial_state_room",
+                    func=self._sync_partial_state_room,
+                    destination=origin,
+                    room_id=room_id,
+                )
+
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
             await self._replication.wait_for_stream_position(
@@ -1370,3 +1384,64 @@ async def get_room_complexity(
         # We fell off the bottom, couldn't get the complexity from anyone. Oh
         # well.
         return None
+
+    async def _sync_partial_state_room(
+        self,
+        destination: str,
+        room_id: str,
+    ) -> None:
+        """Background process to resync the state of a partial-state room
+
+        Args:
+            destination: homeserver to pull the state from
+            room_id: room to be resynced
+        """
+
+        # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+        #   worker processes kick off a resync in parallel? Perhaps we should just elect
+        #   a single worker to do the resync.
+        #
+        # TODO(faster_joins): what happens if we leave the room during a resync? if we
+        #   really leave, that might mean we have difficulty getting the room state over
+        #   federation.
+        #
+        # TODO(faster_joins): try other destinations if the one we have fails
+
+        logger.info("Syncing state for room %s via %s", room_id, destination)
+
+        # we work through the queue in order of increasing stream ordering.
+        while True:
+            batch = await self.store.get_partial_state_events_batch(room_id)
+            if not batch:
+                # all the events are updated, so we can update current state and
+                # clear the lazy-loading flag.
+                logger.info("Updating current state for %s", room_id)
+                assert (
+                    self.storage.persistence is not None
+                ), "TODO(faster_joins): support for workers"
+                await self.storage.persistence.update_current_state(room_id)
+
+                logger.info("Clearing partial-state flag for %s", room_id)
+                success = await self.store.clear_partial_state_room(room_id)
+                if success:
+                    logger.info("State resync complete for %s", room_id)
+
+                    # TODO(faster_joins) update room stats and user directory?
+                    return
+
+                # we raced against more events arriving with partial state. Go round
+                # the loop again. We've already logged a warning, so no need for more.
+                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                #   having partial state.
+                continue
+
+            events = await self.store.get_events_as_list(
+                batch,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+                allow_rejected=True,
+            )
+            for event in events:
+                await self._federation_event_handler.update_state_for_partial_state_event(
+                    destination, event
+                )
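A note on the kick-off in the first hunk: `run_as_background_process` (from `synapse.metrics.background_process_metrics`) runs the coroutine under its own logging context and background-process metrics, and the caller does not await it, so `do_invite_join` can return to the joining user while the resync proceeds. A minimal usage sketch, with a hypothetical worker coroutine:

```python
from synapse.metrics.background_process_metrics import run_as_background_process

async def _my_resync(room_id: str) -> None:
    """Hypothetical long-running worker, standing in for _sync_partial_state_room."""
    ...

# Fire-and-forget: schedules the coroutine and returns without awaiting it.
run_as_background_process(
    desc="sync_partial_state_room",
    func=_my_resync,
    room_id="!room:example.com",
)
```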

synapse/handlers/federation_event.py (+39)

@@ -477,6 +477,45 @@ async def process_remote_join(
 
         return await self.persist_events_and_notify(room_id, [(event, context)])
 
+    async def update_state_for_partial_state_event(
+        self, destination: str, event: EventBase
+    ) -> None:
+        """Recalculate the state at an event as part of a de-partial-stating process
+
+        Args:
+            destination: server to request full state from
+            event: partial-state event to be de-partial-stated
+        """
+        logger.info("Updating state for %s", event.event_id)
+        with nested_logging_context(suffix=event.event_id):
+            # if we have all the event's prev_events, then we can work out the
+            # state based on their states. Otherwise, we request it from the destination
+            # server.
+            #
+            # This is the same operation as we do when we receive a regular event
+            # over federation.
+            state = await self._resolve_state_at_missing_prevs(destination, event)
+
+            # build a new state group for it if need be
+            context = await self._state_handler.compute_event_context(
+                event,
+                old_state=state,
+            )
+            if context.partial_state:
+                # this can happen if some or all of the event's prev_events still have
+                # partial state - ie, an event has an earlier stream_ordering than one
+                # or more of its prev_events, so we de-partial-state it before its
+                # prev_events.
+                #
+                # TODO(faster_joins): we probably need to be more intelligent, and
+                #    exclude partial-state prev_events from consideration
+                logger.warning(
+                    "%s still has partial state: can't de-partial-state it yet",
+                    event.event_id,
+                )
+                return
+            await self._store.update_state_for_partial_state_event(event, context)
+
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> None:
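The `context.partial_state` branch above exists because stream ordering reflects arrival order, not topological order: an event can be persisted before one of its prev_events, so the resync (which batches by stream ordering) reaches it first. A self-contained toy model of the skip-and-retry behaviour, with hypothetical event IDs:

```python
# $B arrived before its prev_event $A, so the resync visits $B first,
# skips it, and succeeds on the next pass once $A has full state.
partial = {"$B": ["$A"], "$A": []}  # event_id -> prev_events, in stream order

while partial:
    for event_id in list(partial):
        if any(prev in partial for prev in partial[event_id]):
            print(f"{event_id} still has partial state: can't de-partial-state it yet")
            continue
        print(f"de-partial-stated {event_id}")
        del partial[event_id]
```

(The TODO in the diff notes that simply retrying in stream order may not be the smartest strategy; excluding partial-state prev_events from consideration is floated as an alternative.)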

synapse/storage/databases/main/events.py (+15)

@@ -963,6 +963,21 @@ def _persist_transaction_ids_txn(
             values=to_insert,
         )
 
+    async def update_current_state(
+        self,
+        room_id: str,
+        state_delta: DeltaState,
+        stream_id: int,
+    ) -> None:
+        """Update the current state stored in the database for the given room"""
+
+        await self.db_pool.runInteraction(
+            "update_current_state",
+            self._update_current_state_txn,
+            state_delta_by_room={room_id: state_delta},
+            stream_id=stream_id,
+        )
+
     def _update_current_state_txn(
         self,
         txn: LoggingTransaction,

synapse/storage/databases/main/events_worker.py (+24)

@@ -1979,3 +1979,27 @@ async def is_partial_state_event(self, event_id: str) -> bool:
             desc="is_partial_state_event",
         )
         return result is not None
+
+    async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
+        """Get a list of events in the given room that have partial state"""
+        return await self.db_pool.runInteraction(
+            "get_partial_state_events_batch",
+            self._get_partial_state_events_batch_txn,
+            room_id,
+        )
+
+    @staticmethod
+    def _get_partial_state_events_batch_txn(
+        txn: LoggingTransaction, room_id: str
+    ) -> List[str]:
+        txn.execute(
+            """
+            SELECT event_id FROM partial_state_events AS pse
+                JOIN events USING (event_id)
+            WHERE pse.room_id = ?
+            ORDER BY events.stream_ordering
+            LIMIT 100
+            """,
+            (room_id,),
+        )
+        return [row[0] for row in txn]

synapse/storage/databases/main/room.py (+31)

@@ -1077,6 +1077,37 @@ def get_rooms_for_retention_period_in_range_txn(
             get_rooms_for_retention_period_in_range_txn,
         )
 
+    async def clear_partial_state_room(self, room_id: str) -> bool:
+        # this can race with incoming events, so we watch out for FK errors.
+        # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
+        #   is not atomic. I fear we need an application-level lock.
+        try:
+            await self.db_pool.runInteraction(
+                "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
+            )
+            return True
+        except self.db_pool.engine.module.DatabaseError as e:
+            # TODO(faster_joins): how do we distinguish between FK errors and other errors?
+            logger.warning(
+                "Exception while clearing lazy partial-state-room %s, retrying: %s",
+                room_id,
+                e,
+            )
+            return False
+
+    @staticmethod
+    def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+        DatabasePool.simple_delete_txn(
+            txn,
+            table="partial_state_rooms_servers",
+            keyvalues={"room_id": room_id},
+        )
+        DatabasePool.simple_delete_one_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={"room_id": room_id},
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
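Why a `DatabaseError` is expected here: `partial_state_events` references `partial_state_rooms` via a foreign key, so if an event with partial state is persisted between the resync's empty-batch check and this delete, removing the `partial_state_rooms` row violates the constraint and the interaction fails; the caller returns `False` and the resync loop goes round again. A self-contained sqlite3 toy of that race (schema simplified and assumed, not the verbatim Synapse DDL):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("PRAGMA foreign_keys = ON")
db.execute("CREATE TABLE partial_state_rooms (room_id TEXT PRIMARY KEY)")
db.execute(
    "CREATE TABLE partial_state_events ("
    " event_id TEXT PRIMARY KEY,"
    " room_id TEXT NOT NULL REFERENCES partial_state_rooms(room_id))"
)
db.execute("INSERT INTO partial_state_rooms VALUES ('!room:example.com')")

# The resync saw an empty batch, but a new partial-state event races in:
db.execute("INSERT INTO partial_state_events VALUES ('$new', '!room:example.com')")

try:
    # clear_partial_state_room's delete now violates the foreign key.
    db.execute("DELETE FROM partial_state_rooms WHERE room_id = '!room:example.com'")
except sqlite3.IntegrityError as e:
    print("clear failed, will retry later:", e)
```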

synapse/storage/databases/main/state.py (+48)

@@ -21,6 +21,7 @@
 from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -354,6 +355,53 @@ async def get_referenced_state_groups(
 
         return {row["state_group"] for row in rows}
 
+    async def update_state_for_partial_state_event(
+        self,
+        event: EventBase,
+        context: EventContext,
+    ) -> None:
+        """Update the state group for a partial state event"""
+        await self.db_pool.runInteraction(
+            "update_state_for_partial_state_event",
+            self._update_state_for_partial_state_event_txn,
+            event,
+            context,
+        )
+
+    def _update_state_for_partial_state_event_txn(
+        self,
+        txn,
+        event: EventBase,
+        context: EventContext,
+    ):
+        # we shouldn't have any outliers here
+        assert not event.internal_metadata.is_outlier()
+
+        # anything that was rejected should have the same state as its
+        # predecessor.
+        if context.rejected:
+            assert context.state_group == context.state_group_before_event
+
+        self.db_pool.simple_update_txn(
+            txn,
+            table="event_to_state_groups",
+            keyvalues={"event_id": event.event_id},
+            updatevalues={"state_group": context.state_group},
+        )
+
+        self.db_pool.simple_delete_one_txn(
+            txn,
+            table="partial_state_events",
+            keyvalues={"event_id": event.event_id},
+        )
+
+        # TODO(faster_joins): need to do something about workers here
+        txn.call_after(
+            self._get_state_group_for_event.prefill,
+            (event.event_id,),
+            context.state_group,
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
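On the `txn.call_after` at the end of the transaction: callbacks registered this way run only once the transaction has committed, so the `_get_state_group_for_event` cache is never prefilled with a state group that subsequently gets rolled back. A toy model of the pattern (`FakeTxn` is hypothetical, standing in for `LoggingTransaction`):

```python
class FakeTxn:
    """Toy stand-in for LoggingTransaction's call_after bookkeeping."""

    def __init__(self):
        self._after_callbacks = []

    def call_after(self, fn, *args):
        # Record the callback; don't run it yet.
        self._after_callbacks.append((fn, args))

    def commit(self):
        # Only after a successful commit do the callbacks fire.
        for fn, args in self._after_callbacks:
            fn(*args)

cache = {}
txn = FakeTxn()
txn.call_after(cache.__setitem__, "$event:example.com", 42)  # deferred prefill
assert "$event:example.com" not in cache  # nothing visible before commit
txn.commit()
assert cache["$event:example.com"] == 42  # cache filled only after commit
```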

synapse/storage/persist_events.py (+56)

@@ -376,6 +376,62 @@ async def persist_event(
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return event, pos, self.main_store.get_room_max_token()
 
+    async def update_current_state(self, room_id: str) -> None:
+        """Recalculate the current state for a room, and persist it"""
+        state = await self._calculate_current_state(room_id)
+        delta = await self._calculate_state_delta(room_id, state)
+
+        # TODO(faster_joins): get a real stream ordering, to make this work correctly
+        #   across workers.
+        #
+        # TODO(faster_joins): this can race against event persistence, in which case we
+        #   will end up with incorrect state. Perhaps we should make this a job we
+        #   farm out to the event persister, somehow.
+        stream_id = self.main_store.get_room_max_stream_ordering()
+        await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+    async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+        """Calculate the current state of a room, based on the forward extremities
+
+        Args:
+            room_id: room for which to calculate current state
+
+        Returns:
+            map from (type, state_key) to event id for the current state in the room
+        """
+        latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+        state_groups = set(
+            (
+                await self.main_store._get_state_group_for_events(latest_event_ids)
+            ).values()
+        )
+
+        state_maps_by_state_group = await self.state_store._get_state_for_groups(
+            state_groups
+        )
+
+        if len(state_groups) == 1:
+            # If there is only one state group, then we know what the current
+            # state is.
+            return state_maps_by_state_group[state_groups.pop()]
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
+        logger.debug("calling resolve_state_groups from preserve_events")
+
+        # Avoid a circular import.
+        from synapse.state import StateResolutionStore
+
+        room_version = await self.main_store.get_room_version_id(room_id)
+        res = await self._state_resolution_handler.resolve_state_groups(
+            room_id,
+            room_version,
+            state_maps_by_state_group,
+            event_map=None,
+            state_res_store=StateResolutionStore(self.main_store),
+        )
+
+        return res.state
+
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
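For intuition on `_calculate_current_state`: when the room has a single forward extremity there is one state group, and its state map simply is the current state; with several, the conflicting state maps must be resolved. A toy of the data flow (a naive last-writer-wins merge stands in for real state resolution, which is room-version-aware and considerably more involved):

```python
from typing import Dict, Tuple

# A state map: (event_type, state_key) -> event_id.
StateMap = Dict[Tuple[str, str], str]

def calculate_current_state(state_by_group: Dict[int, StateMap]) -> StateMap:
    groups = set(state_by_group)
    if len(groups) == 1:
        # One forward-extremity state group: its state is the current state.
        return state_by_group[groups.pop()]
    # Several groups: resolve the conflicting maps (placeholder merge only).
    resolved: StateMap = {}
    for group in sorted(groups):
        resolved.update(state_by_group[group])
    return resolved

print(
    calculate_current_state(
        {
            1: {("m.room.name", ""): "$name_a"},
            2: {("m.room.name", ""): "$name_b", ("m.room.topic", ""): "$topic"},
        }
    )
)
```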
