Skip to content

Commit 86694b0

Browse files
authored
Add zero-copy make_mut (#695)
1 parent 0c17e99 commit 86694b0

File tree

4 files changed

+341
-2
lines changed

4 files changed

+341
-2
lines changed

src/bytes.rs

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::buf::IntoIter;
1515
#[allow(unused)]
1616
use crate::loom::sync::atomic::AtomicMut;
1717
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
18-
use crate::Buf;
18+
use crate::{Buf, BytesMut};
1919

2020
/// A cheaply cloneable and sliceable chunk of contiguous memory.
2121
///
@@ -113,6 +113,7 @@ pub(crate) struct Vtable {
113113
///
114114
/// takes `Bytes` to value
115115
pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
116+
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
116117
/// fn(data)
117118
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
118119
/// fn(data, ptr, len)
@@ -507,6 +508,49 @@ impl Bytes {
507508
self.truncate(0);
508509
}
509510

511+
/// Try to convert self into `BytesMut`.
512+
///
513+
/// If `self` is unique for the entire original buffer, this will succeed
514+
/// and return a `BytesMut` with the contents of `self` without copying.
515+
/// If `self` is not unique for the entire original buffer, this will fail
516+
/// and return self.
517+
///
518+
/// # Examples
519+
///
520+
/// ```
521+
/// use bytes::{Bytes, BytesMut};
522+
///
523+
/// let bytes = Bytes::from(b"hello".to_vec());
524+
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
525+
/// ```
526+
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
527+
if self.is_unique() {
528+
Ok(self.make_mut())
529+
} else {
530+
Err(self)
531+
}
532+
}
533+
534+
/// Convert self into `BytesMut`.
535+
///
536+
/// If `self` is unique for the entire original buffer, this will return a
537+
/// `BytesMut` with the contents of `self` without copying.
538+
/// If `self` is not unique for the entire original buffer, this will make
539+
/// a copy of `self` subset of the original buffer in a new `BytesMut`.
540+
///
541+
/// # Examples
542+
///
543+
/// ```
544+
/// use bytes::{Bytes, BytesMut};
545+
///
546+
/// let bytes = Bytes::from(b"hello".to_vec());
547+
/// assert_eq!(bytes.make_mut(), BytesMut::from(&b"hello"[..]));
548+
/// ```
549+
pub fn make_mut(self) -> BytesMut {
550+
let bytes = ManuallyDrop::new(self);
551+
unsafe { (bytes.vtable.to_mut)(&bytes.data, bytes.ptr, bytes.len) }
552+
}
553+
510554
#[inline]
511555
pub(crate) unsafe fn with_vtable(
512556
ptr: *const u8,
@@ -917,6 +961,7 @@ impl fmt::Debug for Vtable {
917961
const STATIC_VTABLE: Vtable = Vtable {
918962
clone: static_clone,
919963
to_vec: static_to_vec,
964+
to_mut: static_to_mut,
920965
is_unique: static_is_unique,
921966
drop: static_drop,
922967
};
@@ -931,6 +976,11 @@ unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8
931976
slice.to_vec()
932977
}
933978

979+
unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
980+
let slice = slice::from_raw_parts(ptr, len);
981+
BytesMut::from(slice)
982+
}
983+
934984
fn static_is_unique(_: &AtomicPtr<()>) -> bool {
935985
false
936986
}
@@ -944,13 +994,15 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
944994
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
945995
clone: promotable_even_clone,
946996
to_vec: promotable_even_to_vec,
997+
to_mut: promotable_even_to_mut,
947998
is_unique: promotable_is_unique,
948999
drop: promotable_even_drop,
9491000
};
9501001

9511002
static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
9521003
clone: promotable_odd_clone,
9531004
to_vec: promotable_odd_to_vec,
1005+
to_mut: promotable_odd_to_mut,
9541006
is_unique: promotable_is_unique,
9551007
drop: promotable_odd_drop,
9561008
};
@@ -994,12 +1046,47 @@ unsafe fn promotable_to_vec(
9941046
}
9951047
}
9961048

1049+
unsafe fn promotable_to_mut(
1050+
data: &AtomicPtr<()>,
1051+
ptr: *const u8,
1052+
len: usize,
1053+
f: fn(*mut ()) -> *mut u8,
1054+
) -> BytesMut {
1055+
let shared = data.load(Ordering::Acquire);
1056+
let kind = shared as usize & KIND_MASK;
1057+
1058+
if kind == KIND_ARC {
1059+
shared_to_mut_impl(shared.cast(), ptr, len)
1060+
} else {
1061+
// KIND_VEC is a view of an underlying buffer at a certain offset.
1062+
// The ptr + len always represents the end of that buffer.
1063+
// Before truncating it, it is first promoted to KIND_ARC.
1064+
// Thus, we can safely reconstruct a Vec from it without leaking memory.
1065+
debug_assert_eq!(kind, KIND_VEC);
1066+
1067+
let buf = f(shared);
1068+
let off = offset_from(ptr, buf);
1069+
let cap = off + len;
1070+
let v = Vec::from_raw_parts(buf, cap, cap);
1071+
1072+
let mut b = BytesMut::from_vec(v);
1073+
b.advance_unchecked(off);
1074+
b
1075+
}
1076+
}
1077+
9971078
unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
9981079
promotable_to_vec(data, ptr, len, |shared| {
9991080
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
10001081
})
10011082
}
10021083

1084+
unsafe fn promotable_even_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1085+
promotable_to_mut(data, ptr, len, |shared| {
1086+
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
1087+
})
1088+
}
1089+
10031090
unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
10041091
data.with_mut(|shared| {
10051092
let shared = *shared;
@@ -1031,6 +1118,10 @@ unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize
10311118
promotable_to_vec(data, ptr, len, |shared| shared.cast())
10321119
}
10331120

1121+
unsafe fn promotable_odd_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1122+
promotable_to_mut(data, ptr, len, |shared| shared.cast())
1123+
}
1124+
10341125
unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
10351126
data.with_mut(|shared| {
10361127
let shared = *shared;
@@ -1087,6 +1178,7 @@ const _: [(); 0 - mem::align_of::<Shared>() % 2] = []; // Assert that the alignm
10871178
static SHARED_VTABLE: Vtable = Vtable {
10881179
clone: shared_clone,
10891180
to_vec: shared_to_vec,
1181+
to_mut: shared_to_mut,
10901182
is_unique: shared_is_unique,
10911183
drop: shared_drop,
10921184
};
@@ -1133,6 +1225,45 @@ unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec
11331225
shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
11341226
}
11351227

1228+
unsafe fn shared_to_mut_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> BytesMut {
1229+
// The goal is to check if the current handle is the only handle
1230+
// that currently has access to the buffer. This is done by
1231+
// checking if the `ref_cnt` is currently 1.
1232+
//
1233+
// The `Acquire` ordering synchronizes with the `Release` as
1234+
// part of the `fetch_sub` in `release_shared`. The `fetch_sub`
1235+
// operation guarantees that any mutations done in other threads
1236+
// are ordered before the `ref_cnt` is decremented. As such,
1237+
// this `Acquire` will guarantee that those mutations are
1238+
// visible to the current thread.
1239+
//
1240+
// Otherwise, we take the other branch, copy the data and call `release_shared`.
1241+
if (*shared).ref_cnt.load(Ordering::Acquire) == 1 {
1242+
// Deallocate the `Shared` instance without running its destructor.
1243+
let shared = *Box::from_raw(shared);
1244+
let shared = ManuallyDrop::new(shared);
1245+
let buf = shared.buf;
1246+
let cap = shared.cap;
1247+
1248+
// Rebuild Vec
1249+
let off = offset_from(ptr, buf);
1250+
let v = Vec::from_raw_parts(buf, len + off, cap);
1251+
1252+
let mut b = BytesMut::from_vec(v);
1253+
b.advance_unchecked(off);
1254+
b
1255+
} else {
1256+
// Copy the data from Shared in a new Vec, then release it
1257+
let v = slice::from_raw_parts(ptr, len).to_vec();
1258+
release_shared(shared);
1259+
BytesMut::from_vec(v)
1260+
}
1261+
}
1262+
1263+
unsafe fn shared_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1264+
shared_to_mut_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
1265+
}
1266+
11361267
pub(crate) unsafe fn shared_is_unique(data: &AtomicPtr<()>) -> bool {
11371268
let shared = data.load(Ordering::Acquire);
11381269
let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
@@ -1291,6 +1422,23 @@ where
12911422
new_addr as *mut u8
12921423
}
12931424

1425+
/// Precondition: dst >= original
1426+
///
1427+
/// The following line is equivalent to:
1428+
///
1429+
/// ```rust,ignore
1430+
/// self.ptr.as_ptr().offset_from(ptr) as usize;
1431+
/// ```
1432+
///
1433+
/// But due to min rust is 1.39 and it is only stabilized
1434+
/// in 1.47, we cannot use it.
1435+
#[inline]
1436+
fn offset_from(dst: *const u8, original: *const u8) -> usize {
1437+
debug_assert!(dst >= original);
1438+
1439+
dst as usize - original as usize
1440+
}
1441+
12941442
// compile-fails
12951443

12961444
/// ```compile_fail

src/bytes_mut.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,7 @@ impl BytesMut {
868868
/// # SAFETY
869869
///
870870
/// The caller must ensure that `count` <= `self.cap`.
871-
unsafe fn advance_unchecked(&mut self, count: usize) {
871+
pub(crate) unsafe fn advance_unchecked(&mut self, count: usize) {
872872
// Setting the start to 0 is a no-op, so return early if this is the
873873
// case.
874874
if count == 0 {
@@ -1713,6 +1713,7 @@ unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize)
17131713
static SHARED_VTABLE: Vtable = Vtable {
17141714
clone: shared_v_clone,
17151715
to_vec: shared_v_to_vec,
1716+
to_mut: shared_v_to_mut,
17161717
is_unique: crate::bytes::shared_is_unique,
17171718
drop: shared_v_drop,
17181719
};
@@ -1747,6 +1748,35 @@ unsafe fn shared_v_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> V
17471748
}
17481749
}
17491750

1751+
unsafe fn shared_v_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1752+
let shared: *mut Shared = data.load(Ordering::Relaxed).cast();
1753+
1754+
if (*shared).is_unique() {
1755+
let shared = &mut *shared;
1756+
1757+
// The capacity is always the original capacity of the buffer
1758+
// minus the offset from the start of the buffer
1759+
let v = &mut shared.vec;
1760+
let v_capacity = v.capacity();
1761+
let v_ptr = v.as_mut_ptr();
1762+
let offset = offset_from(ptr as *mut u8, v_ptr);
1763+
let cap = v_capacity - offset;
1764+
1765+
let ptr = vptr(ptr as *mut u8);
1766+
1767+
BytesMut {
1768+
ptr,
1769+
len,
1770+
cap,
1771+
data: shared,
1772+
}
1773+
} else {
1774+
let v = slice::from_raw_parts(ptr, len).to_vec();
1775+
release_shared(shared);
1776+
BytesMut::from_vec(v)
1777+
}
1778+
}
1779+
17501780
unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
17511781
data.with_mut(|shared| {
17521782
release_shared(*shared as *mut Shared);

0 commit comments

Comments
 (0)