Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Pass the requester during event serialization #15174

Merged
merged 9 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15174.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add the `transaction_id` in the events included in many endpoints responses.
30 changes: 22 additions & 8 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
)
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.types import JsonDict
from synapse.types import JsonDict, Requester

from . import EventBase

Expand Down Expand Up @@ -316,8 +316,9 @@ class SerializeEventConfig:
as_client_event: bool = True
# Function to convert from federation format to client format
event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1
# ID of the user's auth token - used for namespacing of transaction IDs
token_id: Optional[int] = None
# The entity that requested the event. This is used to determine whether to include
# the transaction_id in the unsigned section of the event.
requester: Optional[Requester] = None
# List of event fields to include. If empty, all fields will be returned.
only_event_fields: Optional[List[str]] = None
# Some events can have stripped room state stored in the `unsigned` field.
Expand Down Expand Up @@ -367,11 +368,24 @@ def serialize_event(
e.unsigned["redacted_because"], time_now_ms, config=config
)

if config.token_id is not None:
if config.token_id == getattr(e.internal_metadata, "token_id", None):
txn_id = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None:
d["unsigned"]["transaction_id"] = txn_id
# If we have a txn_id saved in the internal_metadata, we should include it in the
# unsigned section of the event if it was sent by the same session as the one
# requesting the event.
# There is a special case for guests, because they only have one access token
# without associated access_token_id, so we always include the txn_id for events
# they sent.
txn_id = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None and config.requester is not None:
event_token_id = getattr(e.internal_metadata, "token_id", None)
if config.requester.user.to_string() == e.sender and (
(
event_token_id is not None
and config.requester.access_token_id is not None
and event_token_id == config.requester.access_token_id
)
or config.requester.is_guest
):
d["unsigned"]["transaction_id"] = txn_id

# invite_room_state and knock_room_state are a list of stripped room state events
# that are meant to provide metadata about a room to an invitee/knocker. They are
Expand Down
20 changes: 10 additions & 10 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from synapse.handlers.presence import format_user_presence_state
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
from synapse.types import JsonDict, Requester, UserID
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
Expand All @@ -46,13 +46,12 @@ def __init__(self, hs: "HomeServer"):

async def get_stream(
self,
auth_user_id: str,
requester: Requester,
pagin_config: PaginationConfig,
timeout: int = 0,
as_client_event: bool = True,
affect_presence: bool = True,
room_id: Optional[str] = None,
is_guest: bool = False,
) -> JsonDict:
"""Fetches the events stream for a given user."""

Expand All @@ -62,13 +61,12 @@ async def get_stream(
raise SynapseError(403, "This room has been blocked on this server")

# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(auth_user_id)
await self._server_notices_sender.on_user_syncing(requester.user.to_string())

auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()

context = await presence_handler.user_syncing(
auth_user_id,
requester.user.to_string(),
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
Expand All @@ -82,10 +80,10 @@ async def get_stream(
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))

stream_result = await self.notifier.get_events_for(
auth_user,
requester.user,
pagin_config,
timeout,
is_guest=is_guest,
is_guest=requester.is_guest,
explicit_room_id=room_id,
)
events = stream_result.events
Expand All @@ -102,7 +100,7 @@ async def get_stream(
if event.membership != Membership.JOIN:
continue
# Send down presence.
if event.state_key == auth_user_id:
if event.state_key == requester.user.to_string():
# Send down presence for everyone in the room.
users: Iterable[str] = await self.store.get_users_in_room(
event.room_id
Expand All @@ -124,7 +122,9 @@ async def get_stream(
chunks = self._event_serializer.serialize_events(
events,
time_now,
config=SerializeEventConfig(as_client_event=as_client_event),
config=SerializeEventConfig(
as_client_event=as_client_event, requester=requester
),
)

chunk = {
Expand Down
51 changes: 36 additions & 15 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,26 @@ async def room_initial_sync(
)
is_peeking = member_event_id is None

user_id = requester.user.to_string()

if membership == Membership.JOIN:
result = await self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
requester, room_id, pagin_config, membership, is_peeking
)
elif membership == Membership.LEAVE:
# The member_event_id will always be available if membership is set
# to leave.
assert member_event_id

result = await self._room_initial_sync_parted(
user_id, room_id, pagin_config, membership, member_event_id, is_peeking
requester,
room_id,
pagin_config,
membership,
member_event_id,
is_peeking,
)

account_data_events = []
user_id = requester.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)
if tags:
account_data_events.append(
Expand All @@ -350,7 +354,7 @@ async def room_initial_sync(

async def _room_initial_sync_parted(
self,
user_id: str,
requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
membership: str,
Expand All @@ -369,36 +373,44 @@ async def _room_initial_sync_parted(
)

messages = await filter_events_for_client(
self._storage_controllers, user_id, messages, is_peeking=is_peeking
self._storage_controllers,
requester.user.to_string(),
messages,
is_peeking=is_peeking,
)

start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)

time_now = self.clock.time_msec()
serialize_options = SerializeEventConfig(requester=requester)

return {
"membership": membership,
"room_id": room_id,
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
self._event_serializer.serialize_events(messages, time_now)
self._event_serializer.serialize_events(
messages, time_now, config=serialize_options
)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
# Don't bundle aggregations as this is a deprecated API.
self._event_serializer.serialize_events(room_state.values(), time_now)
self._event_serializer.serialize_events(
room_state.values(), time_now, config=serialize_options
)
),
"presence": [],
"receipts": [],
}

async def _room_initial_sync_joined(
self,
user_id: str,
requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
membership: str,
Expand All @@ -410,9 +422,12 @@ async def _room_initial_sync_joined(

# TODO: These concurrently
time_now = self.clock.time_msec()
serialize_options = SerializeEventConfig(requester=requester)
# Don't bundle aggregations as this is a deprecated API.
state = self._event_serializer.serialize_events(
current_state.values(), time_now
current_state.values(),
time_now,
config=serialize_options,
)

now_token = self.hs.get_event_sources().get_current_token()
Expand Down Expand Up @@ -450,7 +465,10 @@ async def get_receipts() -> List[JsonDict]:
if not receipts:
return []

return ReceiptEventSource.filter_out_private_receipts(receipts, user_id)
return ReceiptEventSource.filter_out_private_receipts(
receipts,
requester.user.to_string(),
)

presence, receipts, (messages, token) = await make_deferred_yieldable(
gather_results(
Expand All @@ -469,20 +487,23 @@ async def get_receipts() -> List[JsonDict]:
)

messages = await filter_events_for_client(
self._storage_controllers, user_id, messages, is_peeking=is_peeking
self._storage_controllers,
requester.user.to_string(),
messages,
is_peeking=is_peeking,
)

start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token

time_now = self.clock.time_msec()

ret = {
"room_id": room_id,
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
self._event_serializer.serialize_events(messages, time_now)
self._event_serializer.serialize_events(
messages, time_now, config=serialize_options
)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
Expand Down
9 changes: 6 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from synapse.events import EventBase, relation_from_event
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.utils import maybe_upsert_event_field
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.logging import opentracing
Expand Down Expand Up @@ -245,8 +245,11 @@ async def get_state_events(
)
room_state = room_state_events[membership_event_id]

now = self.clock.time_msec()
events = self._event_serializer.serialize_events(room_state.values(), now)
events = self._event_serializer.serialize_events(
room_state.values(),
self.clock.time_msec(),
config=SerializeEventConfig(requester=requester),
)
return events

async def _user_can_see_state_at_event(
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,9 @@ async def get_messages(

time_now = self.clock.time_msec()

serialize_options = SerializeEventConfig(as_client_event=as_client_event)
serialize_options = SerializeEventConfig(
as_client_event=as_client_event, requester=requester
)

chunk = {
"chunk": (
Expand Down
12 changes: 10 additions & 2 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.api.constants import Direction, EventTypes, RelationTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event
from synapse.events.utils import SerializeEventConfig
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace
from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
Expand Down Expand Up @@ -151,16 +152,23 @@ async def get_relations(
)

now = self._clock.time_msec()
serialize_options = SerializeEventConfig(requester=requester)
return_value: JsonDict = {
"chunk": self._event_serializer.serialize_events(
events, now, bundle_aggregations=aggregations
events,
now,
bundle_aggregations=aggregations,
config=serialize_options,
),
}
if include_original_event:
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
return_value["original_event"] = self._event_serializer.serialize_event(
event, now, bundle_aggregations=None
event,
now,
bundle_aggregations=None,
config=serialize_options,
)

if next_token:
Expand Down
Loading