59
59
from synapse .events .snapshot import EventContext
60
60
from synapse .federation .federation_client import InvalidResponseError
61
61
from synapse .logging .context import nested_logging_context
62
- from synapse .logging .opentracing import trace
62
+ from synapse .logging .opentracing import (
63
+ SynapseTags ,
64
+ set_tag ,
65
+ start_active_span ,
66
+ tag_args ,
67
+ trace ,
68
+ )
63
69
from synapse .metrics .background_process_metrics import run_as_background_process
64
70
from synapse .replication .http .devices import ReplicationUserDevicesResyncRestServlet
65
71
from synapse .replication .http .federation import (
@@ -410,6 +416,7 @@ async def check_join_restrictions(
410
416
prev_member_event ,
411
417
)
412
418
419
+ @trace
413
420
async def process_remote_join (
414
421
self ,
415
422
origin : str ,
@@ -715,7 +722,7 @@ async def _get_missing_events_for_pdu(
715
722
716
723
@trace
717
724
async def _process_pulled_events (
718
- self , origin : str , events : Iterable [EventBase ], backfilled : bool
725
+ self , origin : str , events : List [EventBase ], backfilled : bool
719
726
) -> None :
720
727
"""Process a batch of events we have pulled from a remote server
721
728
@@ -730,6 +737,11 @@ async def _process_pulled_events(
730
737
backfilled: True if this is part of a historical batch of events (inhibits
731
738
notification to clients, and validation of device keys.)
732
739
"""
740
+ set_tag (
741
+ SynapseTags .FUNC_ARG_PREFIX + f"event_ids ({ len (events )} )" ,
742
+ str ([event .event_id for event in events ]),
743
+ )
744
+ set_tag (SynapseTags .FUNC_ARG_PREFIX + "backfilled" , str (backfilled ))
733
745
logger .debug (
734
746
"processing pulled backfilled=%s events=%s" ,
735
747
backfilled ,
@@ -753,6 +765,7 @@ async def _process_pulled_events(
753
765
await self ._process_pulled_event (origin , ev , backfilled = backfilled )
754
766
755
767
@trace
768
+ @tag_args
756
769
async def _process_pulled_event (
757
770
self , origin : str , event : EventBase , backfilled : bool
758
771
) -> None :
@@ -854,6 +867,7 @@ async def _process_pulled_event(
854
867
else :
855
868
raise
856
869
870
+ @trace
857
871
async def _compute_event_context_with_maybe_missing_prevs (
858
872
self , dest : str , event : EventBase
859
873
) -> EventContext :
@@ -970,6 +984,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
970
984
event , state_ids_before_event = state_map , partial_state = partial_state
971
985
)
972
986
987
+ @trace
988
+ @tag_args
973
989
async def _get_state_ids_after_missing_prev_event (
974
990
self ,
975
991
destination : str ,
@@ -1009,10 +1025,10 @@ async def _get_state_ids_after_missing_prev_event(
1009
1025
logger .debug ("Fetching %i events from cache/store" , len (desired_events ))
1010
1026
have_events = await self ._store .have_seen_events (room_id , desired_events )
1011
1027
1012
- missing_desired_events = desired_events - have_events
1028
+ missing_desired_event_ids = desired_events - have_events
1013
1029
logger .debug (
1014
1030
"We are missing %i events (got %i)" ,
1015
- len (missing_desired_events ),
1031
+ len (missing_desired_event_ids ),
1016
1032
len (have_events ),
1017
1033
)
1018
1034
@@ -1024,13 +1040,24 @@ async def _get_state_ids_after_missing_prev_event(
1024
1040
# already have a bunch of the state events. It would be nice if the
1025
1041
# federation api gave us a way of finding out which we actually need.
1026
1042
1027
- missing_auth_events = set (auth_event_ids ) - have_events
1028
- missing_auth_events .difference_update (
1029
- await self ._store .have_seen_events (room_id , missing_auth_events )
1043
+ missing_auth_event_ids = set (auth_event_ids ) - have_events
1044
+ missing_auth_event_ids .difference_update (
1045
+ await self ._store .have_seen_events (room_id , missing_auth_event_ids )
1030
1046
)
1031
- logger .debug ("We are also missing %i auth events" , len (missing_auth_events ))
1047
+ logger .debug ("We are also missing %i auth events" , len (missing_auth_event_ids ))
1048
+
1049
+ missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
1032
1050
1033
- missing_events = missing_desired_events | missing_auth_events
1051
+ set_tag (
1052
+ SynapseTags .RESULT_PREFIX
1053
+ + f"missing_auth_event_ids ({ len (missing_auth_event_ids )} )" ,
1054
+ str (missing_auth_event_ids ),
1055
+ )
1056
+ set_tag (
1057
+ SynapseTags .RESULT_PREFIX
1058
+ + f"missing_desired_event_ids ({ len (missing_desired_event_ids )} )" ,
1059
+ str (missing_desired_event_ids ),
1060
+ )
1034
1061
1035
1062
# Making an individual request for each of 1000s of events has a lot of
1036
1063
# overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1068,13 @@ async def _get_state_ids_after_missing_prev_event(
1041
1068
#
1042
1069
# TODO: might it be better to have an API which lets us do an aggregate event
1043
1070
# request
1044
- if (len (missing_events ) * 10 ) >= len (auth_event_ids ) + len (state_event_ids ):
1071
+ if (len (missing_event_ids ) * 10 ) >= len (auth_event_ids ) + len (state_event_ids ):
1045
1072
logger .debug ("Requesting complete state from remote" )
1046
1073
await self ._get_state_and_persist (destination , room_id , event_id )
1047
1074
else :
1048
- logger .debug ("Fetching %i events from remote" , len (missing_events ))
1075
+ logger .debug ("Fetching %i events from remote" , len (missing_event_ids ))
1049
1076
await self ._get_events_and_persist (
1050
- destination = destination , room_id = room_id , event_ids = missing_events
1077
+ destination = destination , room_id = room_id , event_ids = missing_event_ids
1051
1078
)
1052
1079
1053
1080
# We now need to fill out the state map, which involves fetching the
@@ -1104,6 +1131,10 @@ async def _get_state_ids_after_missing_prev_event(
1104
1131
event_id ,
1105
1132
failed_to_fetch ,
1106
1133
)
1134
+ set_tag (
1135
+ SynapseTags .RESULT_PREFIX + f"failed_to_fetch ({ len (failed_to_fetch )} )" ,
1136
+ str (failed_to_fetch ),
1137
+ )
1107
1138
1108
1139
if remote_event .is_state () and remote_event .rejected_reason is None :
1109
1140
state_map [
@@ -1112,6 +1143,8 @@ async def _get_state_ids_after_missing_prev_event(
1112
1143
1113
1144
return state_map
1114
1145
1146
+ @trace
1147
+ @tag_args
1115
1148
async def _get_state_and_persist (
1116
1149
self , destination : str , room_id : str , event_id : str
1117
1150
) -> None :
@@ -1133,6 +1166,7 @@ async def _get_state_and_persist(
1133
1166
destination = destination , room_id = room_id , event_ids = (event_id ,)
1134
1167
)
1135
1168
1169
+ @trace
1136
1170
async def _process_received_pdu (
1137
1171
self ,
1138
1172
origin : str ,
@@ -1283,6 +1317,7 @@ async def _resync_device(self, sender: str) -> None:
1283
1317
except Exception :
1284
1318
logger .exception ("Failed to resync device for %s" , sender )
1285
1319
1320
+ @trace
1286
1321
async def _handle_marker_event (self , origin : str , marker_event : EventBase ) -> None :
1287
1322
"""Handles backfilling the insertion event when we receive a marker
1288
1323
event that points to one.
@@ -1414,6 +1449,8 @@ async def backfill_event_id(
1414
1449
1415
1450
return event_from_response
1416
1451
1452
+ @trace
1453
+ @tag_args
1417
1454
async def _get_events_and_persist (
1418
1455
self , destination : str , room_id : str , event_ids : Collection [str ]
1419
1456
) -> None :
@@ -1459,6 +1496,7 @@ async def get_event(event_id: str) -> None:
1459
1496
logger .info ("Fetched %i events of %i requested" , len (events ), len (event_ids ))
1460
1497
await self ._auth_and_persist_outliers (room_id , events )
1461
1498
1499
+ @trace
1462
1500
async def _auth_and_persist_outliers (
1463
1501
self , room_id : str , events : Iterable [EventBase ]
1464
1502
) -> None :
@@ -1477,6 +1515,12 @@ async def _auth_and_persist_outliers(
1477
1515
"""
1478
1516
event_map = {event .event_id : event for event in events }
1479
1517
1518
+ event_ids = event_map .keys ()
1519
+ set_tag (
1520
+ SynapseTags .FUNC_ARG_PREFIX + f"event_ids ({ len (event_ids )} )" ,
1521
+ str (event_ids ),
1522
+ )
1523
+
1480
1524
# filter out any events we have already seen. This might happen because
1481
1525
# the events were eagerly pushed to us (eg, during a room join), or because
1482
1526
# another thread has raced against us since we decided to request the event.
@@ -1593,6 +1637,7 @@ async def prep(event: EventBase) -> None:
1593
1637
backfilled = True ,
1594
1638
)
1595
1639
1640
+ @trace
1596
1641
async def _check_event_auth (
1597
1642
self , origin : Optional [str ], event : EventBase , context : EventContext
1598
1643
) -> None :
@@ -1631,6 +1676,11 @@ async def _check_event_auth(
1631
1676
claimed_auth_events = await self ._load_or_fetch_auth_events_for_event (
1632
1677
origin , event
1633
1678
)
1679
+ set_tag (
1680
+ SynapseTags .RESULT_PREFIX
1681
+ + f"claimed_auth_events ({ len (claimed_auth_events )} )" ,
1682
+ str ([ev .event_id for ev in claimed_auth_events ]),
1683
+ )
1634
1684
1635
1685
# ... and check that the event passes auth at those auth events.
1636
1686
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1778,7 @@ async def _check_event_auth(
1728
1778
)
1729
1779
context .rejected = RejectedReason .AUTH_ERROR
1730
1780
1781
+ @trace
1731
1782
async def _maybe_kick_guest_users (self , event : EventBase ) -> None :
1732
1783
if event .type != EventTypes .GuestAccess :
1733
1784
return
@@ -1935,6 +1986,8 @@ async def _load_or_fetch_auth_events_for_event(
1935
1986
# instead we raise an AuthError, which will make the caller ignore it.
1936
1987
raise AuthError (code = HTTPStatus .FORBIDDEN , msg = "Auth events could not be found" )
1937
1988
1989
+ @trace
1990
+ @tag_args
1938
1991
async def _get_remote_auth_chain_for_event (
1939
1992
self , destination : str , room_id : str , event_id : str
1940
1993
) -> None :
@@ -1963,6 +2016,7 @@ async def _get_remote_auth_chain_for_event(
1963
2016
1964
2017
await self ._auth_and_persist_outliers (room_id , remote_auth_events )
1965
2018
2019
+ @trace
1966
2020
async def _run_push_actions_and_persist_event (
1967
2021
self , event : EventBase , context : EventContext , backfilled : bool = False
1968
2022
) -> None :
@@ -2071,8 +2125,13 @@ async def persist_events_and_notify(
2071
2125
self ._message_handler .maybe_schedule_expiry (event )
2072
2126
2073
2127
if not backfilled : # Never notify for backfilled events
2074
- for event in events :
2075
- await self ._notify_persisted_event (event , max_stream_token )
2128
+ with start_active_span ("notify_persisted_events" ):
2129
+ set_tag (
2130
+ SynapseTags .RESULT_PREFIX + f"event_ids ({ len (events )} )" ,
2131
+ str ([ev .event_id for ev in events ]),
2132
+ )
2133
+ for event in events :
2134
+ await self ._notify_persisted_event (event , max_stream_token )
2076
2135
2077
2136
return max_stream_token .stream
2078
2137
0 commit comments