@@ -185,19 +185,26 @@ def notify_interested_services_ephemeral(
185
185
new_token : Optional [int ],
186
186
users : Optional [Collection [Union [str , UserID ]]] = None ,
187
187
) -> None :
188
- """This is called by the notifier in the background
189
- when a ephemeral event handled by the homeserver.
190
-
191
- This will determine which appservices
192
- are interested in the event, and submit them.
188
+ """
189
+ This is called by the notifier in the background when an ephemeral event is handled
190
+ by the homeserver.
193
191
194
- Events will only be pushed to appservices
195
- that have opted into ephemeral events
192
+ This will determine which appservices are interested in the event, and submit them.
196
193
197
194
Args:
198
195
stream_key: The stream the event came from.
199
- new_token: The latest stream token
200
- users: The user(s) involved with the event.
196
+
197
+ `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
198
+ value for `stream_key` will cause this function to return early.
199
+
200
+ Ephemeral events will only be pushed to appservices that have opted into
201
+ them.
202
+
203
+ Appservices will only receive ephemeral events that fall within their
204
+ registered user and room namespaces.
205
+
206
+ new_token: The latest stream token.
207
+ users: The users that should be informed of the new event, if any.
201
208
"""
202
209
if not self .notify_appservices :
203
210
return
@@ -232,40 +239,87 @@ async def _notify_interested_services_ephemeral(
232
239
for service in services :
233
240
# Only handle typing if we have the latest token
234
241
if stream_key == "typing_key" and new_token is not None :
242
+ # Note that we don't persist the token (via set_type_stream_id_for_appservice)
243
+ # for typing_key due to performance reasons and due to their highly
244
+ # ephemeral nature.
245
+ #
246
+ # Instead we simply grab the latest typing updates in _handle_typing
247
+ # and, if they apply to this application service, send it off.
235
248
events = await self ._handle_typing (service , new_token )
236
249
if events :
237
250
self .scheduler .submit_ephemeral_events_for_as (service , events )
238
- # We don't persist the token for typing_key for performance reasons
251
+
239
252
elif stream_key == "receipt_key" :
240
253
events = await self ._handle_receipts (service )
241
254
if events :
242
255
self .scheduler .submit_ephemeral_events_for_as (service , events )
256
+
257
+ # Persist the latest handled stream token for this appservice
243
258
await self .store .set_type_stream_id_for_appservice (
244
259
service , "read_receipt" , new_token
245
260
)
261
+
246
262
elif stream_key == "presence_key" :
247
263
events = await self ._handle_presence (service , users )
248
264
if events :
249
265
self .scheduler .submit_ephemeral_events_for_as (service , events )
266
+
267
+ # Persist the latest handled stream token for this appservice
250
268
await self .store .set_type_stream_id_for_appservice (
251
269
service , "presence" , new_token
252
270
)
253
271
254
272
async def _handle_typing (
255
273
self , service : ApplicationService , new_token : int
256
274
) -> List [JsonDict ]:
275
+ """
276
+ Return the typing events since the given stream token that the given application
277
+ service should receive.
278
+
279
+ First fetch all typing events between the given typing stream token (non-inclusive)
280
+ and the latest typing event stream token (inclusive). Then return only those typing
281
+ events that the given application service may be interested in.
282
+
283
+ Args:
284
+ service: The application service to check for which events it should receive.
285
+ new_token: A typing event stream token.
286
+
287
+ Returns:
288
+ A list of JSON dictionaries containing data derived from the typing events that
289
+ should be sent to the given application service.
290
+ """
257
291
typing_source = self .event_sources .sources .typing
258
292
# Get the typing events from just before current
259
293
typing , _ = await typing_source .get_new_events_as (
260
294
service = service ,
261
295
# For performance reasons, we don't persist the previous
262
- # token in the DB and instead fetch the latest typing information
296
+ # token in the DB and instead fetch the latest typing event
263
297
# for appservices.
298
+ # TODO: It'd likely be more efficient to simply fetch the
299
+ # typing event with the given 'new_token' stream token and
300
+ # check if the given service was interested, rather than
301
+ # iterating over all typing events and only grabbing the
302
+ # latest few.
264
303
from_key = new_token - 1 ,
265
304
)
266
305
return typing
267
306
268
307
async def _handle_receipts (self , service : ApplicationService ) -> List [JsonDict ]:
308
+ """
309
+ Return the latest read receipts that the given application service should receive.
310
+
311
+ First fetch all read receipts between the last receipt stream token that this
312
+ application service should have previously received (non-inclusive) and the
313
+ latest read receipt stream token (inclusive). Then from that set, return only
314
+ those read receipts that the given application service may be interested in.
315
+
316
+ Args:
317
+ service: The application service to check for which events it should receive.
318
+
319
+ Returns:
320
+ A list of JSON dictionaries containing data derived from the read receipts that
321
+ should be sent to the given application service.
322
+ """
269
323
from_key = await self .store .get_type_stream_id_for_appservice (
270
324
service , "read_receipt"
271
325
)
@@ -278,6 +332,22 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
278
332
async def _handle_presence (
279
333
self , service : ApplicationService , users : Collection [Union [str , UserID ]]
280
334
) -> List [JsonDict ]:
335
+ """
336
+ Return the latest presence updates that the given application service should receive.
337
+
338
+ First, filter the given users list to those that the application service is
339
+ interested in. Then retrieve the latest presence updates since the
340
+ the last-known previously received presence stream token for the given
341
+ application service. Return those presence updates.
342
+
343
+ Args:
344
+ service: The application service that ephemeral events are being sent to.
345
+ users: The users that should receive the presence update.
346
+
347
+ Returns:
348
+ A list of json dictionaries containing data derived from the presence events
349
+ that should be sent to the given application service.
350
+ """
281
351
events : List [JsonDict ] = []
282
352
presence_source = self .event_sources .sources .presence
283
353
from_key = await self .store .get_type_stream_id_for_appservice (
@@ -290,9 +360,9 @@ async def _handle_presence(
290
360
interested = await service .is_interested_in_presence (user , self .store )
291
361
if not interested :
292
362
continue
363
+
293
364
presence_events , _ = await presence_source .get_new_events (
294
365
user = user ,
295
- service = service ,
296
366
from_key = from_key ,
297
367
)
298
368
time_now = self .clock .time_msec ()
0 commit comments