Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit af07502

Browse files
committed
Merge SlavedEventStore parts into EventsWorkerStore
1 parent 6cfd419 commit af07502

File tree

7 files changed

+61
-106
lines changed

7 files changed

+61
-106
lines changed

synapse/app/admin_cmd.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
from synapse.config.logger import setup_logging
2929
from synapse.events import EventBase
3030
from synapse.handlers.admin import ExfiltrationWriter
31-
from synapse.replication.slave.storage.events import SlavedEventStore
3231
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
3332
from synapse.server import HomeServer
3433
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
@@ -39,11 +38,22 @@
3938
)
4039
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
4140
from synapse.storage.databases.main.devices import DeviceWorkerStore
41+
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
42+
from synapse.storage.databases.main.event_push_actions import (
43+
EventPushActionsWorkerStore,
44+
)
45+
from synapse.storage.databases.main.events_worker import EventsWorkerStore
4246
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
4347
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
4448
from synapse.storage.databases.main.registration import RegistrationWorkerStore
49+
from synapse.storage.databases.main.relations import RelationsWorkerStore
4550
from synapse.storage.databases.main.room import RoomWorkerStore
51+
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
52+
from synapse.storage.databases.main.signatures import SignatureWorkerStore
53+
from synapse.storage.databases.main.state import StateGroupWorkerStore
54+
from synapse.storage.databases.main.stream import StreamWorkerStore
4655
from synapse.storage.databases.main.tags import TagsWorkerStore
56+
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
4757
from synapse.types import StateMap
4858
from synapse.util import SYNAPSE_VERSION
4959
from synapse.util.logcontext import LoggingContext
@@ -53,16 +63,24 @@
5363

5464
class AdminCmdSlavedStore(
5565
SlavedFilteringStore,
56-
SlavedEventStore,
5766
DeviceWorkerStore,
5867
TagsWorkerStore,
5968
DeviceInboxWorkerStore,
6069
AccountDataWorkerStore,
6170
PushRulesWorkerStore,
6271
ApplicationServiceTransactionWorkerStore,
6372
ApplicationServiceWorkerStore,
64-
RegistrationWorkerStore,
73+
RoomMemberWorkerStore,
74+
RelationsWorkerStore,
75+
EventFederationWorkerStore,
76+
EventPushActionsWorkerStore,
77+
StateGroupWorkerStore,
78+
SignatureWorkerStore,
79+
UserErasureWorkerStore,
6580
ReceiptsWorkerStore,
81+
StreamWorkerStore,
82+
EventsWorkerStore,
83+
RegistrationWorkerStore,
6684
RoomWorkerStore,
6785
):
6886
def __init__(

synapse/app/generic_worker.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
from synapse.logging.context import LoggingContext
4949
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
5050
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
51-
from synapse.replication.slave.storage.events import SlavedEventStore
5251
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
5352
from synapse.replication.slave.storage.keys import SlavedKeyStore
5453
from synapse.rest.admin import register_servlets_for_media_repo
@@ -101,6 +100,11 @@
101100
from synapse.storage.databases.main.devices import DeviceWorkerStore
102101
from synapse.storage.databases.main.directory import DirectoryWorkerStore
103102
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
103+
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
104+
from synapse.storage.databases.main.event_push_actions import (
105+
EventPushActionsWorkerStore,
106+
)
107+
from synapse.storage.databases.main.events_worker import EventsWorkerStore
104108
from synapse.storage.databases.main.lock import LockStore
105109
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
106110
from synapse.storage.databases.main.metrics import ServerMetricsStore
@@ -113,15 +117,21 @@
113117
from synapse.storage.databases.main.pusher import PusherWorkerStore
114118
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
115119
from synapse.storage.databases.main.registration import RegistrationWorkerStore
120+
from synapse.storage.databases.main.relations import RelationsWorkerStore
116121
from synapse.storage.databases.main.room import RoomWorkerStore
117122
from synapse.storage.databases.main.room_batch import RoomBatchStore
123+
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
118124
from synapse.storage.databases.main.search import SearchStore
119125
from synapse.storage.databases.main.session import SessionStore
126+
from synapse.storage.databases.main.signatures import SignatureWorkerStore
127+
from synapse.storage.databases.main.state import StateGroupWorkerStore
120128
from synapse.storage.databases.main.stats import StatsStore
129+
from synapse.storage.databases.main.stream import StreamWorkerStore
121130
from synapse.storage.databases.main.tags import TagsWorkerStore
122131
from synapse.storage.databases.main.transactions import TransactionWorkerStore
123132
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
124133
from synapse.storage.databases.main.user_directory import UserDirectoryStore
134+
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
125135
from synapse.types import JsonDict
126136
from synapse.util import SYNAPSE_VERSION
127137
from synapse.util.httpresourcetree import create_resource_tree
@@ -237,7 +247,6 @@ class GenericWorkerSlavedStore(
237247
AccountDataWorkerStore,
238248
CensorEventsStore,
239249
ClientIpWorkerStore,
240-
SlavedEventStore,
241250
SlavedKeyStore,
242251
RoomWorkerStore,
243252
RoomBatchStore,
@@ -251,7 +260,16 @@ class GenericWorkerSlavedStore(
251260
MediaRepositoryStore,
252261
ServerMetricsStore,
253262
PusherWorkerStore,
263+
RoomMemberWorkerStore,
264+
RelationsWorkerStore,
265+
EventFederationWorkerStore,
266+
EventPushActionsWorkerStore,
267+
StateGroupWorkerStore,
268+
SignatureWorkerStore,
269+
UserErasureWorkerStore,
254270
ReceiptsWorkerStore,
271+
StreamWorkerStore,
272+
EventsWorkerStore,
255273
RegistrationWorkerStore,
256274
SearchStore,
257275
TransactionWorkerStore,

synapse/replication/slave/storage/events.py

Lines changed: 0 additions & 79 deletions
This file was deleted.

synapse/storage/databases/main/__init__.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from synapse.storage.engines import BaseDatabaseEngine
2828
from synapse.storage.types import Cursor
2929
from synapse.types import JsonDict, get_domain_from_id
30-
from synapse.util.caches.stream_change_cache import StreamChangeCache
3130

3231
from .account_data import AccountDataStore
3332
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -139,24 +138,6 @@ def __init__(
139138

140139
super().__init__(database, db_conn, hs)
141140

142-
events_max = self._stream_id_gen.get_current_token()
143-
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
144-
db_conn,
145-
"current_state_delta_stream",
146-
entity_column="room_id",
147-
stream_column="stream_id",
148-
max_value=events_max, # As we share the stream id with events token
149-
limit=1000,
150-
)
151-
self._curr_state_delta_stream_cache = StreamChangeCache(
152-
"_curr_state_delta_stream_cache",
153-
min_curr_state_delta_id,
154-
prefilled_cache=curr_state_delta_prefill,
155-
)
156-
157-
self._stream_order_on_start = self.get_room_max_stream_ordering()
158-
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
159-
160141
async def get_users(self) -> List[JsonDict]:
161142
"""Function to retrieve a list of users in users table.
162143

synapse/storage/databases/main/events_worker.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
8282
from synapse.util.caches.descriptors import cached, cachedList
8383
from synapse.util.caches.lrucache import AsyncLruCache
84+
from synapse.util.caches.stream_change_cache import StreamChangeCache
8485
from synapse.util.cancellation import cancellable
8586
from synapse.util.iterutils import batch_iter
8687
from synapse.util.metrics import Measure
@@ -233,6 +234,21 @@ def __init__(
233234
db_conn, "events", "stream_ordering", step=-1
234235
)
235236

237+
events_max = self._stream_id_gen.get_current_token()
238+
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
239+
db_conn,
240+
"current_state_delta_stream",
241+
entity_column="room_id",
242+
stream_column="stream_id",
243+
max_value=events_max, # As we share the stream id with events token
244+
limit=1000,
245+
)
246+
self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
247+
"_curr_state_delta_stream_cache",
248+
min_curr_state_delta_id,
249+
prefilled_cache=curr_state_delta_prefill,
250+
)
251+
236252
if hs.config.worker.run_background_tasks:
237253
# We periodically clean out old transaction ID mappings
238254
self._clock.looping_call(

synapse/storage/databases/main/stream.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ def __init__(
415415
)
416416

417417
self._stream_order_on_start = self.get_room_max_stream_ordering()
418+
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
418419

419420
def get_room_max_stream_ordering(self) -> int:
420421
"""Get the stream_ordering of regular events that we have committed up to

tests/replication/slave/storage/test_events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
from synapse.api.room_versions import RoomVersions
2222
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
2323
from synapse.handlers.room import RoomEventSource
24-
from synapse.replication.slave.storage.events import SlavedEventStore
2524
from synapse.storage.databases.main.event_push_actions import (
2625
NotifCounts,
2726
RoomNotifCounts,
2827
)
28+
from synapse.storage.databases.main.events_worker import EventsWorkerStore
2929
from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
3030
from synapse.types import PersistedEventPosition
3131

@@ -58,9 +58,9 @@ def unpatch():
5858
return unpatch
5959

6060

61-
class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
61+
class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
6262

63-
STORE_TYPE = SlavedEventStore
63+
STORE_TYPE = EventsWorkerStore
6464

6565
def setUp(self):
6666
# Patch up the equality operator for events so that we can check

0 commit comments

Comments
 (0)