|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
| 14 | +import datetime |
14 | 15 | import itertools
|
15 | 16 | import logging
|
16 | 17 | from queue import Empty, PriorityQueue
|
|
21 | 22 | Iterable,
|
22 | 23 | List,
|
23 | 24 | Optional,
|
| 25 | + Sequence, |
24 | 26 | Set,
|
25 | 27 | Tuple,
|
26 | 28 | cast,
|
|
43 | 45 | )
|
44 | 46 | from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
45 | 47 | from synapse.storage.databases.main.signatures import SignatureWorkerStore
|
46 |
| -from synapse.storage.engines import PostgresEngine |
| 48 | +from synapse.storage.engines import PostgresEngine, Sqlite3Engine |
47 | 49 | from synapse.types import JsonDict
|
48 | 50 | from synapse.util import json_encoder
|
49 | 51 | from synapse.util.caches.descriptors import cached
|
|
72 | 74 |
|
73 | 75 | logger = logging.getLogger(__name__)
|
74 | 76 |
|
| 77 | +PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int( |
| 78 | + datetime.timedelta(days=7).total_seconds() |
| 79 | +) |
| 80 | +PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int( |
| 81 | + datetime.timedelta(hours=1).total_seconds() |
| 82 | +) |
75 | 83 |
|
76 | 84 | # All the info we need while iterating the DAG while backfilling
|
77 | 85 | @attr.s(frozen=True, slots=True, auto_attribs=True)
|
@@ -1339,6 +1347,78 @@ def _record_event_failed_pull_attempt_upsert_txn(
|
1339 | 1347 |
|
1340 | 1348 | txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
|
1341 | 1349 |
|
| 1350 | + @trace |
| 1351 | + async def filter_events_with_pull_attempt_backoff( |
| 1352 | + self, |
| 1353 | + room_id: str, |
| 1354 | + event_ids: Sequence[str], |
| 1355 | + ) -> List[str]: |
| 1356 | + """ |
| 1357 | + Filter out events that we've failed to pull before |
| 1358 | + recently. Uses exponential backoff. |
| 1359 | +
|
| 1360 | + Args: |
| 1361 | + room_id: The room that the events belong to |
| 1362 | + event_ids: A list of events to filter down |
| 1363 | +
|
| 1364 | + Returns: |
| 1365 | + List of event_ids that can be attempted to be pulled |
| 1366 | + """ |
| 1367 | + return await self.db_pool.runInteraction( |
| 1368 | + "filter_events_with_pull_attempt_backoff", |
| 1369 | + self._filter_events_with_pull_attempt_backoff_txn, |
| 1370 | + room_id, |
| 1371 | + event_ids, |
| 1372 | + ) |
| 1373 | + |
| 1374 | + def _filter_events_with_pull_attempt_backoff_txn( |
| 1375 | + self, |
| 1376 | + txn: LoggingTransaction, |
| 1377 | + room_id: str, |
| 1378 | + event_ids: Sequence[str], |
| 1379 | + ) -> None: |
| 1380 | + where_event_ids_match_clause, values = make_in_list_sql_clause( |
| 1381 | + txn.database_engine, "event_id", event_ids |
| 1382 | + ) |
| 1383 | + |
| 1384 | + sql = """ |
| 1385 | + SELECT event_id FROM event_failed_pull_attempts |
| 1386 | + WHERE |
| 1387 | + room_id = ? |
| 1388 | + %s /* where_event_ids_match_clause */ |
| 1389 | + /** |
| 1390 | + * Exponential back-off (up to the upper bound) so we don't try to |
| 1391 | + * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. |
| 1392 | + * |
| 1393 | + * We use `1 << n` as a power of 2 equivalent for compatibility |
| 1394 | + * with older SQLites. The left shift equivalent only works with |
| 1395 | + * powers of 2 because left shift is a binary operation (base-2). |
| 1396 | + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. |
| 1397 | + */ |
| 1398 | + AND ( |
| 1399 | + event_id IS NULL |
| 1400 | + OR ? /* current_time */ >= last_attempt_ts + /*least*/%s((1 << num_attempts) * ? /* step */, ? /* upper bound */) |
| 1401 | + ) |
| 1402 | + """ |
| 1403 | + |
| 1404 | + if isinstance(self.database_engine, PostgresEngine): |
| 1405 | + least_function = "least" |
| 1406 | + elif isinstance(self.database_engine, Sqlite3Engine): |
| 1407 | + least_function = "min" |
| 1408 | + else: |
| 1409 | + raise RuntimeError("Unknown database engine") |
| 1410 | + |
| 1411 | + txn.execute( |
| 1412 | + sql % (where_event_ids_match_clause, least_function), |
| 1413 | + ( |
| 1414 | + room_id, |
| 1415 | + *values, |
| 1416 | + self._clock.time_msec(), |
| 1417 | + 1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, |
| 1418 | + 1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS, |
| 1419 | + ), |
| 1420 | + ) |
| 1421 | + |
1342 | 1422 | async def get_missing_events(
|
1343 | 1423 | self,
|
1344 | 1424 | room_id: str,
|
|
0 commit comments