Skip to content

Commit 51c9dca

Browse files
committed
Cache both head and tail index in both Consumer and Producer (again)
This reverts PR #48.
1 parent a434d67 commit 51c9dca

File tree

2 files changed

+26
-23
lines changed

2 files changed

+26
-23
lines changed

src/chunks.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,7 @@ impl<T> Producer<T> {
234234
/// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
235235
/// see [`Producer::write_chunk()`].
236236
pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
237-
// "tail" is only ever written by the producer thread, "Relaxed" is enough
238-
let tail = self.buffer.tail.load(Ordering::Relaxed);
237+
let tail = self.cached_tail.get();
239238

240239
// Check if the queue has *possibly* not enough slots.
241240
if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
@@ -285,8 +284,7 @@ impl<T> Consumer<T> {
285284
///
286285
/// See the documentation of the [`chunks`](crate::chunks#examples) module.
287286
pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
288-
// "head" is only ever written by the consumer thread, "Relaxed" is enough
289-
let head = self.buffer.head.load(Ordering::Relaxed);
287+
let head = self.cached_head.get();
290288

291289
// Check if the queue has *possibly* not enough slots.
292290
if self.buffer.distance(head, self.cached_tail.get()) < n {
@@ -497,10 +495,9 @@ impl<T> WriteChunkUninit<'_, T> {
497495

498496
unsafe fn commit_unchecked(self, n: usize) -> usize {
499497
let p = self.producer;
500-
// "tail" is only ever written by the producer thread, "Relaxed" is enough
501-
let tail = p.buffer.tail.load(Ordering::Relaxed);
502-
let tail = p.buffer.increment(tail, n);
498+
let tail = p.buffer.increment(p.cached_tail.get(), n);
503499
p.buffer.tail.store(tail, Ordering::Release);
500+
p.cached_tail.set(tail);
504501
n
505502
}
506503

@@ -744,10 +741,9 @@ impl<T> ReadChunk<'_, T> {
744741
unsafe { self.second_ptr.add(i).drop_in_place() };
745742
}
746743
let c = self.consumer;
747-
// "head" is only ever written by the consumer thread, "Relaxed" is enough
748-
let head = c.buffer.head.load(Ordering::Relaxed);
749-
let head = c.buffer.increment(head, n);
744+
let head = c.buffer.increment(c.cached_head.get(), n);
750745
c.buffer.head.store(head, Ordering::Release);
746+
c.cached_head.set(head);
751747
n
752748
}
753749

@@ -799,10 +795,9 @@ impl<T> Drop for ReadChunkIntoIter<'_, T> {
799795
/// Non-iterated items remain in the ring buffer and are *not* dropped.
800796
fn drop(&mut self) {
801797
let c = &self.chunk.consumer;
802-
// "head" is only ever written by the consumer thread, "Relaxed" is enough
803-
let head = c.buffer.head.load(Ordering::Relaxed);
804-
let head = c.buffer.increment(head, self.iterated);
798+
let head = c.buffer.increment(c.cached_head.get(), self.iterated);
805799
c.buffer.head.store(head, Ordering::Release);
800+
c.cached_head.set(head);
806801
}
807802
}
808803

src/lib.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,11 @@ impl<T> RingBuffer<T> {
131131
let p = Producer {
132132
buffer: buffer.clone(),
133133
cached_head: Cell::new(0),
134+
cached_tail: Cell::new(0),
134135
};
135136
let c = Consumer {
136137
buffer,
138+
cached_head: Cell::new(0),
137139
cached_tail: Cell::new(0),
138140
};
139141
(p, c)
@@ -283,6 +285,11 @@ pub struct Producer<T> {
283285
///
284286
/// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
285287
cached_head: Cell<usize>,
288+
289+
/// A copy of `buffer.tail` for quick access.
290+
///
291+
/// This value is always in sync with `buffer.tail`.
292+
cached_tail: Cell<usize>,
286293
}
287294

288295
// SAFETY: After moving a Producer to another thread, there is still only a single thread
@@ -315,6 +322,7 @@ impl<T> Producer<T> {
315322
unsafe { self.buffer.slot_ptr(tail).write(value) };
316323
let tail = self.buffer.increment1(tail);
317324
self.buffer.tail.store(tail, Ordering::Release);
325+
self.cached_tail.set(tail);
318326
Ok(())
319327
} else {
320328
Err(PushError::Full(value))
@@ -342,9 +350,7 @@ impl<T> Producer<T> {
342350
pub fn slots(&self) -> usize {
343351
let head = self.buffer.head.load(Ordering::Acquire);
344352
self.cached_head.set(head);
345-
// "tail" is only ever written by the producer thread, "Relaxed" is enough
346-
let tail = self.buffer.tail.load(Ordering::Relaxed);
347-
self.buffer.capacity - self.buffer.distance(head, tail)
353+
self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
348354
}
349355

350356
/// Returns `true` if there are currently no slots available for writing.
@@ -445,8 +451,7 @@ impl<T> Producer<T> {
445451
/// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
446452
/// For performance, this special case is implemented separately.
447453
fn next_tail(&self) -> Option<usize> {
448-
// "tail" is only ever written by the producer thread, "Relaxed" is enough
449-
let tail = self.buffer.tail.load(Ordering::Relaxed);
454+
let tail = self.cached_tail.get();
450455

451456
// Check if the queue is *possibly* full.
452457
if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
@@ -488,6 +493,11 @@ pub struct Consumer<T> {
488493
/// A reference to the ring buffer.
489494
buffer: Arc<RingBuffer<T>>,
490495

496+
/// A copy of `buffer.head` for quick access.
497+
///
498+
/// This value is always in sync with `buffer.head`.
499+
cached_head: Cell<usize>,
500+
491501
/// A copy of `buffer.tail` for quick access.
492502
///
493503
/// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
@@ -534,6 +544,7 @@ impl<T> Consumer<T> {
534544
let value = unsafe { self.buffer.slot_ptr(head).read() };
535545
let head = self.buffer.increment1(head);
536546
self.buffer.head.store(head, Ordering::Release);
547+
self.cached_head.set(head);
537548
Ok(value)
538549
} else {
539550
Err(PopError::Empty)
@@ -588,9 +599,7 @@ impl<T> Consumer<T> {
588599
pub fn slots(&self) -> usize {
589600
let tail = self.buffer.tail.load(Ordering::Acquire);
590601
self.cached_tail.set(tail);
591-
// "head" is only ever written by the consumer thread, "Relaxed" is enough
592-
let head = self.buffer.head.load(Ordering::Relaxed);
593-
self.buffer.distance(head, tail)
602+
self.buffer.distance(self.cached_head.get(), tail)
594603
}
595604

596605
/// Returns `true` if there are currently no slots available for reading.
@@ -690,8 +699,7 @@ impl<T> Consumer<T> {
690699
/// This is a strict subset of the functionality implemented in `read_chunk()`.
691700
/// For performance, this special case is implemented separately.
692701
fn next_head(&self) -> Option<usize> {
693-
// "head" is only ever written by the consumer thread, "Relaxed" is enough
694-
let head = self.buffer.head.load(Ordering::Relaxed);
702+
let head = self.cached_head.get();
695703

696704
// Check if the queue is *possibly* empty.
697705
if head == self.cached_tail.get() {

0 commit comments

Comments
 (0)