This repository was archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Send device list updates to application services (MSC3202) - part 1 #11881
Merged
Merged
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
4b67118
Set min application service stream_id to 1
anoadragon453 51be04b
Guard processing device list updates with experimental option
anoadragon453 b4aad36
Add to_key arg, user_ids optional for get_users_whose_devices_changed
anoadragon453 1671f87
Add migration delta to track device_list stream id per appservice
anoadragon453 a77f351
Move DeviceLists type to synapse.types
anoadragon453 88c4e73
Switch DeviceLists to containing Sets, which allows item deletes
anoadragon453 047db4d
Use get_users_whose_devices_changed to pull device list changes for g…
anoadragon453 55ac419
Add device lists to AS txns, thread thru the AS scheduler methods
anoadragon453 3d2f018
Fix existing tests for device list changes
anoadragon453 4168d2f
Add tests
anoadragon453 9d903aa
changelog
anoadragon453 9b0572d
kwargs does not exist in <Python 3.8
anoadragon453 84ea3e2
Apply suggestions from code review
anoadragon453 55eb056
Restore newlines in get_users_whose_devices_changed docstring
anoadragon453 8ef2df8
Explain why device_list_stream_id column is NULLable
anoadragon453 d08e52c
lint the user_ids=None change
anoadragon453 5f7cd20
Replace somewhat misleading FIXME comment
anoadragon453 7bd8118
Remove broken config option override decorator
anoadragon453 7e4a531
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 fca1add
Update synapse/appservice/scheduler.py
anoadragon453 6d00c2b
Rename DeviceLists -> DeviceListUpdates
anoadragon453 9f5eb99
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 afb9cfc
while -> for when processing queued_device_list_summaries
anoadragon453 9ed4403
Use difference_update instead of -=
anoadragon453 dff0a91
re-lint with updated tool versions
anoadragon453 e4f94ff
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 73817f3
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Send device list changes to application services as specified by [MSC3202](https://github.com/matrix-org/matrix-spec-proposals/pull/3202), using unstable prefixes. The `msc3202_transaction_extensions` experimental homeserver config option must be enabled and `org.matrix.msc3202: true` must be present in the application service registration file for device list changes to be sent. The "left" field is currently always empty. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,7 +72,7 @@ | |
from synapse.logging.context import run_in_background | ||
from synapse.metrics.background_process_metrics import run_as_background_process | ||
from synapse.storage.databases.main import DataStore | ||
from synapse.types import JsonDict | ||
from synapse.types import DeviceLists, JsonDict | ||
from synapse.util import Clock | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -122,6 +122,7 @@ def enqueue_for_appservice( | |
events: Optional[Collection[EventBase]] = None, | ||
ephemeral: Optional[Collection[JsonDict]] = None, | ||
to_device_messages: Optional[Collection[JsonDict]] = None, | ||
device_list_summary: Optional[DeviceLists] = None, | ||
) -> None: | ||
""" | ||
Enqueue some data to be sent off to an application service. | ||
|
@@ -133,10 +134,18 @@ def enqueue_for_appservice( | |
to_device_messages: The to-device messages to send. These differ from normal | ||
to-device messages sent to clients, as they have 'to_device_id' and | ||
'to_user_id' fields. | ||
device_list_summary: A summary of users that the application service either needs | ||
to refresh the device lists of, or those that the application service need no | ||
longer track the device lists of. | ||
""" | ||
# We purposefully allow this method to run with empty events/ephemeral | ||
# collections, so that callers do not need to check iterable size themselves. | ||
if not events and not ephemeral and not to_device_messages: | ||
if ( | ||
not events | ||
and not ephemeral | ||
and not to_device_messages | ||
and not device_list_summary | ||
): | ||
return | ||
|
||
if events: | ||
|
@@ -147,6 +156,10 @@ def enqueue_for_appservice( | |
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( | ||
to_device_messages | ||
) | ||
if device_list_summary: | ||
self.queuer.queued_device_list_summaries.setdefault( | ||
appservice.id, [] | ||
).append(device_list_summary) | ||
|
||
# Kick off a new application service transaction | ||
self.queuer.start_background_request(appservice) | ||
|
@@ -169,6 +182,8 @@ def __init__( | |
self.queued_ephemeral: Dict[str, List[JsonDict]] = {} | ||
# dict of {service_id: [to_device_message_json]} | ||
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} | ||
# dict of {service_id: [device_list_summary]} | ||
self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {} | ||
|
||
# the appservices which currently have a transaction in flight | ||
self.requests_in_flight: Set[str] = set() | ||
|
@@ -212,7 +227,40 @@ async def _send_request(self, service: ApplicationService) -> None: | |
] | ||
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] | ||
|
||
if not events and not ephemeral and not to_device_messages_to_send: | ||
# Consolidate any pending device list summaries into a single, up-to-date | ||
# summary. | ||
# Note: this code assumes that in a single DeviceLists, a user will | ||
# never be in both "changed" and "left" sets. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the code below will actually work fine if the sets are not disjoint, with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The spec doesn't particularly say what a client should do if a user ends up in both lists, though I would assume that ...So maybe it's OK to pass a device list update with a user with both lists anyways? |
||
device_list_summary = DeviceLists() | ||
while self.queued_device_list_summaries.get(service.id, []): | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Pop a summary off the front of the queue | ||
summary = self.queued_device_list_summaries[service.id].pop(0) | ||
|
||
# For every user in the incoming "changed" set: | ||
# * Remove them from the existing "left" set if necessary | ||
# (as we need to start tracking them again) | ||
# * Add them to the existing "changed" set if necessary. | ||
for user_id in summary.changed: | ||
if user_id in device_list_summary.left: | ||
device_list_summary.left.remove(user_id) | ||
device_list_summary.changed.add(user_id) | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# For every user in the incoming "left" set: | ||
# * Remove them from the existing "changed" set if necessary | ||
# (we no longer need to track them) | ||
# * Add them to the existing "left" set if necessary. | ||
for user_id in summary.left: | ||
if user_id in device_list_summary.changed: | ||
device_list_summary.changed.remove(user_id) | ||
device_list_summary.left.add(user_id) | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if ( | ||
not events | ||
and not ephemeral | ||
and not to_device_messages_to_send | ||
# Note that DeviceLists implements __bool__ | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
and not device_list_summary | ||
): | ||
return | ||
|
||
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None | ||
|
@@ -240,6 +288,7 @@ async def _send_request(self, service: ApplicationService) -> None: | |
to_device_messages_to_send, | ||
one_time_key_counts, | ||
unused_fallback_keys, | ||
device_list_summary, | ||
) | ||
except Exception: | ||
logger.exception("AS request failed") | ||
|
@@ -322,6 +371,7 @@ async def send( | |
to_device_messages: Optional[List[JsonDict]] = None, | ||
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None, | ||
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None, | ||
device_list_summary: Optional[DeviceLists] = None, | ||
) -> None: | ||
""" | ||
Create a transaction with the given data and send to the provided | ||
|
@@ -336,6 +386,7 @@ async def send( | |
appservice devices in the transaction. | ||
unused_fallback_keys: Lists of unused fallback keys for relevant | ||
appservice devices in the transaction. | ||
device_list_summary: The device list summary to include in the transaction. | ||
""" | ||
try: | ||
txn = await self.store.create_appservice_txn( | ||
|
@@ -345,6 +396,7 @@ async def send( | |
to_device_messages=to_device_messages or [], | ||
one_time_key_counts=one_time_key_counts or {}, | ||
unused_fallback_keys=unused_fallback_keys or {}, | ||
device_list_summary=device_list_summary or DeviceLists(), | ||
) | ||
service_is_up = await self._is_service_up(service) | ||
if service_is_up: | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.