Skip to content

Commit 407c9f1

Browse files
authored
Refactor Shred::Payload to use Bytes (#7049)
* Refactor Shred::Payload to use Bytes
1 parent 13f5a3f commit 407c9f1

File tree

15 files changed

+212
-105
lines changed

15 files changed

+212
-105
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/window_service.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use {
1111
result::{Error, Result},
1212
},
1313
agave_feature_set as feature_set,
14-
assert_matches::debug_assert_matches,
1514
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
1615
rayon::{prelude::*, ThreadPool},
1716
solana_clock::{Slot, DEFAULT_MS_PER_SLOT},
@@ -208,9 +207,6 @@ where
208207
}
209208
if repair {
210209
ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
211-
debug_assert_matches!(shred, shred::Payload::Unique(_));
212-
} else {
213-
debug_assert_matches!(shred, shred::Payload::Shared(_));
214210
}
215211
let shred = Shred::new_from_serialized_shred(shred).ok()?;
216212
Some((Cow::Owned(shred), repair))

gossip/src/duplicate_shred.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,15 +1102,15 @@ pub(crate) mod tests {
11021102
new_rand_coding_shreds(&mut rng, next_shred_index, 10, &shredder, &leader)[0].clone();
11031103
let mut data_shred_different_retransmitter_payload = data_shred.clone().into_payload();
11041104
shred::layout::set_retransmitter_signature(
1105-
&mut data_shred_different_retransmitter_payload,
1105+
&mut data_shred_different_retransmitter_payload.as_mut(),
11061106
&Signature::new_unique(),
11071107
)
11081108
.unwrap();
11091109
let data_shred_different_retransmitter =
11101110
Shred::new_from_serialized_shred(data_shred_different_retransmitter_payload).unwrap();
11111111
let mut coding_shred_different_retransmitter_payload = coding_shred.clone().into_payload();
11121112
shred::layout::set_retransmitter_signature(
1113-
&mut coding_shred_different_retransmitter_payload,
1113+
&mut coding_shred_different_retransmitter_payload.as_mut(),
11141114
&Signature::new_unique(),
11151115
)
11161116
.unwrap();

ledger-tool/src/output.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use {
2222
solana_ledger::{
2323
blockstore::{Blockstore, BlockstoreError},
2424
blockstore_meta::{DuplicateSlotProof, ErasureMeta},
25-
shred::{self, Shred, ShredType},
25+
shred::{Shred, ShredType},
2626
},
2727
solana_native_token::lamports_to_sol,
2828
solana_pubkey::Pubkey,
@@ -410,7 +410,7 @@ impl From<Shred> for CliDuplicateShred {
410410
merkle_root: shred.merkle_root().ok(),
411411
chained_merkle_root: shred.chained_merkle_root().ok(),
412412
last_in_slot: shred.last_in_slot(),
413-
payload: shred::Payload::unwrap_or_clone(shred.payload().clone()),
413+
payload: Vec::from(shred.into_payload().bytes),
414414
}
415415
}
416416
}

ledger/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ anyhow = { workspace = true }
3232
assert_matches = { workspace = true }
3333
bincode = { workspace = true }
3434
bitflags = { workspace = true, features = ["serde"] }
35+
bytes = { workspace = true }
3536
bzip2 = { workspace = true }
3637
chrono = { workspace = true, features = ["default", "serde"] }
3738
chrono-humanize = { workspace = true }

ledger/src/blockstore.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,13 +1004,11 @@ impl Blockstore {
10041004
match shred.shred_type() {
10051005
ShredType::Code => {
10061006
// Don't need Arc overhead here!
1007-
debug_assert_matches!(shred.payload(), shred::Payload::Unique(_));
10081007
recovered_shreds.push(shred.into_payload());
10091008
None
10101009
}
10111010
ShredType::Data => {
10121011
// Verify that the cloning is cheap here.
1013-
debug_assert_matches!(shred.payload(), shred::Payload::Shared(_));
10141012
recovered_shreds.push(shred.payload().clone());
10151013
Some(shred)
10161014
}

ledger/src/shred.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,7 +1711,10 @@ mod tests {
17111711
let mut shred = shred.into_payload();
17121712
let mut signature = [0u8; SIGNATURE_BYTES];
17131713
rng.fill(&mut signature[..]);
1714-
let out = layout::set_retransmitter_signature(&mut shred, &Signature::from(signature));
1714+
let out = layout::set_retransmitter_signature(
1715+
&mut shred.as_mut(),
1716+
&Signature::from(signature),
1717+
);
17151718
if chained && is_last_in_slot {
17161719
assert_matches!(out, Ok(()));
17171720
} else {
@@ -1760,7 +1763,7 @@ mod tests {
17601763
// (ignoring retransmitter signature) are duplicate.
17611764
for shred in &shreds {
17621765
let mut other = shred.payload().clone();
1763-
other[90] = other[90].wrapping_add(1);
1766+
other.as_mut()[90] = other[90].wrapping_add(1);
17641767
let other = Shred::new_from_serialized_shred(other).unwrap();
17651768
assert_ne!(shred.payload(), other.payload());
17661769
assert_eq!(

ledger/src/shred/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ macro_rules! impl_shred_common {
4747

4848
#[inline]
4949
fn set_signature(&mut self, signature: Signature) {
50-
self.payload[..SIZE_OF_SIGNATURE].copy_from_slice(signature.as_ref());
50+
self.payload.as_mut()[..SIZE_OF_SIGNATURE].copy_from_slice(signature.as_ref());
5151
self.common_header.signature = signature;
5252
}
5353
};

ledger/src/shred/legacy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl<'a> Shred<'a> for ShredData {
5353
where
5454
Payload: From<T>,
5555
{
56-
let mut payload = Payload::from(payload);
56+
let mut payload = Payload::from(payload).into_bytes_mut();
5757
let mut cursor = Cursor::new(&payload[..]);
5858
let common_header: ShredCommonHeader = deserialize_from_with_limit(&mut cursor)?;
5959
if common_header.shred_variant != ShredVariant::LegacyData {
@@ -71,7 +71,7 @@ impl<'a> Shred<'a> for ShredData {
7171
let shred = Self {
7272
common_header,
7373
data_header,
74-
payload,
74+
payload: payload.into(),
7575
};
7676
shred.sanitize().map(|_| shred)
7777
}

ledger/src/shred/merkle.rs

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use {
77
common::impl_shred_common,
88
dispatch,
99
merkle_tree::*,
10-
payload::Payload,
10+
payload::{Payload, PayloadMutGuard},
1111
shred_code, shred_data,
1212
traits::{
1313
Shred as ShredTrait, ShredCode as ShredCodeTrait, ShredData as ShredDataTrait,
@@ -77,7 +77,7 @@ pub(crate) enum Shred {
7777

7878
impl Shred {
7979
dispatch!(fn erasure_shard_index(&self) -> Result<usize, Error>);
80-
dispatch!(fn erasure_shard_mut(&mut self) -> Result<&mut [u8], Error>);
80+
dispatch!(fn erasure_shard_mut(&mut self) -> Result<PayloadMutGuard<Range<usize>>, Error>);
8181
dispatch!(fn merkle_node(&self) -> Result<Hash, Error>);
8282
dispatch!(fn sanitize(&self) -> Result<(), Error>);
8383
dispatch!(fn set_chained_merkle_root(&mut self, chained_merkle_root: &Hash) -> Result<(), Error>);
@@ -362,7 +362,8 @@ macro_rules! impl_merkle_shred {
362362

363363
fn set_chained_merkle_root(&mut self, chained_merkle_root: &Hash) -> Result<(), Error> {
364364
let offset = self.chained_merkle_root_offset()?;
365-
let Some(buffer) = self.payload.get_mut(offset..offset + SIZE_OF_MERKLE_ROOT) else {
365+
let Some(mut buffer) = self.payload.get_mut(offset..offset + SIZE_OF_MERKLE_ROOT)
366+
else {
366367
return Err(Error::InvalidPayloadSize(self.payload.len()));
367368
};
368369
buffer.copy_from_slice(chained_merkle_root.as_ref());
@@ -395,11 +396,11 @@ macro_rules! impl_merkle_shred {
395396
{
396397
let proof_size = self.proof_size()?;
397398
let proof_offset = self.proof_offset()?;
398-
let mut cursor = Cursor::new(
399-
self.payload
400-
.get_mut(proof_offset..)
401-
.ok_or(Error::InvalidProofSize(proof_size))?,
402-
);
399+
let mut slice = self
400+
.payload
401+
.get_mut(proof_offset..)
402+
.ok_or(Error::InvalidProofSize(proof_size))?;
403+
let mut cursor = Cursor::new(slice.as_mut());
403404
let proof_size = usize::from(proof_size);
404405
proof.into_iter().enumerate().try_for_each(|(k, entry)| {
405406
if k >= proof_size {
@@ -425,7 +426,7 @@ macro_rules! impl_merkle_shred {
425426

426427
fn set_retransmitter_signature(&mut self, signature: &Signature) -> Result<(), Error> {
427428
let offset = self.retransmitter_signature_offset()?;
428-
let Some(buffer) = self.payload.get_mut(offset..offset + SIZE_OF_SIGNATURE) else {
429+
let Some(mut buffer) = self.payload.get_mut(offset..offset + SIZE_OF_SIGNATURE) else {
429430
return Err(Error::InvalidPayloadSize(self.payload.len()));
430431
};
431432
buffer.copy_from_slice(signature.as_ref());
@@ -481,7 +482,7 @@ macro_rules! impl_merkle_shred {
481482
}
482483

483484
// Returns the erasure coded slice as a mutable reference.
484-
fn erasure_shard_mut(&mut self) -> Result<&mut [u8], Error> {
485+
fn erasure_shard_mut(&mut self) -> Result<PayloadMutGuard<Range<usize>>, Error> {
485486
let offsets = self.erasure_shard_offsets()?;
486487
let payload_size = self.payload.len();
487488
self.payload
@@ -800,14 +801,16 @@ pub(super) fn recover(
800801
batch
801802
};
802803
// Obtain erasure encoded shards from the shreds and reconstruct shreds.
803-
let mut shards: Vec<(&mut [u8], bool)> = shreds
804+
let mut shards = shreds
804805
.iter_mut()
805806
.zip(&mask)
806807
.map(|(shred, &mask)| Ok((shred.erasure_shard_mut()?, mask)))
807-
.collect::<Result<_, Error>>()?;
808+
.collect::<Result<Vec<_>, Error>>()?;
808809
reed_solomon_cache
809810
.get(num_data_shreds, num_coding_shreds)?
810811
.reconstruct(&mut shards)?;
812+
// Drop the mut guards to allow further mutation below.
813+
drop(shards);
811814
// Verify and sanitize recovered shreds, re-compute the Merkle tree and set
812815
// the merkle proof on the recovered shreds.
813816
let nodes = shreds
@@ -955,7 +958,7 @@ fn make_stub_shred(
955958
// while their payload is sent to retransmit-stage. Using a shared
956959
// payload between the two concurrent paths will reduce allocations
957960
// and memcopies.
958-
payload: Payload::from(std::sync::Arc::new(payload)),
961+
payload: Payload::from(payload),
959962
})
960963
};
961964
if let Some(chained_merkle_root) = chained_merkle_root {
@@ -1231,11 +1234,11 @@ fn finish_erasure_batch(
12311234
fn write_headers(shred: &mut Shred) -> Result<(), bincode::Error> {
12321235
match shred {
12331236
Shred::ShredCode(shred) => bincode::serialize_into(
1234-
&mut shred.payload[..],
1237+
&mut shred.payload.as_mut()[..],
12351238
&(&shred.common_header, &shred.coding_header),
12361239
),
12371240
Shred::ShredData(shred) => bincode::serialize_into(
1238-
&mut shred.payload[..],
1241+
&mut shred.payload.as_mut()[..],
12391242
&(&shred.common_header, &shred.data_header),
12401243
),
12411244
}
@@ -1267,7 +1270,7 @@ fn finish_erasure_batch(
12671270
shreds
12681271
.iter_mut()
12691272
.map(Shred::erasure_shard_mut)
1270-
.collect::<Result<Vec<&mut [u8]>, _>>()?,
1273+
.collect::<Result<Vec<_>, _>>()?,
12711274
)?;
12721275
// Set the chained_merkle_root for each shred.
12731276
if let Some(chained_merkle_root) = chained_merkle_root {
@@ -1614,12 +1617,6 @@ mod test {
16141617
}
16151618
});
16161619
assert_eq!(recovered_shreds, removed_shreds);
1617-
for shred in recovered_shreds {
1618-
match shred.shred_type() {
1619-
ShredType::Code => assert_matches!(shred.payload(), Payload::Unique(_)),
1620-
ShredType::Data => assert_matches!(shred.payload(), Payload::Shared(_)),
1621-
}
1622-
}
16231620
}
16241621
}
16251622

@@ -1913,9 +1910,7 @@ mod test {
19131910
})
19141911
.collect();
19151912
assert_eq!(recovered_data_shreds.len(), data_shreds.len());
1916-
for shred in &recovered_data_shreds {
1917-
assert_matches!(shred.payload(), Payload::Shared(_));
1918-
}
1913+
19191914
for (shred, other) in recovered_data_shreds.into_iter().zip(data_shreds) {
19201915
match shred {
19211916
Shred::ShredCode(_) => panic!("Invalid shred type!"),

0 commit comments

Comments
 (0)