Direction towards allocation-free operations #1184

Open
@schreter

Description

Currently, openraft allocates in many places, which is somewhat counterproductive, since any OOM will panic and tear down the state machine unnecessarily. We should look at possibilities for operating in constant memory.

Basically, we have the following parts/tasks that cooperate:

  • core task handling the current state
  • log task writing log entries to the log
  • storage task applying logs to the state machine
  • replication tasks sending logs to other nodes
  • acceptor task (outside of openraft) receiving logs from the leader and pushing them to openraft

Now, all of these tasks basically operate on the same list of entries, which are either received from the client (on the leader) or replicated from the leader (on the follower).

The idea is to have one buffer which accepts log entries, which are then pushed through the consensus (replication), the log and the state machine. This buffer could also be fixed-size, so it can accept all the Entry objects for the relevant logs and keep them until they are fully processed.
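
A minimal sketch of such a fixed-capacity buffer (illustrative names only, not openraft's actual types): entries are accepted only while a slot is free, which back-pressures the producer instead of allocating, and slots are released once the entry is fully processed.

```rust
/// Hypothetical fixed-capacity buffer shared by all tasks; generic over
/// the entry type. No allocation happens after construction.
struct EntryBuffer<E> {
    slots: Vec<Option<E>>, // allocated once at startup
    head: u64,             // log index of the first entry still in the buffer
    tail: u64,             // log index one past the last accepted entry
}

impl<E> EntryBuffer<E> {
    fn with_capacity(cap: usize) -> Self {
        Self { slots: (0..cap).map(|_| None).collect(), head: 0, tail: 0 }
    }

    /// Try to accept a new entry; fails when the buffer is full, which
    /// back-pressures the client instead of allocating more memory.
    fn push(&mut self, entry: E) -> Result<u64, E> {
        if (self.tail - self.head) as usize == self.slots.len() {
            return Err(entry);
        }
        let idx = (self.tail as usize) % self.slots.len();
        self.slots[idx] = Some(entry);
        self.tail += 1;
        Ok(self.tail - 1)
    }

    /// Release entries below `up_to` once they are fully processed
    /// (written, replicated, applied).
    fn release(&mut self, up_to: u64) {
        while self.head < up_to {
            let idx = (self.head as usize) % self.slots.len();
            self.slots[idx] = None;
            self.head += 1;
        }
    }
}
```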

I.e., on the leader:

  • the entry enters the buffer as the next log entry, which causes the accepted pointer to change
  • the log task watches the accepted pointer and, when it changes, starts writing the log and updates the submitted pointer
  • after the log is written, the flushed pointer is updated
  • replication tasks watch the accepted pointer as well and send entries to the respective follower
  • after the replication is completed, the respective pointer for the follower is updated as well
  • the core task watches the flushed pointer and the pointers from the followers and computes the committed pointer
  • the core task updates the to_apply pointer to the minimum of the flushed and committed pointers (we cannot apply locally until the local log has been written)
  • the storage task watches the to_apply pointer and, similar to the log and replication tasks, applies log entries
  • after a log entry is applied, the applied pointer is updated and the async client callback in the entry is called
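
One possible representation of these pointers uses tokio watch channels (a section further below suggests a more compact atomic encoding); the following sketch only illustrates how the core task could derive to_apply as the minimum of flushed and committed:

```rust
use tokio::sync::watch;

/// Illustrative pointer set from the list above; in a real implementation
/// these would live next to the shared entry buffer.
struct Pointers {
    accepted: watch::Sender<u64>,
    flushed: watch::Sender<u64>,
    committed: watch::Sender<u64>,
    to_apply: watch::Sender<u64>,
    applied: watch::Sender<u64>,
}

/// Core-task step: we cannot apply locally before the local log is durable,
/// so `to_apply` advances to min(flushed, committed).
fn update_to_apply(p: &Pointers) {
    let target = (*p.flushed.borrow()).min(*p.committed.borrow());
    p.to_apply.send_if_modified(|v| {
        if *v < target {
            *v = target;
            true // notify watchers (the storage task) only on real progress
        } else {
            false
        }
    });
}
```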

The API for the respective user code should provide entries as a pair of an async Stream and a notification object, where the stream internally awaits the pointer change and then feeds the entries from the buffer up to this pointer, then awaits the next change. Once the work is done, the notification object is informed about the processed entries (not necessarily for each single entry). This notification object could be a wrapper over a watch::Sender or similar for the first implementation.
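
A rough shape such an API could take (names are hypothetical; the notification object simply wraps a watch::Sender as suggested above):

```rust
use futures::Stream;
use tokio::sync::watch;

/// Hypothetical consumer-side API: a stream of entries plus a progress
/// notifier, handed to the log/storage/replication task.
trait EntrySource<E> {
    /// Awaits the pointer change internally and yields buffered entries
    /// up to that pointer, then waits for the next change.
    type Entries: Stream<Item = E>;

    fn entries(&self) -> Self::Entries;
    fn progress(&self) -> ProgressNotifier;
}

/// Reports processed entries back to the core task; not necessarily
/// called for every single entry.
struct ProgressNotifier {
    inner: watch::Sender<u64>,
}

impl ProgressNotifier {
    fn processed_up_to(&self, index: u64) {
        self.inner.send_if_modified(|v| {
            if *v < index { *v = index; true } else { false }
        });
    }
}
```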

On the follower:

  • there is no RPC API, rather a connection API from the leader which feeds it log entries (as described above)
  • upon connecting, typically, a task is started by the user code consuming these log entries from the network (user-specific)
  • for each incoming log entry, the entry is pushed to openraft into the same buffer as the client write would put it (unless it's already there or known to be committed - when the leader retries after disconnect/reconnect)
  • the accepted pointer will change, causing the log to be written
  • after the log is written, the flushed pointer will change, which can be exposed for example as a watch::Receiver to the task handling incoming entries
  • the task handling incoming entries will send the flushed state back in an implementation-specific way
  • the advantage is that the receive and send parts of the connection can also be handled independently
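
A minimal sketch of the send part under these assumptions: the flushed pointer is exposed as a watch::Receiver, and report stands in for whatever implementation-specific mechanism sends the flushed state back to the leader.

```rust
use tokio::sync::watch;

/// Hypothetical follower-side "send part": runs independently of the task
/// that receives entries, and only forwards flushed progress upstream.
async fn report_flushed<E>(
    mut flushed: watch::Receiver<u64>,
    mut report: impl FnMut(u64) -> Result<(), E>,
) -> Result<(), E> {
    loop {
        if flushed.changed().await.is_err() {
            // Sender dropped (e.g. on shutdown or leader change): stop.
            return Ok(());
        }
        let value = *flushed.borrow_and_update();
        report(value)?;
    }
}
```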

What about elections?

  • the raft core task decides to send a VoteRequest upon a timeout
  • the VoteRequest is sent via a dedicated method, basically similar to what we have today
  • on the remote node, the VoteRequest received by the user-specific task is stored in the peer's state and the raft core task is woken up (on a Watch, for example)
  • the raft core task processes the Vote change and sends the VoteReply back over a oneshot channel or similar to the requesting user task, which sends it back over the network
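
A sketch of that path with illustrative types (the wakeup uses a Notify here instead of a Watch, but the idea is the same: deposit the request in the peer's state, wake the core task, and receive the reply over a oneshot channel):

```rust
use tokio::sync::{oneshot, Notify};

// Illustrative stand-ins for openraft's actual vote types.
struct VoteRequest { term: u64, candidate: u64 }
struct VoteResponse { term: u64, granted: bool }

/// Per-peer state: the user task deposits the request here and the raft
/// core task picks it up after being woken.
struct PeerState {
    pending_vote: std::sync::Mutex<Option<(VoteRequest, oneshot::Sender<VoteResponse>)>>,
    wakeup: Notify,
}

impl PeerState {
    /// Called by the user-specific network task handling the connection.
    async fn request_vote(&self, req: VoteRequest) -> Option<VoteResponse> {
        let (tx, rx) = oneshot::channel();
        *self.pending_vote.lock().unwrap() = Some((req, tx));
        self.wakeup.notify_one(); // wake the raft core task
        rx.await.ok()             // reply comes back over the oneshot channel
    }
}
```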

What happens upon leader change?

  • the storage task has no problem, since it only operates on committed entries and thus can continue as-is without any interruption
  • the log write task needs to be notified to truncate the log before continuing (this can be handled by closing the watch::Sender sending the accepted pointer, after which the task is reinitialized/restarted and first truncates the log)
  • the replication tasks are started or terminated as needed due to the status change (this still can OOM, but let's handle that later)
  • the core task continues running, dropping all entries in the buffer past the truncation point before restarting logging and replication
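
A sketch of the log-task side of this restart protocol: the task simply exits when the accepted sender is closed, and whoever restarts it truncates the log first.

```rust
use tokio::sync::watch;

/// Sketch of the log-task loop. When the `accepted` sender is closed
/// (leader change), the loop exits so the task can be reinitialized;
/// the restarted task truncates the log before writing again.
async fn log_task(mut accepted: watch::Receiver<u64>) {
    loop {
        if accepted.changed().await.is_err() {
            // Sender closed: leader change, let the caller restart us.
            return;
        }
        let up_to = *accepted.borrow_and_update();
        // ... write and flush entries up to `up_to`, then update the
        // flushed pointer (omitted in this sketch) ...
        let _ = up_to;
    }
}
```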

What about the snapshot?

  • raft core task: decides that a snapshot is needed and sets the snapshot_needed flag
  • the storage task will trigger the snapshot upon reading the state the next time (e.g., by interspersing SnapshotRequest in the Stream feeding entries to the storage task)
  • when the snapshot is finished, the core state is updated with the new snapshot log ID

What about purging the log?

  • raft core task: requests snapshots and computes the purged pointer as usual
  • log task: we can store the purged and accepted pointers in a common state object, so the log task can handle both
  • similar to the storage task, the Stream feeding entries to the log task can intersperse them with Purge requests
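
Both the snapshot trigger and the purge request could then be interspersed into the streams feeding the storage and log tasks, roughly like this (illustrative names, not openraft's actual API):

```rust
/// Items in the stream feeding the storage task.
enum StorageItem<E> {
    /// Apply this log entry to the state machine.
    Apply(E),
    /// Build a snapshot up to the currently applied log ID.
    Snapshot,
}

/// Items in the stream feeding the log task.
enum LogItem<E> {
    /// Append this entry to the log.
    Append(E),
    /// Purge the log up to (and including) this log index.
    Purge(u64),
}
```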

So we have following state to watch:

  • core task state, which includes the flushed log pointers of the individual followers (and the local one) together with their votes, the applied pointer and the snapshot_log_id
  • log task state, which includes accepted and purged pointers (in one state object)
  • storage task state, watching to_apply pointer and snapshot_needed flag (in one state object)
  • replication task states, watching accepted pointer
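
Grouped into state objects, this could look roughly as follows (field types are deliberately simplified - plain u64 indexes instead of full log IDs and vote types):

```rust
/// Watched by the raft core task.
struct CoreTaskState {
    /// Flushed log pointer and vote per node, including the local node.
    progress: Vec<NodeProgress>,
    applied: u64,
    snapshot_log_id: Option<u64>,
}

struct NodeProgress {
    node_id: u64,
    flushed: u64,
    vote: u64,
}

/// Watched by the log task (one state object).
struct LogTaskState {
    accepted: u64,
    purged: u64,
}

/// Watched by the storage task (one state object).
struct StorageTaskState {
    to_apply: u64,
    snapshot_needed: bool,
}

/// Watched by each replication task.
struct ReplicationTaskState {
    accepted: u64,
}
```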

All tasks read entries from the shared buffer up to the accepted pointer; only the raft core task adds entries to the buffer and updates accepted afterwards, so Rust's aliasing-XOR-mutability (AXM) rules are followed. This will likely need a bit of unsafe code to implement the buffer optimally.

The raft core task listens on multiple event sources:

  • the core task state, which also includes voting requests
  • a bounded channel sending requests from the client, as long as there is space in the buffer (so we don't accept more than the size of the buffer before the entries are processed by the log/storage/replication/snapshot)

Since the watch state for the core task is relatively large and frequently updated, I'd implement it a bit differently - push most of the individual requests via an atomic change to the state member (up to 16 bytes can be updated in one go, which should be sufficient for most items) and a simple signaling semaphore to ensure the task is running or will wake up after the update. The same applies to other signaling - for example, signaling the log task needs two 8-byte values, so it can also be done atomically by simple atomic writes to the respective values followed by a task wakeup.
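
A sketch of this scheme for the log task, assuming two separate 8-byte atomics plus a wakeup primitive (a single 16-byte update would need a 128-bit atomic, e.g. from the portable-atomic crate, since stable std does not provide one; tokio's Notify plays the role of the signaling semaphore here):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;

/// Lightweight, allocation-free signaling towards the log task: the two
/// 8-byte values are written atomically, then the task is woken.
struct LogTaskSignal {
    accepted: AtomicU64,
    purged: AtomicU64,
    wakeup: Notify,
}

impl LogTaskSignal {
    /// Called by the raft core task; no channel, no allocation.
    fn signal(&self, accepted: u64, purged: u64) {
        self.accepted.store(accepted, Ordering::Release);
        self.purged.store(purged, Ordering::Release);
        self.wakeup.notify_one();
    }

    /// Called by the log task loop; coalesces multiple signals into one
    /// wakeup and always reads the latest values.
    async fn wait(&self) -> (u64, u64) {
        self.wakeup.notified().await;
        (
            self.accepted.load(Ordering::Acquire),
            self.purged.load(Ordering::Acquire),
        )
    }
}
```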

With this, we have no unbounded channels or other unbounded allocations in openraft itself (except for leader change, where we restart replication tasks - that needs to be addressed separately). It's up to the user to implement their handling in an OOM-safe way.

I hope this is all somewhat understandable. We can also implement it piecewise - for example, first handle the local log writing and the state machine update in this way (by default calling the legacy methods, or alternatively providing a wrapper calling the legacy interface to allow for a smooth transition), and then continue with the replication tasks.

Comments? Did I forget about something?
