-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Update `get_pdu` to return the original, pristine `EventBase`
#13320
Changes from 18 commits
ee236ca
79a1b72
bfd35fd
e0e20a5
22410f2
09c411b
6029b42
09167b1
eb6a291
1c4e57c
488f5ed
29a5269
2688e44
24913e7
0e6dd5a
5bc75ed
dea7669
72e65a5
86fe0dc
fd879bb
354678f
233077c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix `FederationClient.get_pdu()` returning cached events that had been marked as `outliers` instead of the original events as we saw them over federation. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,7 +53,7 @@ | |
RoomVersion, | ||
RoomVersions, | ||
) | ||
from synapse.events import EventBase, builder | ||
from synapse.events import EventBase, builder, make_event_from_dict | ||
from synapse.federation.federation_base import ( | ||
FederationBase, | ||
InvalidEventSignatureError, | ||
|
@@ -299,7 +299,8 @@ async def get_pdu_from_destination_raw( | |
moving to the next destination. None indicates no timeout. | ||
|
||
Returns: | ||
The requested PDU, or None if we were unable to find it. | ||
A copy of the requested PDU that is safe to modify, or None if we | ||
were unable to find it. | ||
|
||
Raises: | ||
SynapseError, NotRetryingDestination, FederationDeniedError | ||
|
@@ -309,7 +310,7 @@ async def get_pdu_from_destination_raw( | |
) | ||
|
||
logger.debug( | ||
"retrieved event id %s from %s: %r", | ||
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r", | ||
event_id, | ||
destination, | ||
transaction_data, | ||
|
@@ -358,54 +359,97 @@ async def get_pdu( | |
The requested PDU, or None if we were unable to find it. | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I like the idea of having it be immutable 👍
But needing to pass in […] We can tackle this in another potential PR though.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Yes, there's definitely a balance of two evils here. I'm happy to punt this for now at least. |
||
|
||
# TODO: Rate limit the number of times we try and get the same event. | ||
logger.debug( | ||
"get_pdu: event_id=%s from destinations=%s", event_id, destinations | ||
) | ||
|
||
ev = self._get_pdu_cache.get(event_id) | ||
if ev: | ||
return ev | ||
# TODO: Rate limit the number of times we try and get the same event. | ||
|
||
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) | ||
# We might need the same event multiple times in quick succession (before | ||
# it gets persisted to the database), so we cache the results of the lookup. | ||
# Note that this is separate to the regular get_event cache which caches | ||
# events once they have been persisted. | ||
event_from_cache = self._get_pdu_cache.get(event_id) | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# If we don't see the event in the cache, go try to fetch it from the | ||
# provided remote federated destinations | ||
event_from_remote = None | ||
if not event_from_cache: | ||
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) | ||
|
||
for destination in destinations: | ||
now = self._clock.time_msec() | ||
last_attempt = pdu_attempts.get(destination, 0) | ||
if last_attempt + PDU_RETRY_TIME_MS > now: | ||
logger.debug( | ||
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", | ||
destination, | ||
last_attempt, | ||
PDU_RETRY_TIME_MS, | ||
now, | ||
) | ||
continue | ||
|
||
try: | ||
event_from_remote = await self.get_pdu_from_destination_raw( | ||
destination=destination, | ||
event_id=event_id, | ||
room_version=room_version, | ||
timeout=timeout, | ||
) | ||
|
||
signed_pdu = None | ||
for destination in destinations: | ||
now = self._clock.time_msec() | ||
last_attempt = pdu_attempts.get(destination, 0) | ||
if last_attempt + PDU_RETRY_TIME_MS > now: | ||
continue | ||
pdu_attempts[destination] = now | ||
|
||
try: | ||
signed_pdu = await self.get_pdu_from_destination_raw( | ||
destination=destination, | ||
event_id=event_id, | ||
room_version=room_version, | ||
timeout=timeout, | ||
) | ||
if event_from_remote: | ||
# Prime the cache | ||
self._get_pdu_cache[ | ||
event_from_remote.event_id | ||
] = event_from_remote | ||
|
||
pdu_attempts[destination] = now | ||
# FIXME: We should add a `break` here to avoid calling every | ||
# destination after we already found a PDU (will follow-up | ||
# in a separate PR) | ||
|
||
except SynapseError as e: | ||
logger.info( | ||
"Failed to get PDU %s from %s because %s", event_id, destination, e | ||
) | ||
continue | ||
except NotRetryingDestination as e: | ||
logger.info(str(e)) | ||
continue | ||
except FederationDeniedError as e: | ||
logger.info(str(e)) | ||
continue | ||
except Exception as e: | ||
pdu_attempts[destination] = now | ||
except SynapseError as e: | ||
logger.info( | ||
"Failed to get PDU %s from %s because %s", | ||
event_id, | ||
destination, | ||
e, | ||
) | ||
continue | ||
except NotRetryingDestination as e: | ||
logger.info(str(e)) | ||
continue | ||
except FederationDeniedError as e: | ||
logger.info(str(e)) | ||
continue | ||
except Exception as e: | ||
pdu_attempts[destination] = now | ||
|
||
logger.info( | ||
"Failed to get PDU %s from %s because %s", | ||
event_id, | ||
destination, | ||
e, | ||
) | ||
continue | ||
|
||
logger.info( | ||
"Failed to get PDU %s from %s because %s", event_id, destination, e | ||
) | ||
continue | ||
event = event_from_cache or event_from_remote | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not event: | ||
return None | ||
|
||
if signed_pdu: | ||
self._get_pdu_cache[event_id] = signed_pdu | ||
# Make sure to return a copy because downstream callers will use this | ||
# event reference directly and change our original, pristine, untouched | ||
# PDU. For example when people mark the event as an `outlier` | ||
# (`event.internal_metadata.outlier = true`), we don't want that to | ||
# propagate back into the cache. | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
event_copy = make_event_from_dict( | ||
event.get_pdu_json(), | ||
event.room_version, | ||
) | ||
|
||
return signed_pdu | ||
return event_copy | ||
|
||
async def get_room_state_ids( | ||
self, destination: str, room_id: str, event_id: str | ||
|
Uh oh!
There was an error while loading. Please reload this page.