|
16 | 16 |
|
17 | 17 | import logging
|
18 | 18 | from collections import defaultdict
|
| 19 | +from threading import Lock |
19 | 20 | from typing import Dict, Tuple, Union
|
20 | 21 |
|
21 | 22 | from twisted.internet import defer
|
@@ -56,12 +57,17 @@ def __init__(self, _hs):
|
56 | 57 | # map from user id to app_id:pushkey to pusher
|
57 | 58 | self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
58 | 59 |
|
| 60 | + # a lock for the pushers dict, since `count_pushers` is called from an different |
| 61 | + # and we otherwise get concurrent modification errors |
| 62 | + self._pushers_lock = Lock() |
| 63 | + |
59 | 64 | def count_pushers():
|
60 | 65 | results = defaultdict(int) # type: Dict[Tuple[str, str], int]
|
61 |
| - for pushers in self.pushers.values(): |
62 |
| - for pusher in pushers.values(): |
63 |
| - k = (type(pusher).__name__, pusher.app_id) |
64 |
| - results[k] += 1 |
| 66 | + with self._pushers_lock: |
| 67 | + for pushers in self.pushers.values(): |
| 68 | + for pusher in pushers.values(): |
| 69 | + k = (type(pusher).__name__, pusher.app_id) |
| 70 | + results[k] += 1 |
65 | 71 | return results
|
66 | 72 |
|
67 | 73 | LaterGauge(
|
@@ -293,11 +299,12 @@ def _start_pusher(self, pusherdict):
|
293 | 299 | return
|
294 | 300 |
|
295 | 301 | appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
|
296 |
| - byuser = self.pushers.setdefault(pusherdict["user_name"], {}) |
297 | 302 |
|
298 |
| - if appid_pushkey in byuser: |
299 |
| - byuser[appid_pushkey].on_stop() |
300 |
| - byuser[appid_pushkey] = p |
| 303 | + with self._pushers_lock: |
| 304 | + byuser = self.pushers.setdefault(pusherdict["user_name"], {}) |
| 305 | + if appid_pushkey in byuser: |
| 306 | + byuser[appid_pushkey].on_stop() |
| 307 | + byuser[appid_pushkey] = p |
301 | 308 |
|
302 | 309 | # Check if there *may* be push to process. We do this as this check is a
|
303 | 310 | # lot cheaper to do than actually fetching the exact rows we need to
|
@@ -326,7 +333,9 @@ def remove_pusher(self, app_id, pushkey, user_id):
|
326 | 333 | if appid_pushkey in byuser:
|
327 | 334 | logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
328 | 335 | byuser[appid_pushkey].on_stop()
|
329 |
| - del byuser[appid_pushkey] |
| 336 | + with self._pushers_lock: |
| 337 | + del byuser[appid_pushkey] |
| 338 | + |
330 | 339 | yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
331 | 340 | app_id, pushkey, user_id
|
332 | 341 | )
|
0 commit comments