Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 921a3f8

Browse files
authored
Fix not sending events over federation when using sharded event persisters (#8536)
* Fix outbound federation with multiple event persisters. We incorrectly notified federation senders that the minimum persisted stream position had advanced when we got an `RDATA` from an event persister. Notifying of federation senders already correctly happens in the notifier, so we just delete the offending line. * Change some interfaces to use RoomStreamToken. By enforcing use of `RoomStreamTokens` we make it less likely that people pass in random ints that they got from somewhere random.
1 parent 3ee97a2 commit 921a3f8

File tree

10 files changed

+51
-21
lines changed

10 files changed

+51
-21
lines changed

changelog.d/8536.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix not sending events over federation when using sharded event writers.

synapse/app/generic_worker.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -790,10 +790,6 @@ async def process_replication_rows(self, stream_name, token, rows):
790790
send_queue.process_rows_for_federation(self.federation_sender, rows)
791791
await self.update_token(token)
792792

793-
# We also need to poke the federation sender when new events happen
794-
elif stream_name == "events":
795-
self.federation_sender.notify_new_events(token)
796-
797793
# ... and when new receipts happen
798794
elif stream_name == ReceiptsStream.NAME:
799795
await self._on_new_receipts(rows)

synapse/federation/send_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def _clear_queue_before_pos(self, position_to_delete):
188188
for key in keys[:i]:
189189
del self.edus[key]
190190

191-
def notify_new_events(self, current_id):
191+
def notify_new_events(self, max_token):
192192
"""As per FederationSender"""
193193
# We don't need to replicate this as it gets sent down a different
194194
# stream.

synapse/federation/sender/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
events_processed_counter,
4141
)
4242
from synapse.metrics.background_process_metrics import run_as_background_process
43-
from synapse.types import ReadReceipt
43+
from synapse.types import ReadReceipt, RoomStreamToken
4444
from synapse.util.metrics import Measure, measure_func
4545

4646
logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
154154
self._per_destination_queues[destination] = queue
155155
return queue
156156

157-
def notify_new_events(self, current_id: int) -> None:
157+
def notify_new_events(self, max_token: RoomStreamToken) -> None:
158158
"""This gets called when we have some new events we might want to
159159
send out to other servers.
160160
"""
161+
# We just use the minimum stream ordering and ignore the vector clock
162+
# component. This is safe to do as long as we *always* ignore the vector
163+
# clock components.
164+
current_id = max_token.stream
165+
161166
self._last_poked_id = max(current_id, self._last_poked_id)
162167

163168
if self._is_processing:

synapse/handlers/appservice.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
event_processing_loop_room_count,
2828
)
2929
from synapse.metrics.background_process_metrics import run_as_background_process
30+
from synapse.types import RoomStreamToken
3031
from synapse.util.metrics import Measure
3132

3233
logger = logging.getLogger(__name__)
@@ -47,15 +48,17 @@ def __init__(self, hs):
4748
self.current_max = 0
4849
self.is_processing = False
4950

50-
async def notify_interested_services(self, current_id):
51+
async def notify_interested_services(self, max_token: RoomStreamToken):
5152
"""Notifies (pushes) all application services interested in this event.
5253
5354
Pushing is done asynchronously, so this method won't block for any
5455
prolonged length of time.
55-
56-
Args:
57-
current_id(int): The current maximum ID.
5856
"""
57+
# We just use the minimum stream ordering and ignore the vector clock
58+
# component. This is safe to do as long as we *always* ignore the vector
59+
# clock components.
60+
current_id = max_token.stream
61+
5962
services = self.store.get_app_services()
6063
if not services or not self.notify_appservices:
6164
return

synapse/notifier.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,19 +319,19 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
319319
)
320320

321321
if self.federation_sender:
322-
self.federation_sender.notify_new_events(max_room_stream_token.stream)
322+
self.federation_sender.notify_new_events(max_room_stream_token)
323323

324324
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
325325
try:
326326
await self.appservice_handler.notify_interested_services(
327-
max_room_stream_token.stream
327+
max_room_stream_token
328328
)
329329
except Exception:
330330
logger.exception("Error notifying application services of event")
331331

332332
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
333333
try:
334-
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
334+
await self._pusher_pool.on_new_notifications(max_room_stream_token)
335335
except Exception:
336336
logger.exception("Error pusher pool of event")
337337

synapse/push/emailpusher.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
1919

2020
from synapse.metrics.background_process_metrics import run_as_background_process
21+
from synapse.types import RoomStreamToken
2122

2223
logger = logging.getLogger(__name__)
2324

@@ -91,7 +92,12 @@ def on_stop(self):
9192
pass
9293
self.timed_call = None
9394

94-
def on_new_notifications(self, max_stream_ordering):
95+
def on_new_notifications(self, max_token: RoomStreamToken):
96+
# We just use the minimum stream ordering and ignore the vector clock
97+
# component. This is safe to do as long as we *always* ignore the vector
98+
# clock components.
99+
max_stream_ordering = max_token.stream
100+
95101
if self.max_stream_ordering:
96102
self.max_stream_ordering = max(
97103
max_stream_ordering, self.max_stream_ordering

synapse/push/httppusher.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from synapse.logging import opentracing
2424
from synapse.metrics.background_process_metrics import run_as_background_process
2525
from synapse.push import PusherConfigException
26+
from synapse.types import RoomStreamToken
2627

2728
from . import push_rule_evaluator, push_tools
2829

@@ -114,7 +115,12 @@ def on_started(self, should_check_for_notifs):
114115
if should_check_for_notifs:
115116
self._start_processing()
116117

117-
def on_new_notifications(self, max_stream_ordering):
118+
def on_new_notifications(self, max_token: RoomStreamToken):
119+
# We just use the minimum stream ordering and ignore the vector clock
120+
# component. This is safe to do as long as we *always* ignore the vector
121+
# clock components.
122+
max_stream_ordering = max_token.stream
123+
118124
self.max_stream_ordering = max(
119125
max_stream_ordering, self.max_stream_ordering or 0
120126
)

synapse/push/pusherpool.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from synapse.push.emailpusher import EmailPusher
2525
from synapse.push.httppusher import HttpPusher
2626
from synapse.push.pusher import PusherFactory
27+
from synapse.types import RoomStreamToken
2728
from synapse.util.async_helpers import concurrently_execute
2829

2930
if TYPE_CHECKING:
@@ -186,11 +187,16 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
186187
)
187188
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
188189

189-
async def on_new_notifications(self, max_stream_id: int):
190+
async def on_new_notifications(self, max_token: RoomStreamToken):
190191
if not self.pushers:
191192
# nothing to do here.
192193
return
193194

195+
# We just use the minimum stream ordering and ignore the vector clock
196+
# component. This is safe to do as long as we *always* ignore the vector
197+
# clock components.
198+
max_stream_id = max_token.stream
199+
194200
if max_stream_id < self._last_room_stream_id_seen:
195201
# Nothing to do
196202
return
@@ -214,7 +220,7 @@ async def on_new_notifications(self, max_stream_id: int):
214220

215221
if u in self.pushers:
216222
for p in self.pushers[u].values():
217-
p.on_new_notifications(max_stream_id)
223+
p.on_new_notifications(max_token)
218224

219225
except Exception:
220226
logger.exception("Exception in pusher on_new_notifications")

tests/handlers/test_appservice.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from twisted.internet import defer
1919

2020
from synapse.handlers.appservice import ApplicationServicesHandler
21+
from synapse.types import RoomStreamToken
2122

2223
from tests.test_utils import make_awaitable
2324
from tests.utils import MockClock
@@ -61,7 +62,9 @@ def test_notify_interested_services(self):
6162
defer.succeed((0, [event])),
6263
defer.succeed((0, [])),
6364
]
64-
yield defer.ensureDeferred(self.handler.notify_interested_services(0))
65+
yield defer.ensureDeferred(
66+
self.handler.notify_interested_services(RoomStreamToken(None, 0))
67+
)
6568
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
6669
interested_service, event
6770
)
@@ -80,7 +83,9 @@ def test_query_user_exists_unknown_user(self):
8083
defer.succeed((0, [event])),
8184
defer.succeed((0, [])),
8285
]
83-
yield defer.ensureDeferred(self.handler.notify_interested_services(0))
86+
yield defer.ensureDeferred(
87+
self.handler.notify_interested_services(RoomStreamToken(None, 0))
88+
)
8489
self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
8590

8691
@defer.inlineCallbacks
@@ -97,7 +102,9 @@ def test_query_user_exists_known_user(self):
97102
defer.succeed((0, [event])),
98103
defer.succeed((0, [])),
99104
]
100-
yield defer.ensureDeferred(self.handler.notify_interested_services(0))
105+
yield defer.ensureDeferred(
106+
self.handler.notify_interested_services(RoomStreamToken(None, 0))
107+
)
101108
self.assertFalse(
102109
self.mock_as_api.query_user.called,
103110
"query_user called when it shouldn't have been.",

0 commit comments

Comments
 (0)