Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

WIP: Send ephemeral events to appservices #8366

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
supports_ephemeral=False,
):
self.token = token
self.url = (
Expand All @@ -102,6 +103,7 @@ def __init__(
self.namespaces = self._check_namespaces(namespaces)
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral

if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
Expand Down Expand Up @@ -188,11 +190,11 @@ async def _matches_user(self, event, store):
if not store:
return False

does_match = await self._matches_user_in_member_list(event.room_id, store)
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match

@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
async def matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)
Expand Down Expand Up @@ -239,6 +241,19 @@ async def is_interested(self, event, store=None) -> bool:

return False

@cached(num_args=1, cache_context=True)
async def is_interested_in_presence(self, user_id, store, cache_context):
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
return True
room_ids = await store.get_rooms_for_user(user_id.to_string())

# Then find out if the appservice is interested in any of those rooms
for room_id in room_ids:
if await self.matches_user_in_member_list(room_id, store, cache_context):
return True
return False

def is_interested_in_user(self, user_id):
return (
self._matches_regex(user_id, ApplicationService.NS_USERS)
Expand Down
26 changes: 26 additions & 0 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ async def _get() -> Optional[JsonDict]:
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)

async def push_ephemeral(self, service, events, to_device=None, device_lists=None):
if service.url is None:
return True
if service.supports_ephemeral is False:
return True

uri = service.url + (
"%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX
)
try:
await self.put_json(
uri=uri,
json_body={
"events": events,
"device_messages": to_device,
"device_lists": device_lists,
},
args={"access_token": service.hs_token},
)
return True
except CodeMessageException as e:
logger.warning("push_ephemeral to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_ephemeral to %s threw exception %s", uri, ex)
return False

async def push_bulk(self, service, events, txn_id=None):
if service.url is None:
return True
Expand Down
8 changes: 6 additions & 2 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ async def start(self):
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)

async def submit_ephemeral_events_for_as(self, service, events):
if self.txn_ctrl.is_service_up(service):
await self.as_api.push_ephemeral(service, events)


class _ServiceQueuer:
"""Queue of events waiting to be sent to appservices.
Expand Down Expand Up @@ -161,7 +165,7 @@ def __init__(self, clock, store, as_api):
async def send(self, service, events):
try:
txn = await self.store.create_appservice_txn(service=service, events=events)
service_is_up = await self._is_service_up(service)
service_is_up = await self.is_service_up(service)
if service_is_up:
sent = await txn.send(self.as_api)
if sent:
Expand Down Expand Up @@ -204,7 +208,7 @@ def start_recoverer(self, service):
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))

async def _is_service_up(self, service):
async def is_service_up(self, service):
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None

Expand Down
3 changes: 3 additions & 0 deletions synapse/config/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename):
if as_info.get("ip_range_whitelist"):
ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist"))

supports_ephemeral = as_info.get("uk.half-shot.appservice.push_ephemeral", False)

return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
Expand All @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename):
hs_token=as_info["hs_token"],
sender=user_id,
id=as_info["id"],
supports_ephemeral=supports_ephemeral,
protocols=protocols,
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,
Expand Down
130 changes: 130 additions & 0 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@
# limitations under the License.

import logging
from typing import Collection, List, Union

from prometheus_client import Counter

from twisted.internet import defer

import synapse
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken, UserID
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
Expand All @@ -43,6 +47,7 @@ def __init__(self, hs):
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
self.event_sources = hs.get_event_sources()

self.current_max = 0
self.is_processing = False
Expand Down Expand Up @@ -158,6 +163,131 @@ async def handle_room_events(events):
finally:
self.is_processing = False

async def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services or not self.notify_appservices:
return
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
events = []
if stream_key == "typing_key":
events = await self._handle_typing(service, new_token)
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users)
elif stream_key == "device_list_key":
# Check if the device lists have changed for any of the users we are interested in
events = await self._handle_device_list(service, users, new_token)
elif stream_key == "to_device_key":
# Check the inbox for any users the bridge owns
events = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(
service, events, new_token
)
# We don't persist the token for typing_key
if stream_key == "presence_key":
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
elif stream_key == "receipt_key":
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "to_device_key":
await self.store.set_type_stream_id_for_appservice(
service, "to_device", new_token
)

async def _handle_typing(self, service, new_token):
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _key = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# for appservices.
from_key=new_token - 1,
)
return typing

async def _handle_receipts(self, service, token: int):
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
receipts_source = self.event_sources.sources["receipt"]
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts

async def _handle_device_list(
self, service: ApplicationService, users: List[str], new_token: int
):
# TODO: Determine if any user have left and report those
from_token = await self.store.get_type_stream_id_for_appservice(
service, "device_list"
)
changed_user_ids = await self.store.get_device_changes_for_as(
service, from_token, new_token
)
# Return the
return {
"type": "m.device_list_update",
"content": {"changed": changed_user_ids,},
}

async def _handle_to_device(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False

since_token = await self.store.get_type_stream_id_for_appservice(
service, "to_device"
)
messages, _ = await self.store.get_new_messages_for_as(
service, since_token, token
)
# This returns user_id -> device_id -> message
return messages

async def _handle_as_presence(self, service, users):
events = []
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
user=user, service=service, from_key=from_key,
)
time_now = self.clock.time_msec()
presence_events = [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in presence_events
]
events = events + presence_events

async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.

Expand Down
22 changes: 22 additions & 0 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,27 @@ async def get_new_events(self, from_key, room_ids, **kwargs):

return (events, to_key)

async def get_new_events_as(self, from_key, service, **kwargs):
from_key = int(from_key)
to_key = self.get_current_key()

if from_key == to_key:
return [], to_key

# We first need to fetch all new receipts
rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
from_key=from_key, to_key=to_key
)

# Then filter down to rooms that the AS can read
events = []
for room_id, event in rooms_to_events.items():
if not await service.matches_user_in_member_list(room_id, self.store):
continue

events.append(event)

return (events, to_key)

def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()
22 changes: 22 additions & 0 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import TYPE_CHECKING, List, Set, Tuple

from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id
Expand Down Expand Up @@ -430,6 +431,27 @@ def _make_event_for(self, room_id):
"content": {"user_ids": list(typing)},
}

async def get_new_events_as(
self, from_key: int, service: ApplicationService, **kwargs
):
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()

events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
print("Key too old")
continue
if not await service.matches_user_in_member_list(
room_id, handler.store
):
continue

events.append(self._make_event_for(room_id))

return (events, handler._latest_room_serial)

async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
Expand Down
22 changes: 22 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,19 @@ async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
try:
await self.appservice_handler.notify_interested_services_ephemeral(
stream_key, new_token, users
)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
Expand Down Expand Up @@ -367,6 +380,15 @@ def on_new_event(

self.notify_replication()

# Notify appservices
run_as_background_process(
"_notify_app_services_ephemeral",
self._notify_app_services_ephemeral,
stream_key,
new_token,
users,
)

def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
Expand Down
Loading