Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1eea662

Browse files
Add a get_next_txn method to StreamIdGenerator to match MultiWriterIdGenerator (#15191
1 parent ecbe0dd commit 1eea662

File tree

4 files changed

+48
-11
lines changed

4 files changed

+48
-11
lines changed

changelog.d/15191.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a `get_next_txn` method to `StreamIdGenerator` to match `MultiWriterIdGenerator`.

synapse/storage/databases/main/account_data.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
from synapse.storage.engines import PostgresEngine
4141
from synapse.storage.util.id_generators import (
4242
AbstractStreamIdGenerator,
43-
AbstractStreamIdTracker,
4443
MultiWriterIdGenerator,
4544
StreamIdGenerator,
4645
)
@@ -64,14 +63,12 @@ def __init__(
6463
):
6564
super().__init__(database, db_conn, hs)
6665

67-
# `_can_write_to_account_data` indicates whether the current worker is allowed
68-
# to write account data. A value of `True` implies that `_account_data_id_gen`
69-
# is an `AbstractStreamIdGenerator` and not just a tracker.
70-
self._account_data_id_gen: AbstractStreamIdTracker
7166
self._can_write_to_account_data = (
7267
self._instance_name in hs.config.worker.writers.account_data
7368
)
7469

70+
self._account_data_id_gen: AbstractStreamIdGenerator
71+
7572
if isinstance(database.engine, PostgresEngine):
7673
self._account_data_id_gen = MultiWriterIdGenerator(
7774
db_conn=db_conn,
@@ -558,7 +555,6 @@ async def add_account_data_to_room(
558555
The maximum stream ID.
559556
"""
560557
assert self._can_write_to_account_data
561-
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)
562558

563559
content_json = json_encoder.encode(content)
564560

@@ -598,7 +594,6 @@ async def remove_account_data_for_room(
598594
data to delete.
599595
"""
600596
assert self._can_write_to_account_data
601-
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)
602597

603598
def _remove_account_data_for_room_txn(
604599
txn: LoggingTransaction, next_id: int
@@ -663,7 +658,6 @@ async def add_account_data_for_user(
663658
The maximum stream ID.
664659
"""
665660
assert self._can_write_to_account_data
666-
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)
667661

668662
async with self._account_data_id_gen.get_next() as next_id:
669663
await self.db_pool.runInteraction(
@@ -770,7 +764,6 @@ async def remove_account_data_for_user(
770764
to delete.
771765
"""
772766
assert self._can_write_to_account_data
773-
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)
774767

775768
def _remove_account_data_for_user_txn(
776769
txn: LoggingTransaction, next_id: int

synapse/storage/util/id_generators.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]:
158158
"""
159159
raise NotImplementedError()
160160

161+
@abc.abstractmethod
162+
def get_next_txn(self, txn: LoggingTransaction) -> int:
163+
"""
164+
Usage:
165+
stream_id_gen.get_next_txn(txn)
166+
# ... persist events ...
167+
"""
168+
raise NotImplementedError()
169+
161170

162171
class StreamIdGenerator(AbstractStreamIdGenerator):
163172
"""Generates and tracks stream IDs for a stream with a single writer.
@@ -263,6 +272,40 @@ def manager() -> Generator[Sequence[int], None, None]:
263272

264273
return _AsyncCtxManagerWrapper(manager())
265274

275+
def get_next_txn(self, txn: LoggingTransaction) -> int:
276+
"""
277+
Retrieve the next stream ID from within a database transaction.
278+
279+
Clean-up functions will be called when the transaction finishes.
280+
281+
Args:
282+
txn: The database transaction object.
283+
284+
Returns:
285+
The next stream ID.
286+
"""
287+
if not self._is_writer:
288+
raise Exception("Tried to allocate stream ID on non-writer")
289+
290+
# Get the next stream ID.
291+
with self._lock:
292+
self._current += self._step
293+
next_id = self._current
294+
295+
self._unfinished_ids[next_id] = next_id
296+
297+
def clear_unfinished_id(id_to_clear: int) -> None:
298+
"""A function to mark processing this ID as finished"""
299+
with self._lock:
300+
self._unfinished_ids.pop(id_to_clear)
301+
302+
# Mark this ID as finished once the database transaction itself finishes.
303+
txn.call_after(clear_unfinished_id, next_id)
304+
txn.call_on_exception(clear_unfinished_id, next_id)
305+
306+
# Return the new ID.
307+
return next_id
308+
266309
def get_current_token(self) -> int:
267310
if not self._is_writer:
268311
return self._current
@@ -568,7 +611,7 @@ def get_next_txn(self, txn: LoggingTransaction) -> int:
568611
"""
569612
Usage:
570613
571-
stream_id = stream_id_gen.get_next(txn)
614+
stream_id = stream_id_gen.get_next_txn(txn)
572615
# ... persist event ...
573616
"""
574617

synapse/storage/util/sequence.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def __init__(self, get_first_callback: GetFirstCallbackType):
205205
"""
206206
Args:
207207
get_first_callback: a callback which is called on the first call to
208-
get_next_id_txn; should return the curreent maximum id
208+
get_next_id_txn; should return the current maximum id
209209
"""
210210
# the callback. this is cleared after it is called, so that it can be GCed.
211211
self._callback: Optional[GetFirstCallbackType] = get_first_callback

0 commit comments

Comments
 (0)