Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Make event persisters periodically announce position over replication. #8499

Merged
merged 10 commits into from
Oct 12, 2020
13 changes: 9 additions & 4 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ example flow would be (where '>' indicates master to worker and

> SERVER example.com
< REPLICATE
> POSITION events master 53
> POSITION events master 53 53
> RDATA events master 54 ["$foo1:bar.com", ...]
> RDATA events master 55 ["$foo4:bar.com", ...]

Expand Down Expand Up @@ -138,9 +138,9 @@ the wire:
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE
> POSITION events master 1
> POSITION backfill master 1
> POSITION caches master 1
> POSITION events master 1 1
> POSITION backfill master 1 1
> POSITION caches master 1 1
> RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events master 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
Expand Down Expand Up @@ -185,6 +185,11 @@ client (C):
updates via HTTP API, rather than via the DB, then processes should make the
request to the appropriate process.

Two positions are included: the last position sent, followed by the "new"
position. This allows servers to tell instances that the positions have
advanced but no data has been written, without clients needlessly checking to
see if they have missed any updates.

#### ERROR (S, C)

There was an error
Expand Down
36 changes: 26 additions & 10 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,34 +141,50 @@ def get_logcontext_id(self):


class PositionCommand(Command):
"""Sent by the server to tell the client the stream position without
needing to send an RDATA.
"""Sent by an instance to tell others the stream position without needing to
send an RDATA.

Two tokens are sent, the new position and the last position sent by the
instance (in an RDATA or other POSITION). The tokens are chosen so that *no*
rows were written by the instance between the `prev_token` and `new_token`.
(If an instance hasn't sent a position before then the new position can be
used for both.)

Format::

POSITION <stream_name> <instance_name> <token>
POSITION <stream_name> <instance_name> <prev_token> <new_token>

On receipt of a POSITION command clients should check if they have missed
any updates, and if so then fetch them out of band.
On receipt of a POSITION command instances should check if they have missed
any updates, and if so then fetch them out of band. Instances can check this
by comparing their view of the current token for the sending instance with
the included `prev_token`.

The `<instance_name>` is the process that sent the command and is the source
of the stream.
"""

NAME = "POSITION"

def __init__(self, stream_name, instance_name, token):
def __init__(self, stream_name, instance_name, prev_token, new_token):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token
self.prev_token = prev_token
self.new_token = new_token

@classmethod
def from_line(cls, line):
stream_name, instance_name, token = line.split(" ", 2)
return cls(stream_name, instance_name, int(token))
stream_name, instance_name, prev_token, new_token = line.split(" ", 3)
return cls(stream_name, instance_name, int(prev_token), int(new_token))

def to_line(self):
return " ".join((self.stream_name, self.instance_name, str(self.token)))
return " ".join(
(
self.stream_name,
self.instance_name,
str(self.prev_token),
str(self.new_token),
)
)


class ErrorCommand(_SimpleCommand):
Expand Down
19 changes: 11 additions & 8 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,14 @@ def send_positions_to_connection(self, conn: AbstractConnection):
# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
# Note that we use the current token as the prev token here (rather
# than stream.last_token), as we can't be sure that there have been
# no rows written between last token and the current token (since we
# might be racing with the replication sending bg process).
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.current_token(self._instance_name),
stream.NAME, self._instance_name, current_token, current_token,
)
)

Expand Down Expand Up @@ -511,16 +514,16 @@ async def _process_position(
# If the position token matches our current token then we're up to
# date and there's nothing to do. Otherwise, fetch all updates
# between then and now.
missing_updates = cmd.token != current_token
missing_updates = cmd.prev_token != current_token
while missing_updates:
logger.info(
"Fetching replication rows for '%s' between %i and %i",
stream_name,
current_token,
cmd.token,
cmd.new_token,
)
(updates, current_token, missing_updates) = await stream.get_updates_since(
cmd.instance_name, current_token, cmd.token
cmd.instance_name, current_token, cmd.new_token
)

# TODO: add some tests for this
Expand All @@ -536,11 +539,11 @@ async def _process_position(
[stream.parse_row(row) for row in rows],
)

logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)
logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)

# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(
cmd.stream_name, cmd.instance_name, cmd.token
cmd.stream_name, cmd.instance_name, cmd.new_token
)

self._streams_by_connection.setdefault(conn, set()).add(stream_name)
Expand Down