Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit b2357a8

Browse files
authored
Fix bug where 5s delays would occasionally happen. (#15150)
This only affects deployments using workers.
1 parent 8219525 commit b2357a8

File tree

3 files changed

+80
-0
lines changed

3 files changed

+80
-0
lines changed

changelog.d/15150.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug introduced in Synapse 1.76 where 5s delays would occasionally occur in deployments using workers.

synapse/replication/tcp/resource.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,24 @@ async def _run_notifier_loop(self) -> None:
238 238
except Exception:
239 239
logger.exception("Failed to replicate")
240 240

241+
# The last token we send may not match the current
242+
# token, in which case we want to send out a `POSITION`
243+
# to tell other workers the actual current position.
244+
if updates[-1][0] < current_token:
245+
logger.info(
246+
"Sending position: %s -> %s",
247+
stream.NAME,
248+
current_token,
249+
)
250+
self.command_handler.send_command(
251+
PositionCommand(
252+
stream.NAME,
253+
self._instance_name,
254+
updates[-1][0],
255+
current_token,
256+
)
257+
)
258+
241 259
logger.debug("No more pending updates, breaking poke loop")
242 260
finally:
243 261
self.pending_updates = False

tests/replication/tcp/test_handler.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,64 @@ def test_wait_for_stream_position(self) -> None:
141 141
self.get_success(ctx_worker1.__aexit__(None, None, None))
142 142

143 143
self.assertTrue(d.called)
144+
145+
def test_wait_for_stream_position_rdata(self) -> None:
146+
"""Check that wait for stream position correctly waits for an update
147+
from the correct instance, when RDATA is sent.
148+
"""
149+
store = self.hs.get_datastores().main
150+
cmd_handler = self.hs.get_replication_command_handler()
151+
data_handler = self.hs.get_replication_data_handler()
152+
153+
worker1 = self.make_worker_hs(
154+
"synapse.app.generic_worker",
155+
extra_config={
156+
"worker_name": "worker1",
157+
"run_background_tasks_on": "worker1",
158+
"redis": {"enabled": True},
159+
},
160+
)
161+
162+
cache_id_gen = worker1.get_datastores().main._cache_id_gen
163+
assert cache_id_gen is not None
164+
165+
self.replicate()
166+
167+
# First, make sure the master knows that `worker1` exists.
168+
initial_token = cache_id_gen.get_current_token()
169+
cmd_handler.send_command(
170+
PositionCommand("caches", "worker1", initial_token, initial_token)
171+
)
172+
self.replicate()
173+
174+
# `wait_for_stream_position` should only return once master receives a
175+
# notification that `next_token2` has persisted.
176+
ctx_worker1 = cache_id_gen.get_next_mult(2)
177+
next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())
178+
179+
d = defer.ensureDeferred(
180+
data_handler.wait_for_stream_position("worker1", "caches", next_token2)
181+
)
182+
self.assertFalse(d.called)
183+
184+
# Insert an entry into the cache stream with token `next_token1`, but
185+
# not `next_token2`.
186+
self.get_success(
187+
store.db_pool.simple_insert(
188+
table="cache_invalidation_stream_by_instance",
189+
values={
190+
"stream_id": next_token1,
191+
"instance_name": "worker1",
192+
"cache_func": "foo",
193+
"keys": [],
194+
"invalidation_ts": 0,
195+
},
196+
)
197+
)
198+
199+
# Finish the context manager, triggering the data to be sent to master.
200+
self.get_success(ctx_worker1.__aexit__(None, None, None))
201+
202+
# Master should get told about `next_token2`, so the deferred should
203+
# resolve.
204+
self.assertTrue(d.called)

0 commit comments

Comments
 (0)