@@ -1039,6 +1039,33 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
1039
1039
pdu .room_id , room_version , lock , origin , pdu
1040
1040
)
1041
1041
1042
+ async def _get_next_valid_staged_event_for_room (
1043
+ self , room_id : str , room_version : RoomVersion
1044
+ ) -> Optional [Tuple [str , EventBase ]]:
1045
+ """Return the first non-spam event from staging queue."""
1046
+
1047
+ while True :
1048
+ # We need to do this check outside the lock to avoid a race between
1049
+ # a new event being inserted by another instance and it attempting
1050
+ # to acquire the lock.
1051
+ next = await self .store .get_next_staged_event_for_room (
1052
+ room_id , room_version
1053
+ )
1054
+
1055
+ if next is None :
1056
+ return None
1057
+
1058
+ origin , event = next
1059
+
1060
+ if await self ._spam_checker .should_drop_federated_event (event ):
1061
+ logger .warning (
1062
+ "Staged federated event contains spam, dropping %s" ,
1063
+ event .event_id ,
1064
+ )
1065
+ continue
1066
+
1067
+ return next
1068
+
1042
1069
@wrap_as_background_process ("_process_incoming_pdus_in_room_inner" )
1043
1070
async def _process_incoming_pdus_in_room_inner (
1044
1071
self ,
@@ -1116,31 +1143,15 @@ async def _process_incoming_pdus_in_room_inner(
1116
1143
(self ._clock .time_msec () - received_ts ) / 1000
1117
1144
)
1118
1145
1119
- while True :
1120
- # We need to do this check outside the lock to avoid a race between
1121
- # a new event being inserted by another instance and it attempting
1122
- # to acquire the lock.
1123
- next = await self .store .get_next_staged_event_for_room (
1124
- room_id , room_version
1125
- )
1126
-
1127
- if next is None :
1128
- break
1129
-
1130
- origin , event = next
1131
-
1132
- if await self ._spam_checker .should_drop_federated_event (event ):
1133
- logger .warning (
1134
- "Staged federated event contains spam, dropping %s" ,
1135
- event .event_id ,
1136
- )
1137
- continue
1138
-
1139
- break
1146
+ next = await self ._get_next_valid_staged_event_for_room (
1147
+ room_id , room_version
1148
+ )
1140
1149
1141
1150
if not next :
1142
1151
break
1143
1152
1153
+ origin , event = next
1154
+
1144
1155
# Prune the event queue if it's getting large.
1145
1156
#
1146
1157
# We do this *after* handling the first event as the common case is
0 commit comments