Skip to content

Commit 33dc559

Browse files
dignifiedquireArqu
andauthored
feat(iroh)!: make blobs::read_at more flexible (#2756)
Introduces `ReadAtLen` which allows to specify the behaviour of `read_at` more closely - `All` - reads until the end (formerly `None`) - `Exact(size)` - Reads exactly this many bytes (formerly `Some(size)` - `AtMost(size)` - Reads at most this many bytes, but allows for a shorter response if not enough data is available Closes #2738 ## Breaking Changes - `iroh::client::Blobs::read_at` and `read_at_to_bytes` now take `ReadAtLen` instead of `Option<usize>` --------- Co-authored-by: Asmir Avdicevic <[email protected]>
1 parent 0953263 commit 33dc559

File tree

4 files changed

+95
-27
lines changed

4 files changed

+95
-27
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,4 @@ jobs:
362362
steps:
363363
- uses: actions/checkout@v4
364364
- run: pip install --user codespell[toml]
365-
- run: codespell --ignore-words-list=ans,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md
365+
- run: codespell --ignore-words-list=ans,atmost,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md

iroh/src/client/blobs.rs

Lines changed: 86 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl Client {
159159
/// Read offset + len from a single blob.
160160
///
161161
/// If `len` is `None` it will read the full blob.
162-
pub async fn read_at(&self, hash: Hash, offset: u64, len: Option<usize>) -> Result<Reader> {
162+
pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Reader> {
163163
Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
164164
}
165165

@@ -178,12 +178,7 @@ impl Client {
178178
/// Read all bytes of single blob at `offset` for length `len`.
179179
///
180180
/// This allocates a buffer for the full length.
181-
pub async fn read_at_to_bytes(
182-
&self,
183-
hash: Hash,
184-
offset: u64,
185-
len: Option<usize>,
186-
) -> Result<Bytes> {
181+
pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result<Bytes> {
187182
Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
188183
.await?
189184
.read_to_bytes()
@@ -484,6 +479,28 @@ impl SimpleStore for Client {
484479
}
485480
}
486481

482+
/// Defines the way to read bytes.
483+
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
484+
pub enum ReadAtLen {
485+
/// Reads all available bytes.
486+
#[default]
487+
All,
488+
/// Reads exactly this many bytes, erroring out on larger or smaller.
489+
Exact(u64),
490+
/// Reads at most this many bytes.
491+
AtMost(u64),
492+
}
493+
494+
impl ReadAtLen {
495+
pub(crate) fn as_result_len(&self, size_remaining: u64) -> u64 {
496+
match self {
497+
ReadAtLen::All => size_remaining,
498+
ReadAtLen::Exact(len) => *len,
499+
ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining),
500+
}
501+
}
502+
}
503+
487504
/// Whether to wrap the added data in a collection.
488505
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
489506
pub enum WrapOption {
@@ -872,14 +889,14 @@ impl Reader {
872889
}
873890

874891
pub(crate) async fn from_rpc_read(rpc: &RpcClient, hash: Hash) -> anyhow::Result<Self> {
875-
Self::from_rpc_read_at(rpc, hash, 0, None).await
892+
Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await
876893
}
877894

878895
async fn from_rpc_read_at(
879896
rpc: &RpcClient,
880897
hash: Hash,
881898
offset: u64,
882-
len: Option<usize>,
899+
len: ReadAtLen,
883900
) -> anyhow::Result<Self> {
884901
let stream = rpc
885902
.server_streaming(ReadAtRequest { hash, offset, len })
@@ -898,9 +915,7 @@ impl Reader {
898915
Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
899916
Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
900917
});
901-
let len = len
902-
.map(|l| l as u64)
903-
.unwrap_or_else(|| size.value() - offset);
918+
let len = len.as_result_len(size.value() - offset);
904919
Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
905920
}
906921

@@ -1120,66 +1135,114 @@ mod tests {
11201135
assert_eq!(&res, &buf[..]);
11211136

11221137
// Read at smaller than blob_get_chunk_size
1123-
let res = client.blobs().read_at_to_bytes(hash, 0, Some(100)).await?;
1138+
let res = client
1139+
.blobs()
1140+
.read_at_to_bytes(hash, 0, ReadAtLen::Exact(100))
1141+
.await?;
11241142
assert_eq!(res.len(), 100);
11251143
assert_eq!(&res[..], &buf[0..100]);
11261144

1127-
let res = client.blobs().read_at_to_bytes(hash, 20, Some(120)).await?;
1145+
let res = client
1146+
.blobs()
1147+
.read_at_to_bytes(hash, 20, ReadAtLen::Exact(120))
1148+
.await?;
11281149
assert_eq!(res.len(), 120);
11291150
assert_eq!(&res[..], &buf[20..140]);
11301151

11311152
// Read at equal to blob_get_chunk_size
11321153
let res = client
11331154
.blobs()
1134-
.read_at_to_bytes(hash, 0, Some(1024 * 64))
1155+
.read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64))
11351156
.await?;
11361157
assert_eq!(res.len(), 1024 * 64);
11371158
assert_eq!(&res[..], &buf[0..1024 * 64]);
11381159

11391160
let res = client
11401161
.blobs()
1141-
.read_at_to_bytes(hash, 20, Some(1024 * 64))
1162+
.read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64))
11421163
.await?;
11431164
assert_eq!(res.len(), 1024 * 64);
11441165
assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
11451166

11461167
// Read at larger than blob_get_chunk_size
11471168
let res = client
11481169
.blobs()
1149-
.read_at_to_bytes(hash, 0, Some(10 + 1024 * 64))
1170+
.read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64))
11501171
.await?;
11511172
assert_eq!(res.len(), 10 + 1024 * 64);
11521173
assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
11531174

11541175
let res = client
11551176
.blobs()
1156-
.read_at_to_bytes(hash, 20, Some(10 + 1024 * 64))
1177+
.read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64))
11571178
.await?;
11581179
assert_eq!(res.len(), 10 + 1024 * 64);
11591180
assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
11601181

11611182
// full length
1162-
let res = client.blobs().read_at_to_bytes(hash, 20, None).await?;
1183+
let res = client
1184+
.blobs()
1185+
.read_at_to_bytes(hash, 20, ReadAtLen::All)
1186+
.await?;
11631187
assert_eq!(res.len(), 1024 * 128 - 20);
11641188
assert_eq!(&res[..], &buf[20..]);
11651189

11661190
// size should be total
1167-
let reader = client.blobs().read_at(hash, 0, Some(20)).await?;
1191+
let reader = client
1192+
.blobs()
1193+
.read_at(hash, 0, ReadAtLen::Exact(20))
1194+
.await?;
11681195
assert_eq!(reader.size(), 1024 * 128);
11691196
assert_eq!(reader.response_size, 20);
11701197

1198+
// last chunk - exact
1199+
let res = client
1200+
.blobs()
1201+
.read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024))
1202+
.await?;
1203+
assert_eq!(res.len(), 1024);
1204+
assert_eq!(res, &buf[1024 * 127..]);
1205+
1206+
// last chunk - open
1207+
let res = client
1208+
.blobs()
1209+
.read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All)
1210+
.await?;
1211+
assert_eq!(res.len(), 1024);
1212+
assert_eq!(res, &buf[1024 * 127..]);
1213+
1214+
// last chunk - larger
1215+
let mut res = client
1216+
.blobs()
1217+
.read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048))
1218+
.await?;
1219+
assert_eq!(res.size, 1024 * 128);
1220+
assert_eq!(res.response_size, 1024);
1221+
let res = res.read_to_bytes().await?;
1222+
assert_eq!(res.len(), 1024);
1223+
assert_eq!(res, &buf[1024 * 127..]);
1224+
11711225
// out of bounds - too long
1172-
let res = client.blobs().read_at(hash, 0, Some(1024 * 128 + 1)).await;
1226+
let res = client
1227+
.blobs()
1228+
.read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1))
1229+
.await;
11731230
let err = res.unwrap_err();
11741231
assert!(err.to_string().contains("out of bound"));
11751232

11761233
// out of bounds - offset larger than blob
1177-
let res = client.blobs().read_at(hash, 1024 * 128 + 1, None).await;
1234+
let res = client
1235+
.blobs()
1236+
.read_at(hash, 1024 * 128 + 1, ReadAtLen::All)
1237+
.await;
11781238
let err = res.unwrap_err();
11791239
assert!(err.to_string().contains("out of range"));
11801240

11811241
// out of bounds - offset + length too large
1182-
let res = client.blobs().read_at(hash, 1024 * 127, Some(1025)).await;
1242+
let res = client
1243+
.blobs()
1244+
.read_at(hash, 1024 * 127, ReadAtLen::Exact(1025))
1245+
.await;
11831246
let err = res.unwrap_err();
11841247
assert!(err.to_string().contains("out of bound"));
11851248

iroh/src/node/rpc.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1238,7 +1238,10 @@ impl<D: BaoStore> Handler<D> {
12381238
size
12391239
);
12401240

1241-
let len = req.len.unwrap_or((size.value() - req.offset) as usize);
1241+
let len: usize = req
1242+
.len
1243+
.as_result_len(size.value() - req.offset)
1244+
.try_into()?;
12421245

12431246
anyhow::ensure!(
12441247
req.offset + len as u64 <= size.value(),

iroh/src/rpc_protocol/blobs.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use nested_enum_utils::enum_conversions;
2121
use quic_rpc_derive::rpc_requests;
2222
use serde::{Deserialize, Serialize};
2323

24-
use crate::client::blobs::{BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, WrapOption};
24+
use crate::client::blobs::{
25+
BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, ReadAtLen, WrapOption,
26+
};
2527

2628
use super::RpcService;
2729

@@ -190,7 +192,7 @@ pub struct ReadAtRequest {
190192
/// Offset to start reading at
191193
pub offset: u64,
192194
/// Length of the data to get
193-
pub len: Option<usize>,
195+
pub len: ReadAtLen,
194196
}
195197

196198
/// Response to [`ReadAtRequest`]

0 commit comments

Comments
 (0)