Description
Currently, `openraft` allocates in many places, which is a bit counterproductive, since any OOM will panic and tear down the state machine unnecessarily. We should look at possibilities for operating in constant memory.
Basically, we have the following parts/tasks that cooperate:
- core task handling the current state
- log task writing log entries to the log
- storage task applying logs to the state machine
- replication tasks sending logs to other nodes
- acceptor task (outside of `openraft`) receiving logs from the leader and pushing them to `openraft`
Now, all of these tasks basically operate on the same list of entries, which are either received from the client (on the leader) or replicated from the leader (on the follower).
The idea is to have one buffer which accepts logs, which are then pushed through the consensus (replication), the log and the state machine. This buffer could also be fixed-size, so it can accept all the `Entry` objects for the relevant logs and keep them until they are fully processed.
I.e., on the leader:
- the entry enters the buffer as the next log entry, which causes the `accepted` pointer to change
- the log task watches the `accepted` pointer and, when it changes, starts writing the log and updates the `submitted` pointer
- after the log is written, the `flushed` pointer is updated
- replication tasks watch the `accepted` pointer as well and send entries to the respective follower
- after the replication is completed, the respective pointer for the follower is updated as well
- the core task watches the `flushed` pointer and the pointers from followers and computes the `committed` pointer
- the core task updates the `to_apply` pointer to the minimum of the `flushed` and `committed` pointers (we cannot apply locally until the local log was written)
- the storage task watches the `to_apply` pointer and, similar to the log and replication tasks, applies log entries
- after a log entry is applied, the `applied` pointer is updated and the async client callback in the entry is called (the pointers are sketched just after this list)
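To make the pointer relationships concrete, here is a rough sketch; the struct and the invariants are just for illustration, not actual openraft types:

```rust
/// Illustrative only: the pointers from the list above as plain log indexes.
#[derive(Clone, Copy, Default)]
struct Pointers {
    accepted: u64,  // entries placed into the buffer by the core task
    submitted: u64, // entries the log task has started writing
    flushed: u64,   // entries durably written to the local log
    committed: u64, // entries replicated to a quorum
    to_apply: u64,  // min(flushed, committed): safe to apply locally
    applied: u64,   // entries applied to the state machine
}

impl Pointers {
    /// Invariants implied by the flow above.
    fn check(&self) {
        assert!(self.submitted <= self.accepted);
        assert!(self.flushed <= self.submitted);
        assert!(self.to_apply <= self.flushed.min(self.committed));
        assert!(self.applied <= self.to_apply);
    }
}
```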
The API for the respective user code should provide entries as a pair of an async `Stream` and a notification object, where the stream internally awaits the pointer change, feeds the entries from the buffer up to this pointer, then awaits the next change. When the processing is done, the notification object is informed about the processed entries (not necessarily for each single entry). This notification object could be a wrapper over `watch::Sender` or similar for the first implementation.
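As a rough illustration of that shape (the names and signatures below are assumptions, not the actual openraft API), such a stream plus notification pair could look like this:

```rust
use futures::stream::{self, Stream};
use tokio::sync::watch;

/// Illustrative sketch: yield `(index, entry)` pairs as the watched pointer
/// (e.g. `accepted` or `to_apply`) advances past them. `fetch` stands in for
/// reading one entry from the shared buffer.
fn entry_stream<E, F>(fetch: F, rx: watch::Receiver<u64>) -> impl Stream<Item = (u64, E)>
where
    F: Fn(u64) -> E,
{
    stream::unfold((fetch, rx, 0u64), |(fetch, mut rx, next)| async move {
        // Await the pointer moving past `next`, then feed buffered entries.
        while *rx.borrow_and_update() <= next {
            rx.changed().await.ok()?; // sender closed: leader change / shutdown
        }
        let item = (next, fetch(next));
        Some((item, (fetch, rx, next + 1)))
    })
}

/// Notification object: reports processed entries back,
/// not necessarily once per entry.
struct Processed(watch::Sender<u64>);

impl Processed {
    fn up_to(&self, index: u64) {
        self.0.send_replace(index);
    }
}
```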
On the follower:
- there is no RPC API, rather a connection API from the leader which feeds it log entries (as described above)
- upon connecting, typically, a task is started by the user code consuming these log entries from the network (user-specific)
- for each incoming log entry, the entry is pushed to `openraft` into the same buffer as the client write would put it (unless it's already there or known to be committed - when the leader retries after disconnect/reconnect)
- the `accepted` pointer will change, causing the log to be written
- after the log is written, the `flushed` pointer will change, which can be exposed for example as a `watch::Receiver` to the task handling incoming entries
- the task handling incoming entries will send the `flushed` state back in an implementation-specific way
- the advantage is that the receive and send parts of the connection can also be handled independently (see the sketch after this list)
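To illustrate that last point, the two halves could be two independent tasks, roughly like this (the channels, `push_entry` and the types are hypothetical stand-ins for user code, not openraft APIs):

```rust
use tokio::sync::{mpsc, watch};

/// Receive half: push entries coming from the leader into the shared buffer.
async fn recv_half<E>(mut from_leader: mpsc::Receiver<E>, push_entry: impl Fn(E)) {
    while let Some(entry) = from_leader.recv().await {
        // Deduplication of entries already buffered or known committed
        // (leader retry after reconnect) is elided here.
        push_entry(entry);
    }
}

/// Send half: independently report the `flushed` pointer back to the leader.
async fn send_half(mut flushed: watch::Receiver<u64>, to_leader: mpsc::Sender<u64>) {
    while flushed.changed().await.is_ok() {
        let index = *flushed.borrow();
        if to_leader.send(index).await.is_err() {
            break; // connection gone
        }
    }
}
```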
What about elections?
- the raft core task decides to send a `VoteRequest` upon a timeout
- the `VoteRequest` is sent via a dedicated method, basically similar to what we have today
- on the remote node, the `VoteRequest` received by the user-specific task is stored in the peer's state and the raft core task is woken up (on a `Watch`, for example)
- the raft core task processes the `Vote` change and sends the `VoteReply` back over a `oneshot` channel or similar to the requesting user task, which sends it back over the network (sketched below)
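A rough sketch of that handoff, generic over the request/reply types (the state layout is an assumption, not an existing API):

```rust
use std::sync::Mutex;
use tokio::sync::{oneshot, Notify};

/// A pending vote stored in the peer's state until the core task picks it up.
struct PendingVote<Req, Reply> {
    request: Req,
    reply: oneshot::Sender<Reply>,
}

/// Called by the user-specific network task when a VoteRequest arrives.
/// The returned receiver is awaited by that task and the reply is then
/// sent back over the network.
fn on_vote_request<Req, Reply>(
    peer_state: &Mutex<Option<PendingVote<Req, Reply>>>,
    core_wakeup: &Notify,
    request: Req,
) -> oneshot::Receiver<Reply> {
    let (tx, rx) = oneshot::channel();
    *peer_state.lock().unwrap() = Some(PendingVote { request, reply: tx });
    core_wakeup.notify_one(); // wake the raft core task
    rx
}
```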
What happens upon leader change?
- the storage task has no problem, since it only operates on committed entries and thus can continue as-is without any interruption
- the log write task needs to be notified to truncate the log before continuing (this can be handled by closing the `watch::Sender` sending the `accepted` pointer, after which the task is reinitialized/restarted and first truncates the log)
- the replication tasks are started or terminated as needed due to the status change (this can still OOM, but let's handle that later)
- the core task continues running, dropping all entries in the buffer past the truncation point before restarting logging and replication
What about the snapshot?
- raft core task: decides about the snapshot need and sets the `snapshot_needed` flag
- the storage task will trigger the snapshot upon reading the state the next time (e.g., by interspersing `SnapshotRequest` in the `Stream` feeding entries to the storage task, as sketched below)
- when the snapshot is finished, the core state is updated with the new snapshot log ID
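One possible shape for the items fed to the storage task, just to illustrate the interspersing (an assumption, not an existing type):

```rust
/// Items on the stream feeding the storage task.
enum StorageItem<E> {
    /// Apply this entry to the state machine.
    Apply(E),
    /// `snapshot_needed` was set: build a snapshot now.
    Snapshot,
}
```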
What about purging the log?
- raft core task: requests snapshots and computes the `purged` pointer as usual
- log task: we can store `purged` and `accepted` in a common state object, so the log task can do what's needed
- similar to the storage task, the `Stream` reading entries for the log can intersperse them with `Purge` requests (sketched below)
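And analogously for the log task's input stream (again just an illustration):

```rust
/// Items on the stream feeding the log task.
enum LogItem<E> {
    /// Write this entry to the log.
    Append(E),
    /// Purge the log up to and including this index.
    Purge(u64),
}
```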
So we have the following state to watch:
- core task state, which includes the log `flushed` pointers of the individual followers (also the local one), including their votes, the `applied` pointer and the `snapshot_log_id`
- log task state, which includes the `accepted` and `purged` pointers (in one state object)
- storage task state, watching the `to_apply` pointer and the `snapshot_needed` flag (in one state object)
- replication task states, watching the `accepted` pointer
All tasks read entries from the shared buffer up to `accepted`; only the raft core task adds entries to the buffer and updates `accepted` afterwards, so the Rust AXM (aliasing XOR mutability) rules are followed. This will likely need a bit of `unsafe` code to implement the buffer optimally.
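A minimal sketch of such a buffer with a single-writer/multi-reader split (illustrative assumptions throughout; slot reclamation and reuse after `applied`/`purged` passes is deliberately elided):

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct Inner<T> {
    slots: Box<[UnsafeCell<MaybeUninit<T>>]>,
    accepted: AtomicU64, // number of published entries
}

// SAFETY: readers only touch slots below the published `accepted`, which the
// single writer never mutates again; Release/Acquire on `accepted` orders this.
unsafe impl<T: Send + Sync> Sync for Inner<T> {}
unsafe impl<T: Send> Send for Inner<T> {}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        let accepted = *self.accepted.get_mut() as usize;
        for slot in &mut self.slots[..accepted] {
            // SAFETY: slots below `accepted` are initialized.
            unsafe { slot.get_mut().assume_init_drop() };
        }
    }
}

/// Single writer handle (held by the core task); deliberately not `Clone`.
pub struct BufferWriter<T>(Arc<Inner<T>>);
/// Shared reader handle (log/storage/replication tasks).
pub struct BufferReader<T>(Arc<Inner<T>>);

impl<T> Clone for BufferReader<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

pub fn entry_buffer<T>(capacity: usize) -> (BufferWriter<T>, BufferReader<T>) {
    let slots = (0..capacity)
        .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
        .collect::<Vec<_>>()
        .into_boxed_slice();
    let inner = Arc::new(Inner { slots, accepted: AtomicU64::new(0) });
    (BufferWriter(inner.clone()), BufferReader(inner))
}

impl<T> BufferWriter<T> {
    /// Append an entry and publish the new `accepted` index.
    pub fn push(&mut self, entry: T) -> u64 {
        let idx = self.0.accepted.load(Ordering::Relaxed);
        assert!((idx as usize) < self.0.slots.len(), "entry buffer full");
        // SAFETY: only this single (&mut) writer touches unpublished slots.
        unsafe { (*self.0.slots[idx as usize].get()).write(entry) };
        self.0.accepted.store(idx + 1, Ordering::Release);
        idx
    }
}

impl<T> BufferReader<T> {
    /// Read an already-accepted entry.
    pub fn get(&self, idx: u64) -> &T {
        assert!(idx < self.0.accepted.load(Ordering::Acquire));
        // SAFETY: published slots are initialized and never mutated again.
        unsafe { (*self.0.slots[idx as usize].get()).assume_init_ref() }
    }
}
```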
The raft core task listens on multiple event sources:
- the core task state, which also includes voting requests
- a bounded channel sending requests from the client, as long as there is space in the buffer (so we don't accept more than the size of the buffer before the entries are processed by the log/storage/replication/snapshot)
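As a sketch of that loop (names are assumptions; the wakeup handle is what the next paragraph talks about):

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};

/// Core task event loop: a bounded client channel (sized to the entry buffer)
/// plus a wakeup for state changes pushed by the other tasks.
async fn core_loop<Req>(mut clients: mpsc::Receiver<Req>, state_changed: Arc<Notify>) {
    loop {
        tokio::select! {
            Some(_req) = clients.recv() => {
                // Append the request's entry to the buffer and bump `accepted` (elided).
            }
            _ = state_changed.notified() => {
                // Re-read follower/log/storage state and recompute
                // `committed` / `to_apply` (elided).
            }
        }
    }
}
```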
Since the `watch` state for the core task is relatively large and frequently updated, I'd implement it a bit differently - push most of the individual requests via an atomic change to the state member (up to 16B can be updated in one go, which should be sufficient for most items) and a simple signaling semaphore to ensure the task is running or will wake up after the update. The same holds for other signaling - for example, signaling the log task needs two 8B values, so it can also be done atomically by a simple atomic write to the respective value followed by a task wakeup.
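For the log task this could look roughly like the following, using two separate 8-byte atomics for simplicity (a true 16-byte single update would need a 128-bit atomic or a packed encoding) and tokio's `Notify` as a stand-in for the signaling semaphore; all names are assumptions:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

/// Shared signaling state for the log task.
struct LogSignal {
    accepted: AtomicU64,
    purged: AtomicU64,
    wakeup: Notify,
}

impl LogSignal {
    /// Called by the core task after it placed entries into the buffer.
    fn set_accepted(&self, index: u64) {
        self.accepted.store(index, Ordering::Release);
        self.wakeup.notify_one(); // make sure the log task runs or wakes up
    }
}

/// Log task side: park until something changed, then act on it.
async fn log_task(signal: Arc<LogSignal>) {
    let mut seen_accepted = 0u64;
    let mut seen_purged = 0u64;
    loop {
        let accepted = signal.accepted.load(Ordering::Acquire);
        let purged = signal.purged.load(Ordering::Acquire);
        if accepted == seen_accepted && purged == seen_purged {
            signal.wakeup.notified().await; // a stored permit prevents lost wakeups
            continue;
        }
        // Write entries (seen_accepted..accepted) to the log, purge up to
        // `purged`, then update `submitted`/`flushed` (all elided here).
        seen_accepted = accepted;
        seen_purged = purged;
    }
}
```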
With this, we have no unbounded channels or other unbounded allocations in `openraft` itself (except for leader change, where we restart replication tasks - that needs to be addressed separately). It's up to the user to implement their handling in an OOM-safe way.
I hope this all is somewhat understandable. We can also start implementing it piecewise - for example, first handle the local log writing and the state machine update in this way (which can by default call the legacy methods easily, or alternatively we can provide a wrapper calling the legacy interface to allow for a smooth transition). Then, continue with the replication tasks.
Comments? Did I forget about something?