Commit c12cdc5

erikjohnston authored and H-Shay committed
Move towards using MultiWriterIdGenerator everywhere (element-hq#17226)
There is a problem with `StreamIdGenerator` where it can go backwards over restarts when a stream ID is requested but then not inserted into the DB. This is problematic if we want to land element-hq#17215, and is generally a potential cause of all sorts of nastiness.

Instead of trying to fix `StreamIdGenerator`, we may as well move to `MultiWriterIdGenerator`, which does not suffer from this problem (the latest positions are stored in the `stream_positions` table). This involves adding SQLite support to the class.

This PR only changes the ID generators that were already using `MultiWriterIdGenerator` under Postgres; a separate PR will move the remaining uses of `StreamIdGenerator` over.
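For context, the failure mode described above can be sketched as follows. This is a deliberately simplified, hypothetical model of a max()-based generator, not the actual Synapse classes:

# Simplified sketch (hypothetical class, not Synapse's real implementation)
# of why a max()-based generator can go backwards over restarts.

class SketchStreamIdGenerator:
    """Recovers its position from max(stream_id) in the data table."""

    def __init__(self, current_max_in_db: int) -> None:
        self._current = current_max_in_db

    def get_next(self) -> int:
        self._current += 1
        return self._current


gen = SketchStreamIdGenerator(current_max_in_db=10)
leaked = gen.get_next()  # 11 is handed out, but the row is never inserted

# After a restart, max(stream_id) in the DB is still 10, so 11 is handed
# out a second time -- the stream has effectively gone backwards.
restarted = SketchStreamIdGenerator(current_max_in_db=10)
assert restarted.get_next() == leaked

# MultiWriterIdGenerator avoids this by persisting the latest positions in
# the `stream_positions` table rather than re-deriving them from the data.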
1 parent: ca4f065 · commit: c12cdc5

File tree

10 files changed: +341 -379 lines changed

changelog.d/17226.misc

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Move towards using `MultiWriterIdGenerator` everywhere.

synapse/storage/database.py

Lines changed: 18 additions & 3 deletions
@@ -2461,7 +2461,11 @@ def simple_select_list_paginate_txn(


 def make_in_list_sql_clause(
-    database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any]
+    database_engine: BaseDatabaseEngine,
+    column: str,
+    iterable: Collection[Any],
+    *,
+    negative: bool = False,
 ) -> Tuple[str, list]:
     """Returns an SQL clause that checks the given column is in the iterable.

@@ -2474,6 +2478,7 @@ def make_in_list_sql_clause(
         database_engine
         column: Name of the column
         iterable: The values to check the column against.
+        negative: Whether we should check for inequality, i.e. `NOT IN`

     Returns:
         A tuple of SQL query and the args
@@ -2482,9 +2487,19 @@ def make_in_list_sql_clause(
     if database_engine.supports_using_any_list:
         # This should hopefully be faster, but also makes postgres query
         # stats easier to understand.
-        return "%s = ANY(?)" % (column,), [list(iterable)]
+        if not negative:
+            clause = f"{column} = ANY(?)"
+        else:
+            clause = f"{column} != ALL(?)"
+
+        return clause, [list(iterable)]
     else:
-        return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
+        params = ",".join("?" for _ in iterable)
+        if not negative:
+            clause = f"{column} IN ({params})"
+        else:
+            clause = f"{column} NOT IN ({params})"
+        return clause, list(iterable)


 # These overloads ensure that `columns` and `iterable` values have the same length.

synapse/storage/databases/main/account_data.py

Lines changed: 14 additions & 33 deletions
@@ -43,11 +43,9 @@
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict, JsonMapping
 from synapse.util import json_encoder
@@ -75,37 +73,20 @@ def __init__(

         self._account_data_id_gen: AbstractStreamIdGenerator

-        if isinstance(database.engine, PostgresEngine):
-            self._account_data_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="account_data",
-                instance_name=self._instance_name,
-                tables=[
-                    ("room_account_data", "instance_name", "stream_id"),
-                    ("room_tags_revisions", "instance_name", "stream_id"),
-                    ("account_data", "instance_name", "stream_id"),
-                ],
-                sequence_name="account_data_sequence",
-                writers=hs.config.worker.writers.account_data,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._account_data_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "room_account_data",
-                "stream_id",
-                extra_tables=[
-                    ("account_data", "stream_id"),
-                    ("room_tags_revisions", "stream_id"),
-                ],
-                is_writer=self._instance_name in hs.config.worker.writers.account_data,
-            )
+        self._account_data_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="account_data",
+            instance_name=self._instance_name,
+            tables=[
+                ("room_account_data", "instance_name", "stream_id"),
+                ("room_tags_revisions", "instance_name", "stream_id"),
+                ("account_data", "instance_name", "stream_id"),
+            ],
+            sequence_name="account_data_sequence",
+            writers=hs.config.worker.writers.account_data,
+        )

         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(
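Call sites are untouched by this change; for reference, stream IDs are allocated through the generator's async context manager, along these lines. This is a simplified sketch of the existing pattern (a method on this store, with the payload reduced to a plain string), not code from this diff:

# Simplified sketch of how this store already allocates stream IDs
# (not part of this diff): the ID only becomes visible to readers once
# the `async with` block exits successfully.
async def add_account_data_for_user(
    self, user_id: str, account_data_type: str, content: str
) -> int:
    async with self._account_data_id_gen.get_next() as next_id:
        await self.db_pool.simple_upsert(
            table="account_data",
            keyvalues={"user_id": user_id, "account_data_type": account_data_type},
            values={"stream_id": next_id, "content": content},
            desc="add_account_data_for_user",
        )
    return self._account_data_id_gen.get_current_token()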

synapse/storage/databases/main/deviceinbox.py

Lines changed: 16 additions & 30 deletions
@@ -50,11 +50,9 @@
     LoggingTransaction,
     make_in_list_sql_clause,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.types import JsonDict
 from synapse.util import json_encoder
@@ -89,35 +87,23 @@ def __init__(
             expiry_ms=30 * 60 * 1000,
         )

-        if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_device = (
-                self._instance_name in hs.config.worker.writers.to_device
-            )
+        self._can_write_to_device = (
+            self._instance_name in hs.config.worker.writers.to_device
+        )

-            self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
-                MultiWriterIdGenerator(
-                    db_conn=db_conn,
-                    db=database,
-                    notifier=hs.get_replication_notifier(),
-                    stream_name="to_device",
-                    instance_name=self._instance_name,
-                    tables=[
-                        ("device_inbox", "instance_name", "stream_id"),
-                        ("device_federation_outbox", "instance_name", "stream_id"),
-                    ],
-                    sequence_name="device_inbox_sequence",
-                    writers=hs.config.worker.writers.to_device,
-                )
-            )
-        else:
-            self._can_write_to_device = True
-            self._to_device_msg_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "device_inbox",
-                "stream_id",
-                extra_tables=[("device_federation_outbox", "stream_id")],
-            )
+        self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="to_device",
+            instance_name=self._instance_name,
+            tables=[
+                ("device_inbox", "instance_name", "stream_id"),
+                ("device_federation_outbox", "instance_name", "stream_id"),
+            ],
+            sequence_name="device_inbox_sequence",
+            writers=hs.config.worker.writers.to_device,
+        )

         max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
         device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
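Note the small behavioural change here: under SQLite, `_can_write_to_device` was previously hard-coded to `True`, whereas it is now derived from the worker config on both engines. The `tables` argument lists every table carrying this stream, as `(table, instance_name_column, stream_id_column)` triples; conceptually, the generator recovers each writer's position on startup with queries along these lines (a hedged sketch of the idea, not the actual implementation):

# Hedged sketch of what the `tables` triples are for (not the real code):
# on startup the generator recovers each writer's position from every
# registered table, then merges in the persisted `stream_positions` rows.
for table, instance_column, id_column in [
    ("device_inbox", "instance_name", "stream_id"),
    ("device_federation_outbox", "instance_name", "stream_id"),
]:
    sql = (
        f"SELECT {instance_column}, MAX({id_column}) "
        f"FROM {table} GROUP BY {instance_column}"
    )
    # txn.execute(sql), then take the max per instance across all tables.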

synapse/storage/databases/main/events_worker.py

Lines changed: 33 additions & 68 deletions
@@ -75,12 +75,10 @@
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
@@ -195,51 +193,28 @@ def __init__(

         self._stream_id_gen: AbstractStreamIdGenerator
         self._backfill_id_gen: AbstractStreamIdGenerator
-        if isinstance(database.engine, PostgresEngine):
-            # If we're using Postgres than we can use `MultiWriterIdGenerator`
-            # regardless of whether this process writes to the streams or not.
-            self._stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="events",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_stream_seq",
-                writers=hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="backfill",
-                instance_name=hs.get_instance_name(),
-                tables=[("events", "instance_name", "stream_ordering")],
-                sequence_name="events_backfill_stream_seq",
-                positive=False,
-                writers=hs.config.worker.writers.events,
-            )
-        else:
-            # Multiple writers are not supported for SQLite.
-            #
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
-            )
+
+        self._stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="events",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_stream_seq",
+            writers=hs.config.worker.writers.events,
+        )
+        self._backfill_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="backfill",
+            instance_name=hs.get_instance_name(),
+            tables=[("events", "instance_name", "stream_ordering")],
+            sequence_name="events_backfill_stream_seq",
+            positive=False,
+            writers=hs.config.worker.writers.events,
+        )

         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
@@ -309,27 +284,17 @@ def get_chain_id_txn(txn: Cursor) -> int:

         self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator

-        if isinstance(database.engine, PostgresEngine):
-            self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_event_stream",
-                instance_name=hs.get_instance_name(),
-                tables=[
-                    ("un_partial_stated_event_stream", "instance_name", "stream_id")
-                ],
-                sequence_name="un_partial_stated_event_stream_sequence",
-                # TODO(faster_joins, multiple writers) Support multiple writers.
-                writers=["master"],
-            )
-        else:
-            self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
-                db_conn,
-                hs.get_replication_notifier(),
-                "un_partial_stated_event_stream",
-                "stream_id",
-            )
+        self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="un_partial_stated_event_stream",
+            instance_name=hs.get_instance_name(),
+            tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")],
+            sequence_name="un_partial_stated_event_stream_sequence",
+            # TODO(faster_joins, multiple writers) Support multiple writers.
+            writers=["master"],
+        )

     def get_un_partial_stated_events_token(self, instance_name: str) -> int:
         return (
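The backfill generator is constructed with `positive=False`: live events take increasing positive stream orderings, while backfilled events take decreasing negative ones, so historical events always sort before live ones. In sketch form, with illustrative values only:

# Illustrative values only: the two event streams grow in opposite directions.
live_orderings = [101, 102, 103]    # "events" stream, events_stream_seq
backfill_orderings = [-7, -8, -9]   # "backfill" stream, positive=False
assert all(b < 0 < live for b in backfill_orderings for live in live_orderings)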

synapse/storage/databases/main/presence.py

Lines changed: 10 additions & 17 deletions
@@ -40,13 +40,11 @@
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.types import Connection
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
     MultiWriterIdGenerator,
-    StreamIdGenerator,
 )
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -91,21 +89,16 @@ def __init__(
             self._instance_name in hs.config.worker.writers.presence
         )

-        if isinstance(database.engine, PostgresEngine):
-            self._presence_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                notifier=hs.get_replication_notifier(),
-                stream_name="presence_stream",
-                instance_name=self._instance_name,
-                tables=[("presence_stream", "instance_name", "stream_id")],
-                sequence_name="presence_stream_sequence",
-                writers=hs.config.worker.writers.presence,
-            )
-        else:
-            self._presence_id_gen = StreamIdGenerator(
-                db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
-            )
+        self._presence_id_gen = MultiWriterIdGenerator(
+            db_conn=db_conn,
+            db=database,
+            notifier=hs.get_replication_notifier(),
+            stream_name="presence_stream",
+            instance_name=self._instance_name,
+            tables=[("presence_stream", "instance_name", "stream_id")],
+            sequence_name="presence_stream_sequence",
+            writers=hs.config.worker.writers.presence,
+        )

         self.hs = hs
         self._presence_on_startup = self._get_active_presence(db_conn)
