This repository was archived by the owner on Apr 26, 2024. It is now read-only.

An idle event persister can hold up outbound federation of PDUs and other processes #15595

Open
@squahtx

Description


During a recent federation outage on matrix.org, where federation senders would get stuck for minutes, @richvdh noticed that the minimum stream position of RoomStreamTokens was stuck because event_persister-4 wasn't advancing.

https://matrix.to/#/!yHWhpxlXVaLcsgDUKb:matrix.org/$mve7Xf2PoCru5c6wrcWsNUXQEf_4rtqXRv3Gbyi6E8M?via=matrix.org&via=element.io&via=vector.modular.im


When an event persister is idle, the minimum stream position in RoomStreamTokens gets stuck at the last stream position persisted by the idle event persister, plus any continuous run of stream positions seen over replication after that. That is, the minimum stream position gets stuck at the first gap. For example, if the idle persister last persisted position 10 and positions 11, 12 and 14 (but not 13) have since been seen over replication, the minimum sticks at 12. See here for how the minimum stream position is calculated.

For an explanation of the fields in a RoomStreamToken, see the RoomStreamToken docstring:

There is also a third mode for live tokens where the token starts with "m",
which is sometimes used when using sharded event persisters. In this case
the events stream is considered to be a set of streams (one for each writer)
and the token encodes the vector clock of positions of each writer in their
respective streams.
The format of the token in such case is an initial integer min position,
followed by the mapping of instance ID to position separated by '.' and '~':
m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}. ...
The `min_pos` corresponds to the minimum position all writers have persisted
up to, and then only writers that are ahead of that position need to be
encoded. An example token is:
m56~2.58~3.59
Which corresponds to a set of three (or more) writers where instances 2 and
3 (these are instance IDs that can be looked up in the DB to fetch the more
commonly used instance names) are at positions 58 and 59 respectively, and
all other instances are at position 56.
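
To make the format concrete, here is a minimal decoding sketch (illustrative only, not Synapse's actual parsing code; decode_vector_clock_token is a made-up name for this example):

```python
# Illustrative only: a tiny decoder for the "m" vector-clock token format
# described above. This is not how Synapse parses tokens; it just shows what
# a token like m56~2.58~3.59 means.
from typing import Dict, Tuple


def decode_vector_clock_token(token: str) -> Tuple[int, Dict[int, int]]:
    """Split an "m" token into (min_pos, {instance_id: position})."""
    assert token.startswith("m")
    min_part, *writer_parts = token[1:].split("~")
    min_pos = int(min_part)
    writer_positions: Dict[int, int] = {}
    for part in writer_parts:
        instance_id, pos = part.split(".")
        writer_positions[int(instance_id)] = int(pos)
    return min_pos, writer_positions


# decode_vector_clock_token("m56~2.58~3.59") == (56, {2: 58, 3: 59}), i.e.
# instances 2 and 3 are at positions 58 and 59, everyone else is at 56.
```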

The federation senders use this minimum stream position to determine how far it is safe to process (since new events can't appear with an earlier stream position). So when the minimum stream position gets stuck, the federation senders stop making progress, even when there are new events from local users that need to be sent.

max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
event_entry = self.notifier.create_pending_room_event_entry(
    event_pos,
    extra_users,
    row.data.room_id,
    row.data.type,
    row.data.state_key,
    row.data.membership,
)
await self.notifier.notify_new_room_events(
    [(event_entry, row.data.event_id)], max_token
)

def notify_new_events(self, max_token: RoomStreamToken) -> None:
    """This gets called when we have some new events we might want to
    send out to other servers.
    """
    # We just use the minimum stream ordering and ignore the vector clock
    # component. This is safe to do as long as we *always* ignore the vector
    # clock components.
    current_id = max_token.stream
    self._last_poked_id = max(current_id, self._last_poked_id)

last_token = await self.store.get_federation_out_pos("events")
(
    next_token,
    event_to_received_ts,
) = await self.store.get_all_new_event_ids_stream(
    last_token, self._last_poked_id, limit=100
)


The idle event persister itself could likely do something to fix the problem, since it can tell when its advertised position has fallen behind.
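
One very rough shape such a fix could take (purely hypothetical: maybe_advance_idle_position and send_position_update below are invented names, not existing Synapse APIs) is for an idle persister to move its own advertised position up to the point everyone is known to have persisted to, and announce that over replication:

```python
# Purely hypothetical sketch, not existing Synapse code: it only illustrates
# the idea that an idle event persister could advance and broadcast its own
# position once it knows it has nothing in flight.
async def maybe_advance_idle_position(id_gen, replication_handler) -> None:
    # If we have events in flight, our own position is still meaningful and
    # must not be moved artificially.
    if id_gen._unfinished_ids or id_gen._in_flight_fetches:
        return

    instance_name = id_gen._instance_name
    our_position = id_gen.get_current_token_for_writer(instance_name)
    # The point everyone is known to have persisted up to (assumed accessor).
    persisted_upto = id_gen.get_persisted_upto_position()

    if persisted_upto > our_position:
        # Catch our advertised position up and tell the other workers, so
        # their minimum stream position can move past this idle persister.
        id_gen.advance(instance_name, persisted_upto)
        # Hypothetical broadcast; Synapse has no such helper today.
        await replication_handler.send_position_update(instance_name, persisted_upto)
```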

# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# than that have been persisted).
our_current_position = self._current_positions.get(self._instance_name, 0)
min_curr = min(
    (
        token
        for name, token in self._current_positions.items()
        if name != self._instance_name
    ),
    default=our_current_position,
)

if our_current_position and (self._unfinished_ids or self._in_flight_fetches):
    min_curr = min(min_curr, our_current_position)

self._persisted_upto_position = max(min_curr, self._persisted_upto_position)

# We now iterate through the seen positions, discarding those that are
# less than the current min positions, and incrementing the min position
# if it's exactly one greater.
#
# This is also where we discard items from `_known_persisted_positions`
# (to ensure the list doesn't infinitely grow).
while self._known_persisted_positions:
    if self._known_persisted_positions[0] <= self._persisted_upto_position:
        heapq.heappop(self._known_persisted_positions)
    elif (
        self._known_persisted_positions[0] == self._persisted_upto_position + 1
    ):
        heapq.heappop(self._known_persisted_positions)
        self._persisted_upto_position += 1
    else:
        # There was a gap in seen positions, so there is nothing more to
        # do.
        break

Note that _persisted_upto_position can end up ahead of the event persister's own position in _current_positions if it has nothing in flight. However, the event persister's own position doesn't appear to be updated to match, and _persisted_upto_position isn't broadcast over replication.

Even if we did broadcast _persisted_upto_position over replication, that would only help with a single idle event persister. With two or more idle event persisters, each one's _persisted_upto_position is itself capped by the other idle persisters' stale positions, so we would still get stuck.

Note that the second half of the code above (the while loop) is responsible for advancing the minimum stream position up to the first gap in stream positions.
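
As a concrete illustration of that first-gap behaviour, here is a small standalone toy (not the real MultiWriterIdGenerator), using the numbers from the earlier example: the idle writer last persisted position 10, and positions 11, 12 and 14 have since been seen over replication:

```python
import heapq

# Toy reproduction of the first-gap behaviour (not the real
# MultiWriterIdGenerator). The idle writer last persisted position 10; other
# writers have since persisted 11, 12 and 14, but 13 is still outstanding.
persisted_upto_position = 10
known_persisted_positions = [11, 12, 14]
heapq.heapify(known_persisted_positions)

while known_persisted_positions:
    if known_persisted_positions[0] <= persisted_upto_position:
        # Already covered by the current position, just drop it.
        heapq.heappop(known_persisted_positions)
    elif known_persisted_positions[0] == persisted_upto_position + 1:
        # Contiguous with the current position, so we can advance.
        heapq.heappop(known_persisted_positions)
        persisted_upto_position += 1
    else:
        # Gap in the seen positions (13 is missing): stop here.
        break

print(persisted_upto_position)  # 12 - stuck at the first gap, not 14
```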

Labels

A-Workers: Problems related to running Synapse in Worker Mode (or replication)
O-Uncommon: Most users are unlikely to come across this or unexpected workflow
S-Minor: Blocks non-critical functionality, workarounds exist.
T-Defect: Bugs, crashes, hangs, security vulnerabilities, or other reported issues.
