Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Make event persisters periodically announce position over replication. #8499

Merged
merged 10 commits into from
Oct 12, 2020
31 changes: 31 additions & 0 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from twisted.internet.protocol import Factory

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
from synapse.util.metrics import Measure

stream_updates_counter = Counter(
Expand Down Expand Up @@ -84,6 +86,16 @@ def __init__(self, hs):
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()

# If we are replicating an event stream we want to periodically check if
# we should send updated POSITIONs. We do this as a looping call rather
# explicitly poking when the position advances (without new data to
# replicate) to reduce replication traffic (otherwise each writer would
# likely send a POSITION for each new event received over replication).
#
# Note that if the position hasn't advanced then we won't send anything.
if any(EventsStream.NAME == s.NAME for s in self.streams):
self.clock.looping_call(self.on_notifier_poke, 1000)

def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
Expand Down Expand Up @@ -136,6 +148,8 @@ async def _run_notifier_loop(self):
self._replication_torture_level / 1000.0
)

last_token = stream.last_token

logger.debug(
"Getting stream: %s: %s -> %s",
stream.NAME,
Expand All @@ -159,6 +173,23 @@ async def _run_notifier_loop(self):
)
stream_updates_counter.labels(stream.NAME).inc(len(updates))

else:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
logger.info(
"Sending position: %s -> %s", stream.NAME, current_token
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something for this PR, but I wonder whether these at INFO are a bit noisy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, probably, its just really useful to figure out when things go wrong.

self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
)
continue

# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
# this by setting the current token to all but the last of
Expand Down