@@ -285,12 +285,12 @@ async def on_receive_pdu(
285
285
# - Fetching any missing prev events to fill in gaps in the graph
286
286
# - Fetching state if we have a hole in the graph
287
287
if not pdu .internal_metadata .is_outlier ():
288
- prevs = set (pdu .prev_event_ids ())
289
- seen = await self .store .have_events_in_timeline (prevs )
290
- missing_prevs = prevs - seen
288
+ if sent_to_us_directly :
289
+ prevs = set (pdu .prev_event_ids ())
290
+ seen = await self .store .have_events_in_timeline (prevs )
291
+ missing_prevs = prevs - seen
291
292
292
- if missing_prevs :
293
- if sent_to_us_directly :
293
+ if missing_prevs :
294
294
# We only backfill backwards to the min depth.
295
295
min_depth = await self .get_min_depth_for_context (pdu .room_id )
296
296
logger .debug ("min_depth: %d" , min_depth )
@@ -351,106 +351,8 @@ async def on_receive_pdu(
351
351
affected = pdu .event_id ,
352
352
)
353
353
354
- else :
355
- # We don't have all of the prev_events for this event.
356
- #
357
- # In this case, we need to fall back to asking another server in the
358
- # federation for the state at this event. That's ok provided we then
359
- # resolve the state against other bits of the DAG before using it (which
360
- # will ensure that you can't just take over a room by sending an event,
361
- # withholding its prev_events, and declaring yourself to be an admin in
362
- # the subsequent state request).
363
- #
364
- # Since we're pulling this event as a missing prev_event, then clearly
365
- # this event is not going to become the only forward-extremity and we are
366
- # guaranteed to resolve its state against our existing forward
367
- # extremities, so that should be fine.
368
- #
369
- # XXX this really feels like it could/should be merged with the above,
370
- # but there is an interaction with min_depth that I'm not really
371
- # following.
372
- logger .info (
373
- "Event %s is missing prev_events %s: calculating state for a "
374
- "backwards extremity" ,
375
- event_id ,
376
- shortstr (missing_prevs ),
377
- )
378
-
379
- # Calculate the state after each of the previous events, and
380
- # resolve them to find the correct state at the current event.
381
- event_map = {event_id : pdu }
382
- try :
383
- # Get the state of the events we know about
384
- ours = await self .state_store .get_state_groups_ids (
385
- room_id , seen
386
- )
387
-
388
- # state_maps is a list of mappings from (type, state_key) to event_id
389
- state_maps : List [StateMap [str ]] = list (ours .values ())
390
-
391
- # we don't need this any more, let's delete it.
392
- del ours
393
-
394
- # Ask the remote server for the states we don't
395
- # know about
396
- for p in missing_prevs :
397
- logger .info (
398
- "Requesting state after missing prev_event %s" , p
399
- )
400
-
401
- with nested_logging_context (p ):
402
- # note that if any of the missing prevs share missing state or
403
- # auth events, the requests to fetch those events are deduped
404
- # by the get_pdu_cache in federation_client.
405
- remote_state = (
406
- await self ._get_state_after_missing_prev_event (
407
- origin , room_id , p
408
- )
409
- )
410
-
411
- remote_state_map = {
412
- (x .type , x .state_key ): x .event_id
413
- for x in remote_state
414
- }
415
- state_maps .append (remote_state_map )
416
-
417
- for x in remote_state :
418
- event_map [x .event_id ] = x
419
-
420
- room_version = await self .store .get_room_version_id (room_id )
421
- state_map = await self ._state_resolution_handler .resolve_events_with_store (
422
- room_id ,
423
- room_version ,
424
- state_maps ,
425
- event_map ,
426
- state_res_store = StateResolutionStore (self .store ),
427
- )
428
-
429
- # We need to give _process_received_pdu the actual state events
430
- # rather than event ids, so generate that now.
431
-
432
- # First though we need to fetch all the events that are in
433
- # state_map, so we can build up the state below.
434
- evs = await self .store .get_events (
435
- list (state_map .values ()),
436
- get_prev_content = False ,
437
- redact_behaviour = EventRedactBehaviour .AS_IS ,
438
- )
439
- event_map .update (evs )
440
-
441
- state = [event_map [e ] for e in state_map .values ()]
442
- except Exception :
443
- logger .warning (
444
- "Error attempting to resolve state at missing "
445
- "prev_events" ,
446
- exc_info = True ,
447
- )
448
- raise FederationError (
449
- "ERROR" ,
450
- 403 ,
451
- "We can't get valid state history." ,
452
- affected = event_id ,
453
- )
354
+ else :
355
+ state = await self ._resolve_state_at_missing_prevs (origin , pdu )
454
356
455
357
# A second round of checks for all events. Check that the event passes auth
456
358
# based on `auth_events`, this allows us to assert that the event would
@@ -1493,6 +1395,123 @@ async def get_event(event_id: str):
1493
1395
event_infos ,
1494
1396
)
1495
1397
1398
+ async def _resolve_state_at_missing_prevs (
1399
+ self , dest : str , event : EventBase
1400
+ ) -> Optional [Iterable [EventBase ]]:
1401
+ """Calculate the state at an event with missing prev_events.
1402
+
1403
+ This is used when we have pulled a batch of events from a remote server, and
1404
+ still don't have all the prev_events.
1405
+
1406
+ If we already have all the prev_events for `event`, this method does nothing.
1407
+
1408
+ Otherwise, the missing prevs become new backwards extremities, and we fall back
1409
+ to asking the remote server for the state after each missing `prev_event`,
1410
+ and resolving across them.
1411
+
1412
+ That's ok provided we then resolve the state against other bits of the DAG
1413
+ before using it - in other words, that the received event `event` is not going
1414
+ to become the only forwards_extremity in the room (which will ensure that you
1415
+ can't just take over a room by sending an event, withholding its prev_events,
1416
+ and declaring yourself to be an admin in the subsequent state request).
1417
+
1418
+ In other words: we should only call this method if `event` has been *pulled*
1419
+ as part of a batch of missing prev events, or similar.
1420
+
1421
+ Params:
1422
+ dest: the remote server to ask for state at the missing prevs. Typically,
1423
+ this will be the server we got `event` from.
1424
+ event: an event to check for missing prevs.
1425
+
1426
+ Returns:
1427
+ if we already had all the prev events, `None`. Otherwise, returns a list of
1428
+ the events in the state at `event`.
1429
+ """
1430
+ room_id = event .room_id
1431
+ event_id = event .event_id
1432
+
1433
+ prevs = set (event .prev_event_ids ())
1434
+ seen = await self .store .have_events_in_timeline (prevs )
1435
+ missing_prevs = prevs - seen
1436
+
1437
+ if not missing_prevs :
1438
+ return None
1439
+
1440
+ logger .info (
1441
+ "Event %s is missing prev_events %s: calculating state for a "
1442
+ "backwards extremity" ,
1443
+ event_id ,
1444
+ shortstr (missing_prevs ),
1445
+ )
1446
+ # Calculate the state after each of the previous events, and
1447
+ # resolve them to find the correct state at the current event.
1448
+ event_map = {event_id : event }
1449
+ try :
1450
+ # Get the state of the events we know about
1451
+ ours = await self .state_store .get_state_groups_ids (room_id , seen )
1452
+
1453
+ # state_maps is a list of mappings from (type, state_key) to event_id
1454
+ state_maps : List [StateMap [str ]] = list (ours .values ())
1455
+
1456
+ # we don't need this any more, let's delete it.
1457
+ del ours
1458
+
1459
+ # Ask the remote server for the states we don't
1460
+ # know about
1461
+ for p in missing_prevs :
1462
+ logger .info ("Requesting state after missing prev_event %s" , p )
1463
+
1464
+ with nested_logging_context (p ):
1465
+ # note that if any of the missing prevs share missing state or
1466
+ # auth events, the requests to fetch those events are deduped
1467
+ # by the get_pdu_cache in federation_client.
1468
+ remote_state = await self ._get_state_after_missing_prev_event (
1469
+ dest , room_id , p
1470
+ )
1471
+
1472
+ remote_state_map = {
1473
+ (x .type , x .state_key ): x .event_id for x in remote_state
1474
+ }
1475
+ state_maps .append (remote_state_map )
1476
+
1477
+ for x in remote_state :
1478
+ event_map [x .event_id ] = x
1479
+
1480
+ room_version = await self .store .get_room_version_id (room_id )
1481
+ state_map = await self ._state_resolution_handler .resolve_events_with_store (
1482
+ room_id ,
1483
+ room_version ,
1484
+ state_maps ,
1485
+ event_map ,
1486
+ state_res_store = StateResolutionStore (self .store ),
1487
+ )
1488
+
1489
+ # We need to give _process_received_pdu the actual state events
1490
+ # rather than event ids, so generate that now.
1491
+
1492
+ # First though we need to fetch all the events that are in
1493
+ # state_map, so we can build up the state below.
1494
+ evs = await self .store .get_events (
1495
+ list (state_map .values ()),
1496
+ get_prev_content = False ,
1497
+ redact_behaviour = EventRedactBehaviour .AS_IS ,
1498
+ )
1499
+ event_map .update (evs )
1500
+
1501
+ state = [event_map [e ] for e in state_map .values ()]
1502
+ except Exception :
1503
+ logger .warning (
1504
+ "Error attempting to resolve state at missing prev_events" ,
1505
+ exc_info = True ,
1506
+ )
1507
+ raise FederationError (
1508
+ "ERROR" ,
1509
+ 403 ,
1510
+ "We can't get valid state history." ,
1511
+ affected = event_id ,
1512
+ )
1513
+ return state
1514
+
1496
1515
def _sanity_check_event (self , ev : EventBase ) -> None :
1497
1516
"""
1498
1517
Do some early sanity checks of a received event
0 commit comments