
Refactor Measure block metrics to be homeserver-scoped #18591

Closed
wants to merge 17 commits into from
1 change: 1 addition & 0 deletions changelog.d/18591.misc
@@ -0,0 +1 @@
Refactor `Measure` block metrics to be homeserver-scoped.
29 changes: 25 additions & 4 deletions synapse/app/_base.py
@@ -76,6 +76,7 @@
from synapse.logging.opentracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.homeserver_metrics_manager import HomeserverMetricsManager
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
@@ -283,18 +284,38 @@ async def wrapper() -> None:
reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses: StrCollection, port: int) -> None:
def listen_metrics(
bind_addresses: StrCollection, port: int, metrics_manager: HomeserverMetricsManager
) -> None:
"""
Start Prometheus metrics server.
"""
from prometheus_client import start_http_server as start_http_server_prometheus
from prometheus_client import (
REGISTRY,
CollectorRegistry,
start_http_server as start_http_server_prometheus,
)

from synapse.metrics import RegistryProxy
from synapse.metrics import CombinedRegistryProxy

combined_registry_proxy = CombinedRegistryProxy(
[
# TODO: Remove `REGISTRY` once all metrics have been migrated to the
# homeserver specific metrics collector registry, see
# https://github.com/element-hq/synapse/issues/18592
REGISTRY,
metrics_manager.metrics_collector_registry,
]
)
# Cheeky cast but matches the signature of a `CollectorRegistry` instance enough
# for it to be usable in the contexts in which we use it.
# TODO Do something nicer about this.
registry = cast(CollectorRegistry, combined_registry_proxy)
Comment on lines +287 to +313

@MadLittleMods (Contributor, Author) commented on Jun 24, 2025:

All of this work can be removed if we decide that it's okay to drop the `type: metrics` listener (part of #18584)

Comment on lines +310 to +313

@MadLittleMods (Contributor, Author):

This cast is the same thing we were doing before with the `RegistryProxy`, nothing new. I'm not very keen on thinking of something better here.


for host in bind_addresses:
logger.info("Starting metrics listener on %s:%d", host, port)
_set_prometheus_client_use_created_metrics(False)
start_http_server_prometheus(port, addr=host, registry=RegistryProxy)
start_http_server_prometheus(port, addr=host, registry=registry)


def _set_prometheus_client_use_created_metrics(new_value: bool) -> None:
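The cast discussed above works because the Prometheus exposition path only ever calls `.collect()` on whatever registry-like object it is handed. Below is a minimal sketch of the shape `CombinedRegistryProxy` needs, assuming only what this diff shows (the class name and the list-of-registries constructor); the real implementation in `synapse.metrics` may differ in detail:

```python
from typing import Iterable, List

from prometheus_client import REGISTRY, CollectorRegistry, generate_latest
from prometheus_client.core import Metric


class CombinedRegistryProxy:
    """Present several CollectorRegistry instances as one collectable object."""

    def __init__(self, registries: List[CollectorRegistry]) -> None:
        self._registries = registries

    def collect(self) -> Iterable[Metric]:
        # Yield every metric family from every underlying registry in turn,
        # which is all that generate_latest()/the exposition server need.
        for registry in self._registries:
            yield from registry.collect()


# Combine the process-global registry with a homeserver-scoped one
# (a fresh registry stands in for `metrics_manager.metrics_collector_registry`).
per_homeserver_registry = CollectorRegistry()
proxy = CombinedRegistryProxy([REGISTRY, per_homeserver_registry])
print(generate_latest(proxy).decode("utf-8")[:200])
```

If the migration tracked in https://github.com/element-hq/synapse/issues/18592 completes, resolving the `TODO` above should amount to dropping `REGISTRY` from that list.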
7 changes: 5 additions & 2 deletions synapse/app/generic_worker.py
@@ -49,7 +49,7 @@
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics import METRICS_PREFIX, MetricsResource
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
@@ -186,7 +186,9 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
for res in listener_config.http_options.resources:
for name in res.names:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
resources[METRICS_PREFIX] = MetricsResource(
metrics_manager=self.get_metrics_manager()
)
elif name == "client":
resource: Resource = ClientRestResource(self)

@@ -294,6 +296,7 @@ def start_listening(self) -> None:
_base.listen_metrics(
listener.bind_addresses,
listener.port,
self.get_metrics_manager(),
)
else:
raise ConfigError(
7 changes: 5 additions & 2 deletions synapse/app/homeserver.py
@@ -60,7 +60,7 @@
StaticResource,
)
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics import METRICS_PREFIX, MetricsResource
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
@@ -252,7 +252,9 @@ def _configure_named_resource(
resources[SERVER_KEY_PREFIX] = KeyResource(self)

if name == "metrics" and self.config.metrics.enable_metrics:
metrics_resource: Resource = MetricsResource(RegistryProxy)
metrics_resource: Resource = MetricsResource(
metrics_manager=self.get_metrics_manager()
)
if compress:
metrics_resource = gz_wrap(metrics_resource)
resources[METRICS_PREFIX] = metrics_resource
@@ -296,6 +298,7 @@ def start_listening(self) -> None:
_base.listen_metrics(
listener.bind_addresses,
listener.port,
self.get_metrics_manager(),
)
else:
raise ConfigError(
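For the `metrics` HTTP resource the same idea applies: `MetricsResource` now receives the homeserver's metrics manager instead of the global `RegistryProxy`. The following is a hedged sketch of how such a resource could serve the combined output; the attribute name `metrics_collector_registry` comes from the `listen_metrics` hunk above, while the class name and rendering details are assumptions rather than this PR's code:

```python
from typing import Any

from prometheus_client import REGISTRY, generate_latest
from twisted.web.resource import Resource
from twisted.web.server import Request


class MetricsResourceSketch(Resource):
    """Serve the global and homeserver-scoped registries from one endpoint."""

    isLeaf = True

    def __init__(self, metrics_manager: Any) -> None:
        super().__init__()
        self._registries = [REGISTRY, metrics_manager.metrics_collector_registry]

    def render_GET(self, request: Request) -> bytes:
        request.setHeader(
            b"Content-Type", b"text/plain; version=0.0.4; charset=utf-8"
        )
        # Concatenate the exposition output of each registry.
        return b"".join(generate_latest(registry) for registry in self._registries)
```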
3 changes: 2 additions & 1 deletion synapse/federation/send_queue.py
@@ -73,6 +73,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.is_mine_server_name = hs.is_mine_server_name
@@ -156,7 +157,7 @@ def _clear_queue(self) -> None:

def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
with Measure(self.clock, self.metrics_manager, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
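This is the first of many hunks that follow the same mechanical pattern: every `Measure(clock, name)` call site gains a `metrics_manager` argument between the clock and the block name. A runnable sketch of that calling convention follows; the real `Measure` lives in `synapse.util.metrics` and tracks far more than wall-clock time, and the fake clock and the `print` are stand-ins for illustration only:

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator


class FakeClock:
    """Stand-in for Synapse's Clock, which exposes time_msec()."""

    def time_msec(self) -> int:
        return int(time.time() * 1000)


@contextmanager
def measure_sketch(clock: FakeClock, metrics_manager: Any, name: str) -> Iterator[None]:
    """Time a block and, in the real code, record it against the homeserver's registry."""
    start = clock.time_msec()
    try:
        yield
    finally:
        elapsed_ms = clock.time_msec() - start
        # The real Measure updates counters/histograms registered on
        # `metrics_manager.metrics_collector_registry`; printing keeps the sketch simple.
        print(f"{name}: {elapsed_ms}ms")


# Mirrors the new call shape: clock, then metrics manager, then block name.
with measure_sketch(FakeClock(), object(), "send_queue._clear"):
    pass
```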
5 changes: 4 additions & 1 deletion synapse/federation/sender/__init__.py
@@ -378,6 +378,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()

self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.is_mine_id = hs.is_mine_id
self.is_mine_server_name = hs.is_mine_server_name

@@ -657,7 +658,9 @@ async def handle_room_events(events: List[EventBase]) -> None:
logger.debug(
"Handling %i events in room %s", len(events), events[0].room_id
)
with Measure(self.clock, "handle_room_events"):
with Measure(
self.clock, self.metrics_manager, "handle_room_events"
):
for event in events:
await handle_event(event)

3 changes: 3 additions & 0 deletions synapse/federation/sender/transaction_manager.py
@@ -60,6 +60,9 @@ class TransactionManager:
def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func
self._store = hs.get_datastores().main
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
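The repeated "nb must be called this for @measure_func" comments exist because `@measure_func` is a decorator with no way to be handed a clock or a metrics manager directly: it looks both up as attributes on the decorated method's instance. A sketch of that mechanism, assuming the three-argument `Measure` introduced by this PR (the real decorator in `synapse.util.metrics` differs in detail), is shown below:

```python
import functools
from typing import Any, Awaitable, Callable, Optional, TypeVar

from synapse.util.metrics import Measure

T = TypeVar("T")


def measure_func_sketch(
    name: Optional[str] = None,
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        block_name = func.__name__ if name is None else name

        @functools.wraps(func)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> T:
            # The attribute names are the contract: the instance must expose
            # `self.clock` and `self.metrics_manager` for the wrapper to find them.
            with Measure(self.clock, self.metrics_manager, block_name):
                return await func(self, *args, **kwargs)

        return wrapper

    return decorator
```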
7 changes: 5 additions & 2 deletions synapse/handlers/appservice.py
@@ -79,6 +79,7 @@ def __init__(self, hs: "HomeServer"):
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.notify_appservices = hs.config.worker.should_notify_appservices
self.event_sources = hs.get_event_sources()
self._msc2409_to_device_messages_enabled = (
@@ -120,7 +121,7 @@ def notify_interested_services(self, max_token: RoomStreamToken) -> None:

@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
with Measure(self.clock, self.metrics_manager, "notify_interested_services"):
self.is_processing = True
try:
upper_bound = -1
@@ -329,7 +330,9 @@ async def _notify_interested_services_ephemeral(
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
with Measure(
self.clock, self.metrics_manager, "notify_interested_services_ephemeral"
):
for service in services:
if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)
3 changes: 2 additions & 1 deletion synapse/handlers/delayed_events.py
@@ -58,6 +58,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
self._clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()

@@ -159,7 +160,7 @@ async def _unsafe_process_new_event(self) -> None:

# Loop round handling deltas until we're up to date
while True:
with Measure(self._clock, "delayed_events_delta"):
with Measure(self._clock, self.metrics_manager, "delayed_events_delta"):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
11 changes: 10 additions & 1 deletion synapse/handlers/device.py
@@ -526,6 +526,11 @@ class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func

self.federation_sender = hs.get_federation_sender()
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
@@ -1214,10 +1219,14 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
self.store = hs.get_datastores().main
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
self._notifier = hs.get_notifier()

self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func

self._remote_edu_linearizer = Linearizer(name="remote_device_list")
self._resync_linearizer = Linearizer(name="remote_device_resync")

7 changes: 5 additions & 2 deletions synapse/handlers/message.py
@@ -481,10 +481,13 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.profile_handler = hs.get_profile_handler()
self.event_builder_factory = hs.get_event_builder_factory()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_manager = (
hs.get_metrics_manager()
) # nb must be called this for @measure_func
self.profile_handler = hs.get_profile_handler()
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.config = hs.config
8 changes: 5 additions & 3 deletions synapse/handlers/presence.py
@@ -749,6 +749,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
self.metrics_manager = hs.get_metrics_manager()

federation_registry = hs.get_federation_registry()

@@ -941,7 +942,7 @@ async def _update_states(

now = self.clock.time_msec()

with Measure(self.clock, "presence_update_states"):
with Measure(self.clock, self.metrics_manager, "presence_update_states"):
# NOTE: We purposefully don't await between now and when we've
# calculated what we want to do with the new states, to avoid races.

@@ -1497,7 +1498,7 @@ async def _process_presence() -> None:
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
with Measure(self.clock, self.metrics_manager, "presence_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
@@ -1762,6 +1763,7 @@ def __init__(self, hs: "HomeServer"):
self.get_presence_handler = hs.get_presence_handler
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.store = hs.get_datastores().main

async def get_new_events(
@@ -1792,7 +1794,7 @@ async def get_new_events(
user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache

with Measure(self.clock, "presence.get_new_events"):
with Measure(self.clock, self.metrics_manager, "presence.get_new_events"):
if from_key is not None:
from_key = int(from_key)

9 changes: 5 additions & 4 deletions synapse/handlers/sync.py
@@ -337,6 +337,7 @@ def __init__(self, hs: "HomeServer"):
self._push_rules_handler = hs.get_push_rules_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.state = hs.get_state_handler()
self.auth_blocking = hs.get_auth_blocking()
self._storage_controllers = hs.get_storage_controllers()
@@ -710,7 +711,7 @@ async def ephemeral_by_room(

sync_config = sync_result_builder.sync_config

with Measure(self.clock, "ephemeral_by_room"):
with Measure(self.clock, self.metrics_manager, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else 0

room_ids = sync_result_builder.joined_room_ids
@@ -783,7 +784,7 @@ async def _load_filtered_recents(
and current token to send down to clients.
newly_joined_room
"""
with Measure(self.clock, "load_filtered_recents"):
with Measure(self.clock, self.metrics_manager, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
sync_config.filter_collection.blocks_all_room_timeline()
@@ -1174,7 +1175,7 @@ async def compute_state_delta(
# updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.

with Measure(self.clock, "compute_state_delta"):
with Measure(self.clock, self.metrics_manager, "compute_state_delta"):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch: Optional[Set[str]] = None
@@ -1791,7 +1792,7 @@ async def unread_notifs_for_room_id(
# the DB.
return RoomNotifCounts.empty()

with Measure(self.clock, "unread_notifs_for_room_id"):
with Measure(self.clock, self.metrics_manager, "unread_notifs_for_room_id"):
return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
5 changes: 3 additions & 2 deletions synapse/handlers/typing.py
@@ -505,6 +505,7 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
# We can't call get_typing_handler here because there's a cycle:
#
# Typing -> Notifier -> TypingNotificationEventSource -> Typing
@@ -535,7 +536,7 @@ async def get_new_events_as(
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
with Measure(self.clock, self.metrics_manager, "typing.get_new_events_as"):
handler = self.get_typing_handler()

events = []
@@ -571,7 +572,7 @@ async def get_new_events(
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
"""

with Measure(self.clock, "typing.get_new_events"):
with Measure(self.clock, self.metrics_manager, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()

3 changes: 2 additions & 1 deletion synapse/handlers/user_directory.py
@@ -104,6 +104,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.metrics_manager = hs.get_metrics_manager()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.worker.should_update_user_directory
@@ -237,7 +238,7 @@ async def _unsafe_process(self) -> None:

# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
with Measure(self.clock, self.metrics_manager, "user_dir_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering:
return
3 changes: 3 additions & 0 deletions synapse/http/federation/matrix_federation_agent.py
@@ -48,6 +48,7 @@
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.homeserver_metrics_manager import HomeserverMetricsManager
from synapse.types import ISynapseReactor
from synapse.util import Clock

@@ -93,6 +94,7 @@ class MatrixFederationAgent:
def __init__(
self,
reactor: ISynapseReactor,
metrics_manager: HomeserverMetricsManager,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_allowlist: Optional[IPSet],
@@ -128,6 +130,7 @@ def __init__(
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
reactor,
metrics_manager,
agent=BlocklistingAgentWrapper(
ProxyAgent(
reactor,
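Taken together, the hunks imply the rough shape of the new manager: it is reached via `hs.get_metrics_manager()`, it owns a `metrics_collector_registry` that gets combined with the global `REGISTRY` in `listen_metrics`, and `Measure` blocks record into it. A speculative sketch follows, in which the `server_name` label, the metric name, and every method name are assumptions rather than code copied from the PR:

```python
from prometheus_client import CollectorRegistry, Histogram


class HomeserverMetricsManagerSketch:
    """Owns a per-homeserver collector registry that Measure metrics are scoped to."""

    def __init__(self, server_name: str) -> None:
        self.server_name = server_name
        # Combined with the process-global REGISTRY when the metrics listener
        # is started (see `listen_metrics` in synapse/app/_base.py above).
        self.metrics_collector_registry = CollectorRegistry()
        self._block_timer = Histogram(
            "synapse_util_metrics_block_time_seconds_sketch",
            "Time spent in Measure blocks",
            labelnames=["block_name", "server_name"],
            registry=self.metrics_collector_registry,
        )

    def observe_block(self, block_name: str, seconds: float) -> None:
        self._block_timer.labels(
            block_name=block_name, server_name=self.server_name
        ).observe(seconds)


manager = HomeserverMetricsManagerSketch("example.com")
manager.observe_block("send_queue._clear", 0.012)
```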