-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Support MSC3814: Dehydrated Devices #15929
Changes from 12 commits
6baeda9
ace4f49
49f892d
6c183c5
6d0ce6f
b95364e
395e039
b20c4c7
a950d5e
fe9be3c
ccd6c12
664ad97
a52a25a
0f49f81
f7e0933
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
Implement [MSC3814](https://github.com/matrix-org/matrix-spec-proposals/pull/3814), | ||
dehydrated devices v2/shrivelled sessions and move [MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697) | ||
behind a config flag. Contributed by Nico from Famedly and H-Shay. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,10 +13,11 @@ | |
# limitations under the License. | ||
|
||
import logging | ||
from typing import TYPE_CHECKING, Any, Dict | ||
from http import HTTPStatus | ||
from typing import TYPE_CHECKING, Any, Dict, Optional | ||
|
||
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes | ||
from synapse.api.errors import SynapseError | ||
from synapse.api.errors import Codes, SynapseError | ||
from synapse.api.ratelimiting import Ratelimiter | ||
from synapse.logging.context import run_in_background | ||
from synapse.logging.opentracing import ( | ||
|
@@ -48,6 +49,9 @@ def __init__(self, hs: "HomeServer"): | |
self.store = hs.get_datastores().main | ||
self.notifier = hs.get_notifier() | ||
self.is_mine = hs.is_mine | ||
if hs.config.experimental.msc3814_enabled: | ||
self.event_sources = hs.get_event_sources() | ||
self.device_handler = hs.get_device_handler() | ||
|
||
# We only need to poke the federation sender explicitly if its on the | ||
# same instance. Other federation sender instances will get notified by | ||
|
@@ -303,3 +307,93 @@ async def send_device_message( | |
# Enqueue a new federation transaction to send the new | ||
# device messages to each remote destination. | ||
self.federation_sender.send_device_messages(destination) | ||
|
||
async def get_events_for_dehydrated_device( | ||
self, | ||
requester: Requester, | ||
device_id: str, | ||
since_token: Optional[str], | ||
limit: int, | ||
) -> JsonDict: | ||
"""Fetches up to `limit` events sent to `device_id` starting from `since_token` | ||
and returns the new since token. If there are no more messages, returns an empty | ||
array and deletes the dehydrated device associated with the user/device_id. | ||
|
||
Args: | ||
requester: the user requesting the messages | ||
device_id: ID of the dehydrated device | ||
since_token: stream id to start from when fetching messages | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a /sync next_batch stream token or a stream id? (I'd expect stream ids to be integers) |
||
limit: the number of messages to fetch | ||
""" | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
user_id = requester.user.to_string() | ||
|
||
# only allow fetching messages for the dehydrated device id currently associated | ||
# with the user | ||
dehydrated_device = await self.device_handler.get_dehydrated_device(user_id) | ||
if dehydrated_device is None or device_id != dehydrated_device[0]: | ||
raise SynapseError( | ||
HTTPStatus.FORBIDDEN, | ||
"You may only fetch messages for your dehydrated device", | ||
Codes.FORBIDDEN, | ||
) | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
since_stream_id = 0 | ||
if since_token: | ||
if not since_token.startswith("d"): | ||
raise SynapseError( | ||
HTTPStatus.BAD_REQUEST, | ||
"from parameter %r has an invalid format" % (since_token,), | ||
errcode=Codes.INVALID_PARAM, | ||
) | ||
|
||
try: | ||
since_stream_id = int(since_token[1:]) | ||
except Exception: | ||
raise SynapseError( | ||
HTTPStatus.BAD_REQUEST, | ||
"from parameter %r has an invalid format" % (since_token,), | ||
errcode=Codes.INVALID_PARAM, | ||
) | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# if we have a since token, delete any to-device messages before that token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned already, I think we should not delete any delivered to-device messages for dehydrated devices. We can do this in a later PR, but I think that this will be crucial to ease the resumption of rehydration and ensure that room keys don't get lost because a device aborted the rehydration step. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can open a follow-up PR to stop deleting the delivered to-device messages and address the TODOs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The follow-up PR is here: #16010 |
||
# (since we now know that the device has received them) | ||
deleted = await self.store.delete_messages_for_device( | ||
user_id, device_id, since_stream_id | ||
) | ||
logger.debug( | ||
"Deleted %d to-device messages up to %d", deleted, since_stream_id | ||
) | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
to_token = self.event_sources.get_current_token().to_device_key | ||
|
||
messages, stream_id = await self.store.get_messages_for_device( | ||
user_id, device_id, since_stream_id, to_token, limit | ||
) | ||
|
||
for message in messages: | ||
# We pop here as we shouldn't be sending the message ID down | ||
# `/sync` | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
message_id = message.pop("message_id", None) | ||
if message_id: | ||
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id) | ||
|
||
logger.debug( | ||
"Returning %d to-device messages between %d and %d (current token: %d) for dehydrated device %s", | ||
len(messages), | ||
since_stream_id, | ||
stream_id, | ||
to_token, | ||
device_id, | ||
) | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if messages == []: | ||
# we've fetched all the messages, delete the dehydrated device | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not align with the MSC, the MSC tells us this:
So, per MSC, we should delete the device the first time the client makes a request to this endpoint, no matter how many messages we may have. That being said, I think that we should leave the deletion of the device up to the client, see my rationale here: matrix-org/matrix-spec-proposals#3814 (comment). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this is one of the things I asked for clarification about on the MSC, see matrix-org/matrix-spec-proposals#3814 (comment) |
||
await self.store.remove_dehydrated_device( | ||
requester.user.to_string(), device_id | ||
) | ||
|
||
return { | ||
"events": messages, | ||
"next_batch": f"d{stream_id}", | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,12 +25,14 @@ | |
from synapse.http.servlet import ( | ||
RestServlet, | ||
parse_and_validate_json_object_from_request, | ||
parse_integer, | ||
) | ||
from synapse.http.site import SynapseRequest | ||
from synapse.rest.client._base import client_patterns, interactive_auth_handler | ||
from synapse.rest.client.models import AuthenticationData | ||
from synapse.rest.models import RequestBodyModel | ||
from synapse.types import JsonDict | ||
from synapse.util.cancellation import cancellable | ||
|
||
if TYPE_CHECKING: | ||
from synapse.server import HomeServer | ||
|
@@ -229,6 +231,8 @@ class Config: | |
class DehydratedDeviceServlet(RestServlet): | ||
"""Retrieve or store a dehydrated device. | ||
|
||
Implements both MSC2697 and MSC3814. | ||
H-Shay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
GET /org.matrix.msc2697.v2/dehydrated_device | ||
|
||
HTTP/1.1 200 OK | ||
|
@@ -261,16 +265,21 @@ class DehydratedDeviceServlet(RestServlet): | |
|
||
""" | ||
|
||
PATTERNS = client_patterns("/org.matrix.msc2697.v2/dehydrated_device$", releases=()) | ||
|
||
def __init__(self, hs: "HomeServer"): | ||
def __init__(self, hs: "HomeServer", msc2697: bool = True): | ||
super().__init__() | ||
self.hs = hs | ||
self.auth = hs.get_auth() | ||
handler = hs.get_device_handler() | ||
assert isinstance(handler, DeviceHandler) | ||
self.device_handler = handler | ||
|
||
self.PATTERNS = client_patterns( | ||
"/org.matrix.msc2697.v2/dehydrated_device$" | ||
if msc2697 | ||
else "/org.matrix.msc3814.v1/dehydrated_device$", | ||
releases=(), | ||
) | ||
|
||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: | ||
requester = await self.auth.get_user_by_req(request) | ||
dehydrated_device = await self.device_handler.get_dehydrated_device( | ||
|
@@ -347,14 +356,55 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: | |
return 200, result | ||
|
||
|
||
class DehydratedDeviceEventsServlet(RestServlet): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible for multiple clients to be hitting this endpoint at once? What solves that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be something like a scenario where (let's say) two devices have called There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah probably a conversation for the MSC, but seems quite likely now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think what will help, and will be necessary for perfect resumption, is that we shouldn't delete the events untill the device is fully deleted. This way any device can attempt to rehydrated the device at any point in time, even if another one is already rehydrating it, until one device succeeds and deletes the dehydrated device. |
||
PATTERNS = client_patterns( | ||
"/org.matrix.msc3814.v1/dehydrated_device/(?P<device_id>[^/]*)/events$", | ||
releases=(), | ||
) | ||
|
||
def __init__(self, hs: "HomeServer"): | ||
super().__init__() | ||
self.message_handler = hs.get_device_message_handler() | ||
self.auth = hs.get_auth() | ||
self.store = hs.get_datastores().main | ||
|
||
class PostBody(RequestBodyModel): | ||
next_batch: Optional[StrictStr] | ||
|
||
@cancellable | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
async def on_POST( | ||
self, request: SynapseRequest, device_id: str | ||
) -> Tuple[int, JsonDict]: | ||
requester = await self.auth.get_user_by_req(request) | ||
|
||
next_batch = parse_and_validate_json_object_from_request( | ||
request, self.PostBody | ||
).next_batch | ||
limit = parse_integer(request, "limit", 100) | ||
|
||
msgs = await self.message_handler.get_events_for_dehydrated_device( | ||
requester=requester, | ||
device_id=device_id, | ||
since_token=next_batch, | ||
limit=limit, | ||
) | ||
|
||
return 200, msgs | ||
|
||
|
||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: | ||
if ( | ||
hs.config.worker.worker_app is None | ||
and not hs.config.experimental.msc3861.enabled | ||
): | ||
DeleteDevicesRestServlet(hs).register(http_server) | ||
DevicesRestServlet(hs).register(http_server) | ||
|
||
if hs.config.worker.worker_app is None: | ||
DeviceRestServlet(hs).register(http_server) | ||
DehydratedDeviceServlet(hs).register(http_server) | ||
ClaimDehydratedDeviceServlet(hs).register(http_server) | ||
if hs.config.experimental.msc2697_enabled: | ||
DehydratedDeviceServlet(hs, msc2697=True).register(http_server) | ||
ClaimDehydratedDeviceServlet(hs).register(http_server) | ||
if hs.config.experimental.msc3814_enabled: | ||
DehydratedDeviceServlet(hs, msc2697=False).register(http_server) | ||
DehydratedDeviceEventsServlet(hs).register(http_server) |
Uh oh!
There was an error while loading. Please reload this page.