Skip to content

Commit d72facd

Browse files
author
Stephen Gutekanst
committed
object: remove MPSC queue takeAll() in favor of just pop()
Signed-off-by: Stephen Gutekanst <[email protected]>
1 parent 4bcafe8 commit d72facd

File tree

1 file changed

+2
-171
lines changed

1 file changed

+2
-171
lines changed

src/mpsc.zig

Lines changed: 2 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
//! MPSC (Multi Producer, Single Consumer) lock-free FIFO queue with atomic batch-take support
2-
//!
3-
//! The queue offers two ways to consume items:
4-
//! 1. As a traditional MPSC queue using pop() - only one consumer thread may call pop()
5-
//! 2. As a multi-consumer queue using takeAll() - multiple threads may compete to take all items
1+
//! MPSC (Multi Producer, Single Consumer) lock-free FIFO queue
62
//!
73
//! Internally, the queue maintains a lock-free atomic pool of batch-allocated nodes for reuse.
84
//! Nodes are acquired and owned exclusively by individual threads, then given to the queue and
@@ -15,10 +11,8 @@
1511
//! 1. FIFO ordering is maintained
1612
//! 2. Multiple threads can always push() items in parallel
1713
//! 3. No locks/mutexes are needed
18-
//! 4. Multiple consumer threads can safely compete to takeAll() (only one succeeds), or a single
19-
//! consumer thread may pop().
14+
//! 4. A single consumer thread may pop().
2015
//!
21-
//! Note: takeAll() is parallel-safe, but pop() is never parallel-safe with itself or takeAll().
2216
const std = @import("std");
2317

2418
/// Lock-free atomic pool of nodes for memory allocation
@@ -293,92 +287,6 @@ pub fn Queue(comptime Value: type) type {
293287
}
294288
}
295289

296-
/// Attempts to atomically take all nodes from the queue, returning the chain of all nodes
297-
/// currently in the queue, or null if (a) the queue is empty or (b) another takeAll()
298-
/// consumer beat us to taking it all.
299-
///
300-
/// This operation is safe to call from multiple threads in parallel - only one will succeed
301-
/// in taking the nodes. Although takeAll() is safe to call in parallel, pop() may not be
302-
/// called in parallel with itself or takeAll().
303-
///
304-
/// Caller takes ownership of all nodes up to the head at time of operation, and must free
305-
/// the entire chain by calling releaseAll(returned_node.?) later.
306-
///
307-
/// The returned node is the first node in FIFO order, i.e. node is 1st, node.next is 2nd,
308-
/// node.next.next is 3rd, and so on in FIFO order.
309-
pub fn takeAll(q: *@This()) ?*Node {
310-
outer: while (true) {
311-
var tail = q.tail;
312-
const next = @atomicLoad(?*Node, &tail.next, .acquire);
313-
314-
// First reset head to empty node atomically to ensure new pushes will link to [empty]
315-
// rather than our taken chain. This also acts as our point of taking ownership and
316-
// competing against other parallel takeAll() invocations.
317-
//
318-
// Before: head -> [A]
319-
// After: head -> [empty]
320-
while (true) {
321-
const current_head = @atomicLoad(*Node, &q.head, .acquire);
322-
if (current_head == &q.empty) {
323-
// Another takeAll won
324-
return null;
325-
}
326-
if (@cmpxchgStrong(*Node, &q.head, current_head, &q.empty, .acq_rel, .acquire)) |_| {
327-
continue;
328-
}
329-
break;
330-
}
331-
332-
// Handle empty node advancement if needed
333-
if (tail == &q.empty) {
334-
if (next) |tail_next| {
335-
// Before: tail -> [empty] -> [A] <- head
336-
// After: tail -> [A] <- head
337-
if (@cmpxchgStrong(*Node, &q.tail, tail, tail_next, .acq_rel, .acquire)) |_| {
338-
continue :outer;
339-
}
340-
tail = tail_next;
341-
} else return null; // State: tail -> [empty] <- head
342-
}
343-
344-
// Try to take ownership of the chain
345-
//
346-
// Before: tail -> [B] -> [A] <- head=[empty]
347-
// After: tail=[empty]
348-
// Return: [B] -> [A]
349-
if (@cmpxchgStrong(*Node, &q.tail, tail, &q.empty, .acq_rel, .acquire)) |_| {
350-
// Lost race (with another takeAll() or pop()), retry from start
351-
continue :outer;
352-
}
353-
354-
// Ensure all previous atomic operations (including linking) are complete
355-
// Specifically this part of pushRaw():
356-
//
357-
// // Link previous node to new node
358-
// @atomicStore(?*Node, &prev.next, node, .release);
359-
//
360-
_ = @atomicLoad(*Node, &q.head, .acquire);
361-
362-
return tail;
363-
}
364-
}
365-
366-
/// Release a chain of nodes back to the pool starting from the given node.
367-
/// Used to return nodes acquired via takeAll() back to the pool.
368-
///
369-
/// State: start -> [B] -> [A] -> null
370-
/// After: (all nodes returned to pool)
371-
pub fn releaseAll(q: *@This(), start: *Node) void {
372-
var current = start;
373-
while (true) {
374-
const next = current.next;
375-
current.next = null;
376-
q.pool.release(current);
377-
if (next == null) break;
378-
current = next.?;
379-
}
380-
}
381-
382290
pub fn deinit(q: *@This(), allocator: std.mem.Allocator) void {
383291
q.pool.deinit(allocator);
384292
}
@@ -403,80 +311,3 @@ test "basic" {
403311
try std.testing.expectEqual(queue.pop(), 3);
404312
try std.testing.expectEqual(queue.pop(), null);
405313
}
406-
407-
test "takeAll" {
408-
const allocator = std.testing.allocator;
409-
410-
var queue: Queue(u32) = undefined;
411-
try queue.init(allocator, 32);
412-
defer queue.deinit(allocator);
413-
414-
// Take empty queue
415-
try std.testing.expectEqual(queue.takeAll(), null);
416-
try std.testing.expect(queue.head == &queue.empty);
417-
try std.testing.expect(queue.tail == &queue.empty);
418-
419-
// Take single-element queue
420-
try queue.push(allocator, 1);
421-
if (queue.takeAll()) |nodes| {
422-
defer queue.releaseAll(nodes);
423-
try std.testing.expectEqual(nodes.value, 1);
424-
try std.testing.expectEqual(nodes.next, null);
425-
try std.testing.expect(queue.head == &queue.empty);
426-
try std.testing.expect(queue.tail == &queue.empty);
427-
} else {
428-
return error.TestUnexpectedNull;
429-
}
430-
431-
// Take empty queue again
432-
try std.testing.expectEqual(queue.takeAll(), null);
433-
try std.testing.expect(queue.head == &queue.empty);
434-
try std.testing.expect(queue.tail == &queue.empty);
435-
436-
// Multiple elements with push after takeAll
437-
try queue.push(allocator, 1);
438-
try queue.push(allocator, 2);
439-
try queue.push(allocator, 3);
440-
if (queue.takeAll()) |nodes| {
441-
try std.testing.expectEqual(nodes.value, 1);
442-
try std.testing.expectEqual(nodes.next.?.value, 2);
443-
try std.testing.expectEqual(nodes.next.?.next.?.value, 3);
444-
try std.testing.expectEqual(nodes.next.?.next.?.next, null);
445-
try std.testing.expect(queue.head == &queue.empty);
446-
try std.testing.expect(queue.tail == &queue.empty);
447-
448-
// Push while holding taken nodes
449-
try queue.push(allocator, 42);
450-
try std.testing.expect(queue.head != &queue.empty);
451-
try std.testing.expect(queue.tail == &queue.empty);
452-
453-
// Then release held nodes
454-
queue.releaseAll(nodes);
455-
} else return error.TestUnexpectedNull;
456-
457-
// Verify queue state after all operations
458-
try std.testing.expectEqual(queue.pop(), 42);
459-
try std.testing.expectEqual(queue.pop(), null);
460-
try std.testing.expect(queue.head == &queue.empty);
461-
try std.testing.expect(queue.tail == &queue.empty);
462-
}
463-
464-
test "single takeAll" {
465-
const allocator = std.testing.allocator;
466-
467-
var queue: Queue(u32) = undefined;
468-
try queue.init(allocator, 32);
469-
defer queue.deinit(allocator);
470-
471-
try queue.push(allocator, 1);
472-
473-
if (queue.takeAll()) |nodes| {
474-
try std.testing.expectEqual(nodes.value, 1);
475-
try std.testing.expectEqual(nodes.next, null);
476-
try std.testing.expect(queue.head == &queue.empty);
477-
try std.testing.expect(queue.tail == &queue.empty);
478-
479-
// Then release held nodes
480-
queue.releaseAll(nodes);
481-
} else return error.TestUnexpectedNull;
482-
}

0 commit comments

Comments
 (0)