Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 3bb55e2

Browse files
committed
Store information in the database about partial-state joins
When we get a partial_state response from send_join, store information in the database about it:
* store a record about the room as a whole having partial state, and stash the list of member servers too.
* flag the join event itself as having partial state.
* also, for any new events whose prev-events are partial-stated, note that they will *also* be partial-stated.
We don't yet make any attempt to interpret this data, so API calls (and a bunch of other things) are just going to get incorrect data.
1 parent 092a2eb commit 3bb55e2

File tree

7 files changed

+135
-5
lines changed

7 files changed

+135
-5
lines changed

synapse/events/snapshot.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ class EventContext:
101101
102102
As with _current_state_ids, this is a private attribute. It should be
103103
accessed via get_prev_state_ids.
104+
105+
partial_state: if True, we may be storing this event with a temporary,
106+
incomplete state.
104107
"""
105108

106109
rejected: Union[bool, str] = False
@@ -113,12 +116,15 @@ class EventContext:
113116
_current_state_ids: Optional[StateMap[str]] = None
114117
_prev_state_ids: Optional[StateMap[str]] = None
115118

119+
partial_state: bool = False
120+
116121
@staticmethod
117122
def with_state(
118123
state_group: Optional[int],
119124
state_group_before_event: Optional[int],
120125
current_state_ids: Optional[StateMap[str]],
121126
prev_state_ids: Optional[StateMap[str]],
127+
partial_state: bool,
122128
prev_group: Optional[int] = None,
123129
delta_ids: Optional[StateMap[str]] = None,
124130
) -> "EventContext":
@@ -129,6 +135,7 @@ def with_state(
129135
state_group_before_event=state_group_before_event,
130136
prev_group=prev_group,
131137
delta_ids=delta_ids,
138+
partial_state=partial_state,
132139
)
133140

134141
@staticmethod
@@ -170,6 +177,7 @@ async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict:
170177
"prev_group": self.prev_group,
171178
"delta_ids": _encode_state_dict(self.delta_ids),
172179
"app_service_id": self.app_service.id if self.app_service else None,
180+
"partial_state": self.partial_state,
173181
}
174182

175183
@staticmethod
@@ -196,6 +204,7 @@ def deserialize(storage: "Storage", input: JsonDict) -> "EventContext":
196204
prev_group=input["prev_group"],
197205
delta_ids=_decode_state_dict(input["delta_ids"]),
198206
rejected=input["rejected"],
207+
partial_state=input.get("partial_state", False),
199208
)
200209

201210
app_service_id = input["app_service_id"]

synapse/handlers/federation.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,8 +519,17 @@ async def do_invite_join(
519519
auth_events=auth_chain,
520520
)
521521

522+
if ret.partial_state:
523+
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
524+
522525
max_stream_id = await self._federation_event_handler.process_remote_join(
523-
origin, room_id, auth_chain, state, event, room_version_obj
526+
origin,
527+
room_id,
528+
auth_chain,
529+
state,
530+
event,
531+
room_version_obj,
532+
partial_state=ret.partial_state,
524533
)
525534

526535
# We wait here until this instance has seen the events come down

synapse/handlers/federation_event.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ async def process_remote_join(
397397
state: List[EventBase],
398398
event: EventBase,
399399
room_version: RoomVersion,
400+
partial_state: bool,
400401
) -> int:
401402
"""Persists the events returned by a send_join
402403
@@ -412,6 +413,7 @@ async def process_remote_join(
412413
event
413414
room_version: The room version we expect this room to have, and
414415
will raise if it doesn't match the version in the create event.
416+
partial_state: True if the state omits non-critical membership events
415417
416418
Returns:
417419
The stream ID after which all events have been persisted.
@@ -453,10 +455,14 @@ async def process_remote_join(
453455
)
454456

455457
# and now persist the join event itself.
456-
logger.info("Peristing join-via-remote %s", event)
458+
logger.info(
459+
"Peristing join-via-remote %s (partial_state: %s)", event, partial_state
460+
)
457461
with nested_logging_context(suffix=event.event_id):
458462
context = await self._state_handler.compute_event_context(
459-
event, old_state=state
463+
event,
464+
old_state=state,
465+
partial_state=partial_state,
460466
)
461467

462468
context = await self._check_event_auth(origin, event, context)
@@ -1791,6 +1797,7 @@ async def _update_context_for_auth_events(
17911797
prev_state_ids=prev_state_ids,
17921798
prev_group=prev_group,
17931799
delta_ids=state_updates,
1800+
partial_state=context.partial_state,
17941801
)
17951802

17961803
async def _run_push_actions_and_persist_event(

synapse/state/__init__.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,10 @@ async def get_hosts_in_room_at_events(
258258
return await self.store.get_joined_hosts(room_id, entry)
259259

260260
async def compute_event_context(
261-
self, event: EventBase, old_state: Optional[Iterable[EventBase]] = None
261+
self,
262+
event: EventBase,
263+
old_state: Optional[Iterable[EventBase]] = None,
264+
partial_state: bool = False,
262265
) -> EventContext:
263266
"""Build an EventContext structure for a non-outlier event.
264267
@@ -273,6 +276,8 @@ async def compute_event_context(
273276
calculated from existing events. This is normally only specified
274277
when receiving an event from federation where we don't have the
275278
prev events for, e.g. when backfilling.
279+
partial_state: True if `old_state` is partial and omits non-critical
280+
membership events
276281
Returns:
277282
The event context.
278283
"""
@@ -295,8 +300,24 @@ async def compute_event_context(
295300

296301
else:
297302
# otherwise, we'll need to resolve the state across the prev_events.
298-
logger.debug("calling resolve_state_groups from compute_event_context")
299303

304+
# partial_state should not be set explicitly in this case:
305+
# we work it out dynamically
306+
assert not partial_state
307+
308+
# if any of the prev-events have partial state, so do we.
309+
# (This is slightly racy - the prev-events might get fixed up before we use
310+
# their states - but I don't think that really matters; it just means we
311+
# might redundantly recalculate the state for this event later.)
312+
prev_event_ids = event.prev_event_ids()
313+
incomplete_prev_events = await self.store.get_partial_state_events(
314+
prev_event_ids
315+
)
316+
if any(incomplete_prev_events):
317+
logger.debug("Incoming event refers to prev-events with partial state")
318+
partial_state = True
319+
320+
logger.debug("calling resolve_state_groups from compute_event_context")
300321
entry = await self.resolve_state_groups_for_events(
301322
event.room_id, event.prev_event_ids()
302323
)
@@ -342,6 +363,7 @@ async def compute_event_context(
342363
prev_state_ids=state_ids_before_event,
343364
prev_group=state_group_before_event_prev_group,
344365
delta_ids=deltas_to_state_group_before_event,
366+
partial_state=partial_state,
345367
)
346368

347369
#
@@ -373,6 +395,7 @@ async def compute_event_context(
373395
prev_state_ids=state_ids_before_event,
374396
prev_group=state_group_before_event,
375397
delta_ids=delta_ids,
398+
partial_state=partial_state,
376399
)
377400

378401
@measure_func()

synapse/storage/databases/main/events.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2152,6 +2152,23 @@ def _store_event_state_mappings_txn(
21522152

21532153
state_groups[event.event_id] = context.state_group
21542154

2155+
# if we have partial state for these events, record the fact. (This happens
2156+
# here rather than in _store_event_txn because it also needs to happen when
2157+
# we de-outlier an event.)
2158+
self.db_pool.simple_insert_many_txn(
2159+
txn,
2160+
table="partial_state_events",
2161+
keys=("room_id", "event_id"),
2162+
values=[
2163+
(
2164+
event.room_id,
2165+
event.event_id,
2166+
)
2167+
for event, ctx in events_and_contexts
2168+
if ctx.partial_state
2169+
],
2170+
)
2171+
21552172
self.db_pool.simple_upsert_many_txn(
21562173
txn,
21572174
table="event_to_state_groups",

synapse/storage/databases/main/events_worker.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1953,3 +1953,31 @@ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
19531953
"get_event_id_for_timestamp_txn",
19541954
get_event_id_for_timestamp_txn,
19551955
)
1956+
1957+
@cachedList("is_partial_state_event", list_name="event_ids")
async def get_partial_state_events(
    self, event_ids: Collection[str]
) -> Dict[str, bool]:
    """Report, in bulk, which of the given events are stored with partial state.

    Args:
        event_ids: the IDs of the events to look up.

    Returns:
        A dict with one entry per requested event ID: True if the event has
        a row in the `partial_state_events` table, False otherwise.
    """
    rows = await self.db_pool.simple_select_many_batch(
        table="partial_state_events",
        column="event_id",
        iterable=event_ids,
        retcols=["event_id"],
        desc="get_partial_state_events",
    )

    # Only events which *do* have partial state come back from the table, so
    # collect their IDs for quick membership tests.
    flagged_ids = set()
    for row in rows:
        flagged_ids.add(row["event_id"])

    # The answer is returned as a dict keyed by event ID, which is the shape
    # needed to make @cachedList work.
    partial_by_event_id: Dict[str, bool] = {}
    for requested_id in event_ids:
        partial_by_event_id[requested_id] = requested_id in flagged_ids
    return partial_by_event_id
1972+
1973+
@cached()
async def is_partial_state_event(self, event_id: str) -> bool:
    """Report whether a single event is stored with partial state.

    Args:
        event_id: the ID of the event to look up.

    Returns:
        True if the event has a row in the `partial_state_events` table,
        False otherwise.
    """
    # We only care whether a matching row exists, so select a constant rather
    # than a real column. The None check below relies on `allow_none=True`
    # yielding None when no row matches.
    row_marker = await self.db_pool.simple_select_one_onecol(
        table="partial_state_events",
        keyvalues={"event_id": event_id},
        retcol="1",
        allow_none=True,
        desc="is_partial_state_event",
    )
    if row_marker is None:
        return False
    return True

synapse/storage/databases/main/room.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
TYPE_CHECKING,
2121
Any,
2222
Awaitable,
23+
Collection,
2324
Dict,
2425
List,
2526
Optional,
@@ -1543,6 +1544,42 @@ async def upsert_room_on_join(
15431544
lock=False,
15441545
)
15451546

1547+
async def store_partial_state_room(
    self,
    room_id: str,
    servers: Collection[str],
) -> None:
    """Record that a room contains events with partial state.

    The database work is delegated to `_store_partial_state_room_txn`, which
    is run as a single database interaction.

    Args:
        room_id: the ID of the room to flag.
        servers: other servers known to be in the room.
    """
    interaction_name = "store_partial_state_room"
    await self.db_pool.runInteraction(
        interaction_name,
        self._store_partial_state_room_txn,
        room_id,
        servers,
    )
1564+
1565+
@staticmethod
def _store_partial_state_room_txn(
    txn: LoggingTransaction, room_id: str, servers: Collection[str]
) -> None:
    """Write the partial-state records for a room within a transaction.

    Inserts one row for the room into `partial_state_rooms`, then one row per
    server into `partial_state_rooms_servers`.

    Args:
        txn: the database transaction to write through.
        room_id: the ID of the room being flagged.
        servers: other servers known to be in the room.
    """
    # First, the record for the room as a whole.
    room_row = {
        "room_id": room_id,
    }
    DatabasePool.simple_insert_txn(
        txn,
        table="partial_state_rooms",
        values=room_row,
    )

    # Then one (room_id, server_name) row for each server. The rows are
    # produced lazily as the insert helper consumes them.
    server_columns = ("room_id", "server_name")
    server_rows = ((room_id, server_name) for server_name in servers)
    DatabasePool.simple_insert_many_txn(
        txn,
        table="partial_state_rooms_servers",
        keys=server_columns,
        values=server_rows,
    )
1582+
15461583
async def maybe_store_room_on_outlier_membership(
15471584
self, room_id: str, room_version: RoomVersion
15481585
) -> None:

0 commit comments

Comments
 (0)