-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Safe async event cache #13308
Safe async event cache #13308
Changes from 9 commits
6beae64
65b47e1
87001f8
c170748
be84644
9ef18e4
e90f402
b8ea213
d39d9bf
a2d2312
bd174cc
e71ac4a
22f114d
771c29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix async get event cache invalidation logic. Contributed by Nick @ Beeper (@fizzadar). | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
from typing import ( | ||
TYPE_CHECKING, | ||
Any, | ||
Awaitable, | ||
Callable, | ||
Collection, | ||
Dict, | ||
|
@@ -33,6 +34,7 @@ | |
Tuple, | ||
Type, | ||
TypeVar, | ||
Union, | ||
cast, | ||
overload, | ||
) | ||
|
@@ -57,7 +59,7 @@ | |
from synapse.storage.background_updates import BackgroundUpdater | ||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine | ||
from synapse.storage.types import Connection, Cursor | ||
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable | ||
from synapse.util.async_helpers import delay_cancellation | ||
from synapse.util.iterutils import batch_iter | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -208,7 +210,9 @@ def __getattr__(self, name: str) -> Any: | |
|
||
|
||
# The type of entry which goes on our after_callbacks and exception_callbacks lists. | ||
_CallbackListEntry = Tuple[Callable[..., object], Tuple[object, ...], Dict[str, object]] | ||
_CallbackListEntry = Tuple[ | ||
Callable[..., Union[object, Awaitable]], Tuple[object, ...], Dict[str, object] | ||
] | ||
|
||
P = ParamSpec("P") | ||
R = TypeVar("R") | ||
|
@@ -796,6 +800,29 @@ async def runInteraction( | |
The result of func | ||
""" | ||
|
||
async def _run_callbacks(callbacks: List[_CallbackListEntry]): | ||
""" | ||
This function takes a list of mixed sync/async callbacks and executes | ||
the async ones first and then the sync callbacks. | ||
|
||
We do this with the assumption that async functions call out to external | ||
systems (e.g. to invalidate a cache) and the sync functions make these | ||
changes on any local in-memory caches/similar, and thus must be second. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO this second paragraph belongs as a comment rather than a docstring. It's not really the place of a docstring to describe the reasons behind implementation decisions within a method. |
||
""" | ||
|
||
sync_callbacks: List[_CallbackListEntry] = [] | ||
|
||
for cb, args, kwargs in callbacks: | ||
if inspect.iscoroutinefunction(cb): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a good reason to limit this to coroutine functions, rather than other awaitables? I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's what I did first but
Perhaps this should be changed to also check awaitables? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh sorry, brainfart on my part. As you say: coroutine functions aren't themselves awaitable; they return an awaitable the first time you call them. Unfortunately, they aren't the only thing that can return awaitables (eg: old-style functions that just return a Twisted So I think the only correct way to do this is to have separate |
||
awaitable = cb(*args, **kwargs) | ||
assert isinstance(awaitable, Awaitable) | ||
await awaitable | ||
else: | ||
sync_callbacks.append((cb, args, kwargs)) | ||
|
||
for cb, args, kwargs in sync_callbacks: | ||
cb(*args, **kwargs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add some comments here please? What is going on here, and why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a docstring to describe it - should this be hoisted to a class method now perhaps? b8ea213 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
or, better, a top-level function. Though I suspect the changes regarding |
||
|
||
async def _runInteraction() -> R: | ||
after_callbacks: List[_CallbackListEntry] = [] | ||
exception_callbacks: List[_CallbackListEntry] = [] | ||
|
@@ -817,15 +844,10 @@ async def _runInteraction() -> R: | |
**kwargs, | ||
) | ||
|
||
for after_callback, after_args, after_kwargs in after_callbacks: | ||
await maybe_awaitable(after_callback(*after_args, **after_kwargs)) | ||
|
||
await _run_callbacks(after_callbacks) | ||
return cast(R, result) | ||
except Exception: | ||
for exception_callback, after_args, after_kwargs in exception_callbacks: | ||
await maybe_awaitable( | ||
exception_callback(*after_args, **after_kwargs) | ||
) | ||
await _run_callbacks(exception_callbacks) | ||
raise | ||
|
||
# To handle cancellation, we ensure that `after_callback`s and | ||
|
Uh oh!
There was an error while loading. Please reload this page.