23
23
Dict ,
24
24
Iterable ,
25
25
List ,
26
- Optional ,
27
26
Sequence ,
28
27
Set ,
29
28
Tuple ,
@@ -278,19 +277,17 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
278
277
)
279
278
280
279
try :
281
- await self ._process_received_pdu (
282
- origin , pdu , state_ids = None , partial_state = None
283
- )
280
+ context = await self ._state_handler .compute_event_context (pdu )
281
+ await self ._process_received_pdu (origin , pdu , context )
284
282
except PartialStateConflictError :
285
283
# The room was un-partial stated while we were processing the PDU.
286
284
# Try once more, with full state this time.
287
285
logger .info (
288
286
"Room %s was un-partial stated while processing the PDU, trying again." ,
289
287
room_id ,
290
288
)
291
- await self ._process_received_pdu (
292
- origin , pdu , state_ids = None , partial_state = None
293
- )
289
+ context = await self ._state_handler .compute_event_context (pdu )
290
+ await self ._process_received_pdu (origin , pdu , context )
294
291
295
292
async def on_send_membership_event (
296
293
self , origin : str , event : EventBase
@@ -320,6 +317,7 @@ async def on_send_membership_event(
320
317
The event and context of the event after inserting it into the room graph.
321
318
322
319
Raises:
320
+ RuntimeError if any prev_events are missing
323
321
SynapseError if the event is not accepted into the room
324
322
PartialStateConflictError if the room was un-partial stated in between
325
323
computing the state at the event and persisting it. The caller should
@@ -380,7 +378,7 @@ async def on_send_membership_event(
380
378
# need to.
381
379
await self ._event_creation_handler .cache_joined_hosts_for_event (event , context )
382
380
383
- await self ._check_for_soft_fail (event , None , origin = origin )
381
+ await self ._check_for_soft_fail (event , context = context , origin = origin )
384
382
await self ._run_push_actions_and_persist_event (event , context )
385
383
return event , context
386
384
@@ -538,36 +536,10 @@ async def update_state_for_partial_state_event(
538
536
#
539
537
# This is the same operation as we do when we receive a regular event
540
538
# over federation.
541
- state_ids , partial_state = await self ._resolve_state_at_missing_prevs (
539
+ context = await self ._compute_event_context_with_maybe_missing_prevs (
542
540
destination , event
543
541
)
544
-
545
- # There are three possible cases for (state_ids, partial_state):
546
- # * `state_ids` and `partial_state` are both `None` if we had all the
547
- # prev_events. The prev_events may or may not have partial state and
548
- # we won't know until we compute the event context.
549
- # * `state_ids` is not `None` and `partial_state` is `False` if we were
550
- # missing some prev_events (but we have full state for any we did
551
- # have). We calculated the full state after the prev_events.
552
- # * `state_ids` is not `None` and `partial_state` is `True` if we were
553
- # missing some, but not all, prev_events. At least one of the
554
- # prev_events we did have had partial state, so we calculated a partial
555
- # state after the prev_events.
556
-
557
- context = None
558
- if state_ids is not None and partial_state :
559
- # the state after the prev events is still partial. We can't de-partial
560
- # state the event, so don't bother building the event context.
561
- pass
562
- else :
563
- # build a new state group for it if need be
564
- context = await self ._state_handler .compute_event_context (
565
- event ,
566
- state_ids_before_event = state_ids ,
567
- partial_state = partial_state ,
568
- )
569
-
570
- if context is None or context .partial_state :
542
+ if context .partial_state :
571
543
# this can happen if some or all of the event's prev_events still have
572
544
# partial state. We were careful to only pick events from the db without
573
545
# partial-state prev events, so that implies that a prev event has
@@ -840,26 +812,25 @@ async def _process_pulled_event(
840
812
841
813
try :
842
814
try :
843
- state_ids , partial_state = await self ._resolve_state_at_missing_prevs (
815
+ context = await self ._compute_event_context_with_maybe_missing_prevs (
844
816
origin , event
845
817
)
846
818
await self ._process_received_pdu (
847
819
origin ,
848
820
event ,
849
- state_ids = state_ids ,
850
- partial_state = partial_state ,
821
+ context ,
851
822
backfilled = backfilled ,
852
823
)
853
824
except PartialStateConflictError :
854
825
# The room was un-partial stated while we were processing the event.
855
826
# Try once more, with full state this time.
856
- state_ids , partial_state = await self ._resolve_state_at_missing_prevs (
827
+ context = await self ._compute_event_context_with_maybe_missing_prevs (
857
828
origin , event
858
829
)
859
830
860
831
# We ought to have full state now, barring some unlikely race where we left and
861
832
# rejoned the room in the background.
862
- if state_ids is not None and partial_state :
833
+ if context . partial_state :
863
834
raise AssertionError (
864
835
f"Event { event .event_id } still has a partial resolved state "
865
836
f"after room { event .room_id } was un-partial stated"
@@ -868,8 +839,7 @@ async def _process_pulled_event(
868
839
await self ._process_received_pdu (
869
840
origin ,
870
841
event ,
871
- state_ids = state_ids ,
872
- partial_state = partial_state ,
842
+ context ,
873
843
backfilled = backfilled ,
874
844
)
875
845
except FederationError as e :
@@ -878,15 +848,18 @@ async def _process_pulled_event(
878
848
else :
879
849
raise
880
850
881
- async def _resolve_state_at_missing_prevs (
851
+ async def _compute_event_context_with_maybe_missing_prevs (
882
852
self , dest : str , event : EventBase
883
- ) -> Tuple [Optional [StateMap [str ]], Optional [bool ]]:
884
- """Calculate the state at an event with missing prev_events.
853
+ ) -> EventContext :
854
+ """Build an EventContext structure for a non-outlier event whose prev_events may
855
+ be missing.
885
856
886
- This is used when we have pulled a batch of events from a remote server, and
887
- still don't have all the prev_events.
857
+ This is used when we have pulled a batch of events from a remote server, and may
858
+ not have all the prev_events.
888
859
889
- If we already have all the prev_events for `event`, this method does nothing.
860
+ To build an EventContext, we need to calculate the state before the event. If we
861
+ already have all the prev_events for `event`, we can simply use the state after
862
+ the prev_events to calculate the state before `event`.
890
863
891
864
Otherwise, the missing prevs become new backwards extremities, and we fall back
892
865
to asking the remote server for the state after each missing `prev_event`,
@@ -907,10 +880,7 @@ async def _resolve_state_at_missing_prevs(
907
880
event: an event to check for missing prevs.
908
881
909
882
Returns:
910
- if we already had all the prev events, `None, None`. Otherwise, returns a
911
- tuple containing:
912
- * the event ids of the state at `event`.
913
- * a boolean indicating whether the state may be partial.
883
+ The event context.
914
884
915
885
Raises:
916
886
FederationError if we fail to get the state from the remote server after any
@@ -924,7 +894,7 @@ async def _resolve_state_at_missing_prevs(
924
894
missing_prevs = prevs - seen
925
895
926
896
if not missing_prevs :
927
- return None , None
897
+ return await self . _state_handler . compute_event_context ( event )
928
898
929
899
logger .info (
930
900
"Event %s is missing prev_events %s: calculating state for a "
@@ -990,7 +960,9 @@ async def _resolve_state_at_missing_prevs(
990
960
"We can't get valid state history." ,
991
961
affected = event_id ,
992
962
)
993
- return state_map , partial_state
963
+ return await self ._state_handler .compute_event_context (
964
+ event , state_ids_before_event = state_map , partial_state = partial_state
965
+ )
994
966
995
967
async def _get_state_ids_after_missing_prev_event (
996
968
self ,
@@ -1159,8 +1131,7 @@ async def _process_received_pdu(
1159
1131
self ,
1160
1132
origin : str ,
1161
1133
event : EventBase ,
1162
- state_ids : Optional [StateMap [str ]],
1163
- partial_state : Optional [bool ],
1134
+ context : EventContext ,
1164
1135
backfilled : bool = False ,
1165
1136
) -> None :
1166
1137
"""Called when we have a new non-outlier event.
@@ -1182,32 +1153,18 @@ async def _process_received_pdu(
1182
1153
1183
1154
event: event to be persisted
1184
1155
1185
- state_ids: Normally None, but if we are handling a gap in the graph
1186
- (ie, we are missing one or more prev_events), the resolved state at the
1187
- event
1188
-
1189
- partial_state:
1190
- `True` if `state_ids` is partial and omits non-critical membership
1191
- events.
1192
- `False` if `state_ids` is the full state.
1193
- `None` if `state_ids` is not provided. In this case, the flag will be
1194
- calculated based on `event`'s prev events.
1156
+ context: The `EventContext` to persist the event with.
1195
1157
1196
1158
backfilled: True if this is part of a historical batch of events (inhibits
1197
1159
notification to clients, and validation of device keys.)
1198
1160
1199
1161
PartialStateConflictError: if the room was un-partial stated in between
1200
- computing the state at the event and persisting it. The caller should retry
1201
- exactly once in this case .
1162
+ computing the state at the event and persisting it. The caller should
1163
+ recompute `context` and retry exactly once when this happens .
1202
1164
"""
1203
1165
logger .debug ("Processing event: %s" , event )
1204
1166
assert not event .internal_metadata .outlier
1205
1167
1206
- context = await self ._state_handler .compute_event_context (
1207
- event ,
1208
- state_ids_before_event = state_ids ,
1209
- partial_state = partial_state ,
1210
- )
1211
1168
try :
1212
1169
await self ._check_event_auth (origin , event , context )
1213
1170
except AuthError as e :
@@ -1219,7 +1176,7 @@ async def _process_received_pdu(
1219
1176
# For new (non-backfilled and non-outlier) events we check if the event
1220
1177
# passes auth based on the current state. If it doesn't then we
1221
1178
# "soft-fail" the event.
1222
- await self ._check_for_soft_fail (event , state_ids , origin = origin )
1179
+ await self ._check_for_soft_fail (event , context = context , origin = origin )
1223
1180
1224
1181
await self ._run_push_actions_and_persist_event (event , context , backfilled )
1225
1182
@@ -1782,7 +1739,7 @@ async def _maybe_kick_guest_users(self, event: EventBase) -> None:
1782
1739
async def _check_for_soft_fail (
1783
1740
self ,
1784
1741
event : EventBase ,
1785
- state_ids : Optional [ StateMap [ str ]] ,
1742
+ context : EventContext ,
1786
1743
origin : str ,
1787
1744
) -> None :
1788
1745
"""Checks if we should soft fail the event; if so, marks the event as
@@ -1793,7 +1750,7 @@ async def _check_for_soft_fail(
1793
1750
1794
1751
Args:
1795
1752
event
1796
- state_ids : The state at the event if we don't have all the event's prev events
1753
+ context : The `EventContext` which we are about to persist the event with.
1797
1754
origin: The host the event originates from.
1798
1755
"""
1799
1756
if await self ._store .is_partial_state_room (event .room_id ):
@@ -1819,11 +1776,15 @@ async def _check_for_soft_fail(
1819
1776
auth_types = auth_types_for_event (room_version_obj , event )
1820
1777
1821
1778
# Calculate the "current state".
1822
- if state_ids is not None :
1823
- # If we're explicitly given the state then we won't have all the
1824
- # prev events, and so we have a gap in the graph. In this case
1825
- # we want to be a little careful as we might have been down for
1826
- # a while and have an incorrect view of the current state,
1779
+ seen_event_ids = await self ._store .have_events_in_timeline (prev_event_ids )
1780
+ has_missing_prevs = bool (prev_event_ids - seen_event_ids )
1781
+ if has_missing_prevs :
1782
+ # We don't have all the prev_events of this event, which means we have a
1783
+ # gap in the graph, and the new event is going to become a new backwards
1784
+ # extremity.
1785
+ #
1786
+ # In this case we want to be a little careful as we might have been
1787
+ # down for a while and have an incorrect view of the current state,
1827
1788
# however we still want to do checks as gaps are easy to
1828
1789
# maliciously manufacture.
1829
1790
#
@@ -1836,6 +1797,7 @@ async def _check_for_soft_fail(
1836
1797
event .room_id , extrem_ids
1837
1798
)
1838
1799
state_sets : List [StateMap [str ]] = list (state_sets_d .values ())
1800
+ state_ids = await context .get_prev_state_ids ()
1839
1801
state_sets .append (state_ids )
1840
1802
current_state_ids = (
1841
1803
await self ._state_resolution_handler .resolve_events_with_store (
0 commit comments