This repository was archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor MSC3030 /timestamp_to_event
to move away from our snowflake pull from destination
pattern
#14096
Merged
MadLittleMods
merged 13 commits into
develop
from
madlittlemods/13944-fix-msc3030-jump-to-date-complement-backfill-test-flake
Oct 26, 2022
Merged
Refactor MSC3030 /timestamp_to_event
to move away from our snowflake pull from destination
pattern
#14096
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
8867831
Refactor `get_pdu` to return `PulledPduInfo`
MadLittleMods f03a2b6
Use generic `_try_destination_list` on `timestamp_to_event`
MadLittleMods 7c82755
Fix Complement MSC3030 `can_paginate_after_getting_remote_event_from_…
MadLittleMods 3ce3984
Remove scratch changes
MadLittleMods 720788d
Add changelog
MadLittleMods da87def
Consistently handle `NotRetryingDestination` and `FederationDeniedError`
MadLittleMods 7332df1
Update changelog to reflect Synapse bugfix
MadLittleMods 482629b
Merge branch 'develop' into madlittlemods/13944-fix-msc3030-jump-to-d…
MadLittleMods a57fceb
Remove trying to backfill local event since we no longer deal with ou…
MadLittleMods 7f86fef
Update changelog to reflect refactor purpose of this PR now
MadLittleMods 79f2fea
Merge branch 'develop' into madlittlemods/13944-fix-msc3030-jump-to-d…
MadLittleMods 25ce11c
Avoid f-string with logger to prevent unnecessary string interpolatio…
MadLittleMods 64a907a
Use else
MadLittleMods File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Refactor [MSC3030](https://github.com/matrix-org/matrix-spec-proposals/pull/3030) `/timestamp_to_event` endpoint to loop over federation destinations with standard pattern and error handling. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,18 @@ | |
T = TypeVar("T") | ||
|
||
|
||
@attr.s(frozen=True, slots=True, auto_attribs=True) | ||
class PulledPduInfo: | ||
""" | ||
A result object that stores the PDU and info about it like which homeserver we | ||
pulled it from (`pull_origin`) | ||
""" | ||
|
||
pdu: EventBase | ||
# Which homeserver we pulled the PDU from | ||
pull_origin: str | ||
|
||
|
||
class InvalidResponseError(RuntimeError): | ||
"""Helper for _try_destination_list: indicates that the server returned a response | ||
we couldn't parse | ||
|
@@ -114,7 +126,9 @@ def __init__(self, hs: "HomeServer"): | |
self.hostname = hs.hostname | ||
self.signing_key = hs.signing_key | ||
|
||
self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache( | ||
# Cache mapping `event_id` to a tuple of the event itself and the `pull_origin` | ||
# (which server we pulled the event from) | ||
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache( | ||
cache_name="get_pdu_cache", | ||
clock=self._clock, | ||
max_len=1000, | ||
|
@@ -352,11 +366,11 @@ async def _record_failure_callback( | |
@tag_args | ||
async def get_pdu( | ||
self, | ||
destinations: Iterable[str], | ||
destinations: Collection[str], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noticed that we iterate over |
||
event_id: str, | ||
room_version: RoomVersion, | ||
timeout: Optional[int] = None, | ||
) -> Optional[EventBase]: | ||
) -> Optional[PulledPduInfo]: | ||
"""Requests the PDU with given origin and ID from the remote home | ||
servers. | ||
|
||
|
@@ -371,11 +385,11 @@ async def get_pdu( | |
moving to the next destination. None indicates no timeout. | ||
|
||
Returns: | ||
The requested PDU, or None if we were unable to find it. | ||
The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it. | ||
""" | ||
|
||
logger.debug( | ||
"get_pdu: event_id=%s from destinations=%s", event_id, destinations | ||
"get_pdu(event_id=%s): from destinations=%s", event_id, destinations | ||
) | ||
|
||
# TODO: Rate limit the number of times we try and get the same event. | ||
|
@@ -384,19 +398,25 @@ async def get_pdu( | |
# it gets persisted to the database), so we cache the results of the lookup. | ||
# Note that this is separate to the regular get_event cache which caches | ||
# events once they have been persisted. | ||
event = self._get_pdu_cache.get(event_id) | ||
get_pdu_cache_entry = self._get_pdu_cache.get(event_id) | ||
|
||
event = None | ||
pull_origin = None | ||
if get_pdu_cache_entry: | ||
event, pull_origin = get_pdu_cache_entry | ||
# If we don't see the event in the cache, go try to fetch it from the | ||
# provided remote federated destinations | ||
if not event: | ||
else: | ||
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) | ||
|
||
# TODO: We can probably refactor this to use `_try_destination_list` | ||
for destination in destinations: | ||
now = self._clock.time_msec() | ||
last_attempt = pdu_attempts.get(destination, 0) | ||
if last_attempt + PDU_RETRY_TIME_MS > now: | ||
logger.debug( | ||
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", | ||
"get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", | ||
event_id, | ||
destination, | ||
last_attempt, | ||
PDU_RETRY_TIME_MS, | ||
|
@@ -411,43 +431,48 @@ async def get_pdu( | |
room_version=room_version, | ||
timeout=timeout, | ||
) | ||
pull_origin = destination | ||
|
||
pdu_attempts[destination] = now | ||
|
||
if event: | ||
# Prime the cache | ||
self._get_pdu_cache[event.event_id] = event | ||
self._get_pdu_cache[event.event_id] = (event, pull_origin) | ||
|
||
# Now that we have an event, we can break out of this | ||
# loop and stop asking other destinations. | ||
break | ||
|
||
except NotRetryingDestination as e: | ||
logger.info("get_pdu(event_id=%s): %s", event_id, e) | ||
continue | ||
except FederationDeniedError: | ||
logger.info( | ||
"get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist", | ||
event_id, | ||
destination, | ||
) | ||
continue | ||
except SynapseError as e: | ||
logger.info( | ||
"Failed to get PDU %s from %s because %s", | ||
"get_pdu(event_id=%s): Failed to get PDU from %s because %s", | ||
event_id, | ||
destination, | ||
e, | ||
) | ||
continue | ||
except NotRetryingDestination as e: | ||
logger.info(str(e)) | ||
continue | ||
except FederationDeniedError as e: | ||
logger.info(str(e)) | ||
continue | ||
except Exception as e: | ||
pdu_attempts[destination] = now | ||
|
||
logger.info( | ||
"Failed to get PDU %s from %s because %s", | ||
"get_pdu(event_id=): Failed to get PDU from %s because %s", | ||
event_id, | ||
destination, | ||
e, | ||
) | ||
continue | ||
|
||
if not event: | ||
if not event or not pull_origin: | ||
return None | ||
|
||
# `event` now refers to an object stored in `get_pdu_cache`. Our | ||
|
@@ -459,7 +484,7 @@ async def get_pdu( | |
event.room_version, | ||
) | ||
|
||
return event_copy | ||
return PulledPduInfo(event_copy, pull_origin) | ||
|
||
@trace | ||
@tag_args | ||
|
@@ -699,12 +724,14 @@ async def _check_sigs_and_hash_and_fetch_one( | |
pdu_origin = get_domain_from_id(pdu.sender) | ||
if not res and pdu_origin != origin: | ||
try: | ||
res = await self.get_pdu( | ||
pulled_pdu_info = await self.get_pdu( | ||
destinations=[pdu_origin], | ||
event_id=pdu.event_id, | ||
room_version=room_version, | ||
timeout=10000, | ||
) | ||
if pulled_pdu_info is not None: | ||
res = pulled_pdu_info.pdu | ||
except SynapseError: | ||
pass | ||
|
||
|
@@ -806,6 +833,7 @@ async def _try_destination_list( | |
) | ||
|
||
for destination in destinations: | ||
# We don't want to ask our own server for information we don't have | ||
if destination == self.server_name: | ||
continue | ||
|
||
|
@@ -814,9 +842,21 @@ async def _try_destination_list( | |
except ( | ||
RequestSendFailed, | ||
InvalidResponseError, | ||
NotRetryingDestination, | ||
) as e: | ||
logger.warning("Failed to %s via %s: %s", description, destination, e) | ||
# Skip to the next homeserver in the list to try. | ||
continue | ||
except NotRetryingDestination as e: | ||
logger.info("%s: %s", description, e) | ||
continue | ||
except FederationDeniedError: | ||
logger.info( | ||
"%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist", | ||
description, | ||
description, | ||
destination, | ||
) | ||
continue | ||
except UnsupportedRoomVersionError: | ||
raise | ||
except HttpResponseException as e: | ||
|
@@ -1609,6 +1649,54 @@ async def send_request( | |
return result | ||
|
||
async def timestamp_to_event( | ||
self, *, destinations: List[str], room_id: str, timestamp: int, direction: str | ||
) -> Optional["TimestampToEventResponse"]: | ||
""" | ||
Calls each remote federating server from `destinations` asking for their closest | ||
event to the given timestamp in the given direction until we get a response. | ||
Also validates the response to always return the expected keys or raises an | ||
error. | ||
|
||
Args: | ||
destinations: The domains of homeservers to try fetching from | ||
room_id: Room to fetch the event from | ||
timestamp: The point in time (inclusive) we should navigate from in | ||
the given direction to find the closest event. | ||
direction: ["f"|"b"] to indicate whether we should navigate forward | ||
or backward from the given timestamp to find the closest event. | ||
|
||
Returns: | ||
A parsed TimestampToEventResponse including the closest event_id | ||
and origin_server_ts or None if no destination has a response. | ||
""" | ||
|
||
async def _timestamp_to_event_from_destination( | ||
destination: str, | ||
) -> TimestampToEventResponse: | ||
return await self._timestamp_to_event_from_destination( | ||
destination, room_id, timestamp, direction | ||
) | ||
|
||
try: | ||
# Loop through each homeserver candidate until we get a succesful response | ||
timestamp_to_event_response = await self._try_destination_list( | ||
Comment on lines
+1681
to
+1682
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this logic from |
||
"timestamp_to_event", | ||
destinations, | ||
# TODO: The requested timestamp may lie in a part of the | ||
# event graph that the remote server *also* didn't have, | ||
# in which case they will have returned another event | ||
# which may be nowhere near the requested timestamp. In | ||
# the future, we may need to reconcile that gap and ask | ||
# other homeservers, and/or extend `/timestamp_to_event` | ||
# to return events on *both* sides of the timestamp to | ||
# help reconcile the gap faster. | ||
_timestamp_to_event_from_destination, | ||
) | ||
return timestamp_to_event_response | ||
except SynapseError: | ||
return None | ||
|
||
async def _timestamp_to_event_from_destination( | ||
self, destination: str, room_id: str, timestamp: int, direction: str | ||
) -> "TimestampToEventResponse": | ||
""" | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.