-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Keep track when we try and fail to process a pulled event #13589
Changes from 20 commits
e0d7fab
b8d55d3
f63d823
bec26e2
fee37c3
d1290be
f9119d0
f5c6fe7
16ebec0
ce07aa1
5811ba1
cf2b093
cbb4194
621c5d3
75c07bb
783cce5
54ef84b
e4192d7
7a44932
86d98ca
1464101
71c7738
df8c76d
d45b078
33ad64e
63bec99
31d7502
0b5f1db
4ce7709
d3a1f84
1347686
57182dc
e58bc65
70019d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -862,6 +862,10 @@ async def _process_pulled_event( | |
self._sanity_check_event(event) | ||
except SynapseError as err: | ||
logger.warning("Event %s failed sanity check: %s", event_id, err) | ||
if backfilled: | ||
await self._store.record_event_failed_backfill_attempt( | ||
event.room_id, event_id | ||
) | ||
return | ||
|
||
try: | ||
|
@@ -897,6 +901,11 @@ async def _process_pulled_event( | |
backfilled=backfilled, | ||
) | ||
except FederationError as e: | ||
if backfilled: | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
await self._store.record_event_failed_backfill_attempt( | ||
event.room_id, event_id | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if this is the right place to record this? What is the failure mode you're trying to catch here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume this will catch the But perhaps you also mean that we should also generally look for any exceptions and record there as well? That sounds reasonable for recording whatever failure that happens 👍: try:
...
except FederationError as e:
record_event_failed_pull_attempt(...)
...
except Exception as e:
record_event_failed_pull_attempt(...)
raise e There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I was mostly just thinking that its not obvious that this particular spot is the right place in the handling. For example, I think by this point we've already attempted to try and refetch events that have bad signatures returned by I guess this is a good first step, and we can always add it the checks in more places later. |
||
|
||
if e.code == 403: | ||
logger.warning("Pulled event %s failed history check.", event_id) | ||
else: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1292,6 +1292,95 @@ def _get_backfill_events( | |
|
||
return event_id_results | ||
|
||
@trace | ||
async def record_event_failed_backfill_attempt( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The next PR that uses this information is ready to go: #13635 (just waiting for this PR to merge since it's based on it) If this PR looks good, feel free to start reviewing that one as well. Or not and I can do the merge dance and put it in the review queue (no worries). |
||
self, room_id: str, event_id: str | ||
) -> None: | ||
""" | ||
Record when we fail to backfill an event. | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
This information allows us to be more intelligent when we decide to | ||
retry (we don't need to fail over and over) and we can process that | ||
event in the background so we don't block on it each time. | ||
|
||
Args: | ||
room_id: The room where the event failed to backfill from | ||
event_id: The event that failed to be backfilled | ||
""" | ||
if self.database_engine.can_native_upsert: | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
await self.db_pool.runInteraction( | ||
"record_event_failed_backfill_attempt", | ||
self._record_event_failed_backfill_attempt_upsert_native_txn, | ||
room_id, | ||
event_id, | ||
db_autocommit=True, # Safe as its a single upsert | ||
) | ||
else: | ||
await self.db_pool.runInteraction( | ||
"record_event_failed_backfill_attempt", | ||
self._record_event_failed_backfill_attempt_upsert_emulated_txn, | ||
room_id, | ||
event_id, | ||
) | ||
|
||
def _record_event_failed_backfill_attempt_upsert_native_txn( | ||
self, | ||
txn: LoggingTransaction, | ||
room_id: str, | ||
event_id: str, | ||
) -> None: | ||
assert self.database_engine.can_native_upsert | ||
|
||
sql = """ | ||
INSERT INTO event_failed_backfill_attempts ( | ||
room_id, event_id, num_attempts, last_attempt_ts | ||
) | ||
VALUES (?, ?, ?, ?) | ||
ON CONFLICT (room_id, event_id) DO UPDATE SET | ||
num_attempts=event_failed_backfill_attempts.num_attempts + 1, | ||
last_attempt_ts=EXCLUDED.last_attempt_ts; | ||
""" | ||
|
||
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec())) | ||
|
||
def _record_event_failed_backfill_attempt_upsert_emulated_txn( | ||
self, | ||
txn: LoggingTransaction, | ||
room_id: str, | ||
event_id: str, | ||
) -> None: | ||
self.database_engine.lock_table(txn, "event_failed_backfill_attempts") | ||
|
||
prev_row = self.db_pool.simple_select_one_txn( | ||
txn, | ||
table="event_failed_backfill_attempts", | ||
keyvalues={"room_id": room_id, "event_id": event_id}, | ||
retcols=("num_attempts"), | ||
allow_none=True, | ||
) | ||
|
||
if not prev_row: | ||
self.db_pool.simple_insert_txn( | ||
txn, | ||
table="event_failed_backfill_attempts", | ||
values={ | ||
"room_id": room_id, | ||
"event_id": event_id, | ||
"num_attempts": 1, | ||
"last_attempt_ts": self._clock.time_msec(), | ||
}, | ||
) | ||
else: | ||
self.db_pool.simple_update_one_txn( | ||
txn, | ||
table="event_failed_backfill_attempts", | ||
keyvalues={"room_id": room_id, "event_id": event_id}, | ||
updatevalues={ | ||
"num_attempts": prev_row["num_attempts"] + 1, | ||
"last_attempt_ts": self._clock.time_msec(), | ||
}, | ||
) | ||
|
||
async def get_missing_events( | ||
self, | ||
room_id: str, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* Copyright 2022 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
|
||
-- Add a table that keeps track of when we failed to backfill an event. This | ||
-- allows us to be more intelligent when we decide to retry (we don't need to | ||
-- fail over and over) and we can process that event in the background so we | ||
-- don't block on it each time. | ||
CREATE TABLE IF NOT EXISTS event_failed_backfill_attempts( | ||
room_id TEXT NOT NULL, | ||
richvdh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
event_id TEXT NOT NULL, | ||
num_attempts INT NOT NULL, | ||
last_attempt_ts BIGINT NOT NULL, | ||
PRIMARY KEY (room_id, event_id) | ||
); | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.