Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Refactor EventContext #12689

Merged
merged 9 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,24 +270,6 @@ async def get_prev_state_ids(self) -> StateMap[str]:
assert self._prev_state_ids is not None
return self._prev_state_ids

def get_cached_current_state_ids(self) -> Optional[StateMap[str]]:
"""Gets the current state IDs if we have them already cached.

It is an error to access this for a rejected event, since rejected state should
not make it into the room state. This method will raise an exception if
``rejected`` is set.

Returns:
Returns None if we haven't cached the state or if state_group is None
(which happens when the associated event is an outlier).

Otherwise, returns the the current state IDs.
"""
if self.rejected:
raise RuntimeError("Attempt to access state_ids of rejected event")

return self._current_state_ids

async def _ensure_fetched(self) -> None:
return None

Expand Down
6 changes: 0 additions & 6 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremities: Dict[str, Set[str]],
use_negative_stream_ordering: bool = False,
Expand All @@ -139,8 +138,6 @@ async def _persist_events_and_state_updates(

Args:
events_and_contexts:
current_state_for_room: Map from room_id to the current state of
the room based on forward extremities
state_delta_for_room: Map from room_id to the delta to apply to
room state
new_forward_extremities: Map from room_id to set of event IDs
Expand Down Expand Up @@ -215,9 +212,6 @@ async def _persist_events_and_state_updates(

event_counter.labels(event.type, origin_type, origin_entity).inc()

for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)

for room_id, latest_event_ids in new_forward_extremities.items():
self.store.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
Expand Down
35 changes: 4 additions & 31 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,6 @@ async def _persist_event_batch(
# extremities in each room
new_forward_extremities: Dict[str, Set[str]] = {}

# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room: Dict[str, StateMap[str]] = {}

# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
Expand Down Expand Up @@ -628,14 +622,8 @@ async def _persist_event_batch(

state_delta_for_room[room_id] = delta

# If we have the current_state then lets prefill
# the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state

await self.persist_events_store._persist_events_and_state_updates(
chunk,
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
new_forward_extremities=new_forward_extremities,
use_negative_stream_ordering=backfilled,
Expand Down Expand Up @@ -743,9 +731,6 @@ async def _get_new_state_after_events(
the new current state is only returned if we've already calculated
it.
"""
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}

# Map from (prev state group, new state group) -> delta state dict
state_group_deltas = {}

Expand All @@ -759,16 +744,6 @@ async def _get_new_state_after_events(
)
continue

if ctx.state_group in state_groups_map:
continue

# We're only interested in pulling out state that has already
# been cached in the context. We'll pull stuff out of the DB later
# if necessary.
current_state_ids = ctx.get_cached_current_state_ids()
if current_state_ids is not None:
state_groups_map[ctx.state_group] = current_state_ids

if ctx.prev_group:
state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids

Expand Down Expand Up @@ -829,15 +804,13 @@ async def _get_new_state_after_events(
# so lets just return that. If we happen to already have
# the current state in memory then lets also return that,
# but it doesn't matter if we don't.
new_state = state_groups_map.get(new_state_group)
return new_state, delta_ids, new_latest_event_ids
return None, delta_ids, new_latest_event_ids

# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
missing_state = new_state_groups - set(state_groups_map)
if missing_state:
group_to_state = await self.state_store._get_state_for_groups(missing_state)
state_groups_map.update(group_to_state)
state_groups_map = await self.state_store._get_state_for_groups(
new_state_groups
)

if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
Expand Down