14
14
15
15
import logging
16
16
from typing import (
17
+ TYPE_CHECKING ,
17
18
Awaitable ,
18
19
Callable ,
19
20
Collection ,
32
33
33
34
from twisted .internet import defer
34
35
35
- import synapse .server
36
36
from synapse .api .constants import EventTypes , HistoryVisibility , Membership
37
37
from synapse .api .errors import AuthError
38
38
from synapse .events import EventBase
53
53
from synapse .util .metrics import Measure
54
54
from synapse .visibility import filter_events_for_client
55
55
56
+ if TYPE_CHECKING :
57
+ from synapse .server import HomeServer
58
+
56
59
logger = logging .getLogger (__name__ )
57
60
58
61
notified_events_counter = Counter ("synapse_notifier_notified_events" , "" )
@@ -82,7 +85,7 @@ class _NotificationListener:
82
85
83
86
__slots__ = ["deferred" ]
84
87
85
- def __init__ (self , deferred ):
88
+ def __init__ (self , deferred : "defer.Deferred" ):
86
89
self .deferred = deferred
87
90
88
91
@@ -124,7 +127,7 @@ def notify(
124
127
stream_key : str ,
125
128
stream_id : Union [int , RoomStreamToken ],
126
129
time_now_ms : int ,
127
- ):
130
+ ) -> None :
128
131
"""Notify any listeners for this user of a new event from an
129
132
event source.
130
133
Args:
@@ -152,7 +155,7 @@ def notify(
152
155
self .notify_deferred = ObservableDeferred (defer .Deferred ())
153
156
noify_deferred .callback (self .current_token )
154
157
155
- def remove (self , notifier : "Notifier" ):
158
+ def remove (self , notifier : "Notifier" ) -> None :
156
159
"""Remove this listener from all the indexes in the Notifier
157
160
it knows about.
158
161
"""
@@ -188,7 +191,7 @@ class EventStreamResult:
188
191
start_token : StreamToken
189
192
end_token : StreamToken
190
193
191
- def __bool__ (self ):
194
+ def __bool__ (self ) -> bool :
192
195
return bool (self .events )
193
196
194
197
@@ -212,7 +215,7 @@ class Notifier:
212
215
213
216
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
214
217
215
- def __init__ (self , hs : "synapse.server. HomeServer" ):
218
+ def __init__ (self , hs : "HomeServer" ):
216
219
self .user_to_user_stream : Dict [str , _NotifierUserStream ] = {}
217
220
self .room_to_user_streams : Dict [str , Set [_NotifierUserStream ]] = {}
218
221
@@ -248,7 +251,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
248
251
# This is not a very cheap test to perform, but it's only executed
249
252
# when rendering the metrics page, which is likely once per minute at
250
253
# most when scraping it.
251
- def count_listeners ():
254
+ def count_listeners () -> int :
252
255
all_user_streams : Set [_NotifierUserStream ] = set ()
253
256
254
257
for streams in list (self .room_to_user_streams .values ()):
@@ -270,7 +273,7 @@ def count_listeners():
270
273
"synapse_notifier_users" , "" , [], lambda : len (self .user_to_user_stream )
271
274
)
272
275
273
- def add_replication_callback (self , cb : Callable [[], None ]):
276
+ def add_replication_callback (self , cb : Callable [[], None ]) -> None :
274
277
"""Add a callback that will be called when some new data is available.
275
278
Callback is not given any arguments. It should *not* return a Deferred - if
276
279
it needs to do any asynchronous work, a background thread should be started and
@@ -284,7 +287,7 @@ async def on_new_room_event(
284
287
event_pos : PersistedEventPosition ,
285
288
max_room_stream_token : RoomStreamToken ,
286
289
extra_users : Optional [Collection [UserID ]] = None ,
287
- ):
290
+ ) -> None :
288
291
"""Unwraps event and calls `on_new_room_event_args`."""
289
292
await self .on_new_room_event_args (
290
293
event_pos = event_pos ,
@@ -307,7 +310,7 @@ async def on_new_room_event_args(
307
310
event_pos : PersistedEventPosition ,
308
311
max_room_stream_token : RoomStreamToken ,
309
312
extra_users : Optional [Collection [UserID ]] = None ,
310
- ):
313
+ ) -> None :
311
314
"""Used by handlers to inform the notifier something has happened
312
315
in the room, room event wise.
313
316
@@ -338,7 +341,9 @@ async def on_new_room_event_args(
338
341
339
342
self .notify_replication ()
340
343
341
- def _notify_pending_new_room_events (self , max_room_stream_token : RoomStreamToken ):
344
+ def _notify_pending_new_room_events (
345
+ self , max_room_stream_token : RoomStreamToken
346
+ ) -> None :
342
347
"""Notify for the room events that were queued waiting for a previous
343
348
event to be persisted.
344
349
Args:
@@ -374,7 +379,7 @@ def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken
374
379
)
375
380
self ._on_updated_room_token (max_room_stream_token )
376
381
377
- def _on_updated_room_token (self , max_room_stream_token : RoomStreamToken ):
382
+ def _on_updated_room_token (self , max_room_stream_token : RoomStreamToken ) -> None :
378
383
"""Poke services that might care that the room position has been
379
384
updated.
380
385
"""
@@ -386,13 +391,13 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
386
391
if self .federation_sender :
387
392
self .federation_sender .notify_new_events (max_room_stream_token )
388
393
389
- def _notify_app_services (self , max_room_stream_token : RoomStreamToken ):
394
+ def _notify_app_services (self , max_room_stream_token : RoomStreamToken ) -> None :
390
395
try :
391
396
self .appservice_handler .notify_interested_services (max_room_stream_token )
392
397
except Exception :
393
398
logger .exception ("Error notifying application services of event" )
394
399
395
- def _notify_pusher_pool (self , max_room_stream_token : RoomStreamToken ):
400
+ def _notify_pusher_pool (self , max_room_stream_token : RoomStreamToken ) -> None :
396
401
try :
397
402
self ._pusher_pool .on_new_notifications (max_room_stream_token )
398
403
except Exception :
@@ -475,8 +480,8 @@ async def wait_for_events(
475
480
user_id : str ,
476
481
timeout : int ,
477
482
callback : Callable [[StreamToken , StreamToken ], Awaitable [T ]],
478
- room_ids = None ,
479
- from_token = StreamToken .START ,
483
+ room_ids : Optional [ Collection [ str ]] = None ,
484
+ from_token : StreamToken = StreamToken .START ,
480
485
) -> T :
481
486
"""Wait until the callback returns a non empty response or the
482
487
timeout fires.
@@ -700,14 +705,14 @@ def remove_expired_streams(self) -> None:
700
705
for expired_stream in expired_streams :
701
706
expired_stream .remove (self )
702
707
703
- def _register_with_keys (self , user_stream : _NotifierUserStream ):
708
+ def _register_with_keys (self , user_stream : _NotifierUserStream ) -> None :
704
709
self .user_to_user_stream [user_stream .user_id ] = user_stream
705
710
706
711
for room in user_stream .rooms :
707
712
s = self .room_to_user_streams .setdefault (room , set ())
708
713
s .add (user_stream )
709
714
710
- def _user_joined_room (self , user_id : str , room_id : str ):
715
+ def _user_joined_room (self , user_id : str , room_id : str ) -> None :
711
716
new_user_stream = self .user_to_user_stream .get (user_id )
712
717
if new_user_stream is not None :
713
718
room_streams = self .room_to_user_streams .setdefault (room_id , set ())
@@ -719,7 +724,7 @@ def notify_replication(self) -> None:
719
724
for cb in self .replication_callbacks :
720
725
cb ()
721
726
722
- def notify_remote_server_up (self , server : str ):
727
+ def notify_remote_server_up (self , server : str ) -> None :
723
728
"""Notify any replication that a remote server has come back up"""
724
729
# We call federation_sender directly rather than registering as a
725
730
# callback as a) we already have a reference to it and b) it introduces
0 commit comments