Skip to content

Commit 9a55122

Browse files
rklaehnmatheus23
andauthored
feat(iroh)!: Blob batch PR, attempt 3 (#2545)
## Description This is the third attempt to add a batch API for adding blobs. Previous one was #2339 The basic idea is the following: all changes to the store happen in the context of a _batch_. All write operations within a batch produce temp tags. These temp tags are scoped to the batch and keep the data alive as long as the batch exists. At some point, the API user has to upgrade one or more temp tags to permanent tags. All non-batch operations would long term be implemented in terms of batch operations. In a second step, the following rpc calls would be replaced by their batch equivalent. - AddStream - AddPath - CreateCollection The third one is very nice, since it means that the notion of a collection (as in a special kind of hashseq) no longer has to even exist in the node code. ## Breaking Changes - iroh::client::blobs::BlobStatus has a new case NotFound - iroh::client::blobs::BlobStatus::Partial: size is now a BaoBlobSize instead of a u64 All other public changes are adding of new APIs. ## Notes & open questions Note: in the previous version I had an optimisation to avoid storing TempTags in the case where there are multiple TempTags with the same hash. I removed this to keep things simple. We can add it back later. ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --------- Co-authored-by: Philipp Krüger <[email protected]>
1 parent 168fa5b commit 9a55122

File tree

16 files changed

+1235
-38
lines changed

16 files changed

+1235
-38
lines changed

iroh-blobs/src/provider.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,30 @@ pub enum AddProgress {
159159
Abort(RpcError),
160160
}
161161

162+
/// Progress updates for the batch add operation.
163+
#[derive(Debug, Serialize, Deserialize)]
164+
pub enum BatchAddPathProgress {
165+
/// An item was found with the given size
166+
Found {
167+
/// The size of the entry in bytes.
168+
size: u64,
169+
},
170+
/// We got progress ingesting the item.
171+
Progress {
172+
/// The offset of the progress, in bytes.
173+
offset: u64,
174+
},
175+
/// We are done, and the hash is `hash`.
176+
Done {
177+
/// The hash of the entry.
178+
hash: Hash,
179+
},
180+
/// We got an error and need to abort.
181+
///
182+
/// This will be the last message in the stream.
183+
Abort(RpcError),
184+
}
185+
162186
/// Read the request from the getter.
163187
///
164188
/// Will fail if there is an error while reading, if the reader

iroh-blobs/src/store/fs.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -757,13 +757,6 @@ impl Store {
757757
Ok(self.0.dump().await?)
758758
}
759759

760-
/// Ensure that all operations before the sync are processed and persisted.
761-
///
762-
/// This is done by closing any open write transaction.
763-
pub async fn sync(&self) -> io::Result<()> {
764-
Ok(self.0.sync().await?)
765-
}
766-
767760
/// Import from a v0 or v1 flat store, for backwards compatibility.
768761
#[deprecated(
769762
since = "0.23.0",
@@ -1419,6 +1412,10 @@ impl super::Store for Store {
14191412
self.0.temp.temp_tag(value)
14201413
}
14211414

1415+
async fn sync(&self) -> io::Result<()> {
1416+
Ok(self.0.sync().await?)
1417+
}
1418+
14221419
async fn shutdown(&self) {
14231420
self.0.shutdown().await;
14241421
}

iroh-blobs/src/store/mem.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,10 @@ impl super::Store for Store {
243243
}
244244

245245
async fn shutdown(&self) {}
246+
247+
async fn sync(&self) -> io::Result<()> {
248+
Ok(())
249+
}
246250
}
247251

248252
#[derive(Debug, Default)]

iroh-blobs/src/store/readonly_mem.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,8 @@ impl super::Store for Store {
333333
}
334334

335335
async fn shutdown(&self) {}
336+
337+
async fn sync(&self) -> io::Result<()> {
338+
Ok(())
339+
}
336340
}

iroh-blobs/src/store/traits.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,9 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
400400
/// Shutdown the store.
401401
fn shutdown(&self) -> impl Future<Output = ()> + Send;
402402

403+
/// Sync the store.
404+
fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;
405+
403406
/// Validate the database
404407
///
405408
/// This will check that the file and outboard content is correct for all complete
@@ -703,7 +706,7 @@ pub enum ImportProgress {
703706
/// does not make any sense. E.g. an in memory implementation will always have
704707
/// to copy the file into memory. Also, a disk based implementation might choose
705708
/// to copy small files even if the mode is `Reference`.
706-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
709+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
707710
pub enum ImportMode {
708711
/// This mode will copy the file into the database before hashing.
709712
///

iroh-blobs/src/util.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ impl TempTag {
208208
self.inner.format
209209
}
210210

211+
/// The hash and format of the pinned item
212+
pub fn hash_and_format(&self) -> HashAndFormat {
213+
self.inner
214+
}
215+
211216
/// Keep the item alive until the end of the process
212217
pub fn leak(mut self) {
213218
// set the liveness tracker to None, so that the refcount is not decreased

iroh-cli/src/commands/blobs.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,10 +370,15 @@ impl BlobCommands {
370370

371371
let (blob_status, size) = match (status, format) {
372372
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
373-
(BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
373+
(BlobStatus::Partial { size }, BlobFormat::Raw) => {
374+
("incomplete blob", size.value())
375+
}
374376
(BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
375377
(BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
376-
("incomplete collection", size)
378+
("incomplete collection", size.value())
379+
}
380+
(BlobStatus::NotFound, _) => {
381+
return Err(anyhow!("blob is missing"));
377382
}
378383
};
379384
println!(

iroh/src/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub mod gossip;
2323
pub mod node;
2424
pub mod tags;
2525

26+
/// Iroh rpc connection - boxed so that we can have a concrete type.
27+
pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection<RpcService>;
28+
2629
// Keep this type exposed, otherwise every occurrence of `RpcClient` in the API
2730
// will show up as `RpcClient<RpcService, Connection<RpcService>>` in the docs.
2831
/// Iroh rpc client - boxed so that we can have a concrete type.

iroh/src/client/blobs.rs

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use std::{
5555
task::{Context, Poll},
5656
};
5757

58-
use anyhow::{anyhow, Result};
58+
use anyhow::{anyhow, Context as _, Result};
5959
use bytes::Bytes;
6060
use futures_lite::{Stream, StreamExt};
6161
use futures_util::SinkExt;
@@ -65,7 +65,7 @@ use iroh_blobs::{
6565
export::ExportProgress as BytesExportProgress,
6666
format::collection::{Collection, SimpleStore},
6767
get::db::DownloadProgress as BytesDownloadProgress,
68-
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
68+
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
6969
util::SetTagOption,
7070
BlobFormat, Hash, Tag,
7171
};
@@ -77,12 +77,14 @@ use serde::{Deserialize, Serialize};
7777
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
7878
use tokio_util::io::{ReaderStream, StreamReader};
7979
use tracing::warn;
80+
mod batch;
81+
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
8082

8183
use crate::rpc_protocol::blobs::{
82-
AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest,
83-
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest,
84-
ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse,
85-
ValidateRequest,
84+
AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
85+
BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
86+
DeleteRequest, DownloadRequest, ExportRequest, ListIncompleteRequest, ListRequest,
87+
ReadAtRequest, ReadAtResponse, ValidateRequest,
8688
};
8789
use crate::rpc_protocol::node::StatusRequest;
8890

@@ -102,6 +104,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient {
102104
}
103105

104106
impl Client {
107+
/// Check if a blob is completely stored on the node.
108+
///
109+
/// Note that this will return false for blobs that are partially stored on
110+
/// the node.
111+
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
112+
let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
113+
Ok(status.0)
114+
}
115+
116+
/// Check if a blob is completely stored on the node.
117+
///
118+
/// This is just a convenience wrapper around `status` that returns a boolean.
119+
pub async fn has(&self, hash: Hash) -> Result<bool> {
120+
match self.status(hash).await {
121+
Ok(BlobStatus::Complete { .. }) => Ok(true),
122+
Ok(_) => Ok(false),
123+
Err(err) => Err(err),
124+
}
125+
}
126+
127+
/// Create a new batch for adding data.
128+
///
129+
/// A batch is a context in which temp tags are created and data is added to the node. Temp tags
130+
/// are automatically deleted when the batch is dropped, leading to the data being garbage collected
131+
/// unless a permanent tag is created for it.
132+
pub async fn batch(&self) -> Result<Batch> {
133+
let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
134+
let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
135+
let rpc = self.rpc.clone();
136+
Ok(Batch::new(batch, rpc, updates, 1024))
137+
}
138+
105139
/// Stream the contents of a a single blob.
106140
///
107141
/// Returns a [`Reader`], which can report the size of the blob before reading it.
@@ -424,17 +458,6 @@ impl Client {
424458
Ok(ticket)
425459
}
426460

427-
/// Get the status of a blob.
428-
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
429-
// TODO: this could be implemented more efficiently
430-
let reader = self.read(hash).await?;
431-
if reader.is_complete {
432-
Ok(BlobStatus::Complete { size: reader.size })
433-
} else {
434-
Ok(BlobStatus::Partial { size: reader.size })
435-
}
436-
}
437-
438461
fn tags_client(&self) -> tags::Client {
439462
tags::Client {
440463
rpc: self.rpc.clone(),
@@ -449,9 +472,10 @@ impl SimpleStore for Client {
449472
}
450473

451474
/// Whether to wrap the added data in a collection.
452-
#[derive(Debug, Serialize, Deserialize)]
475+
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
453476
pub enum WrapOption {
454477
/// Do not wrap the file or directory.
478+
#[default]
455479
NoWrap,
456480
/// Wrap the file or directory in a collection.
457481
Wrap {
@@ -461,12 +485,14 @@ pub enum WrapOption {
461485
}
462486

463487
/// Status information about a blob.
464-
#[derive(Debug, Clone, PartialEq, Eq)]
488+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
465489
pub enum BlobStatus {
490+
/// The blob is not stored at all.
491+
NotFound,
466492
/// The blob is only stored partially.
467493
Partial {
468494
/// The size of the currently stored partial blob.
469-
size: u64,
495+
size: BaoBlobSize,
470496
},
471497
/// The blob is stored completely.
472498
Complete {
@@ -943,7 +969,6 @@ pub enum DownloadMode {
943969
mod tests {
944970
use super::*;
945971

946-
use anyhow::Context as _;
947972
use iroh_blobs::hashseq::HashSeq;
948973
use iroh_net::NodeId;
949974
use rand::RngCore;

0 commit comments

Comments
 (0)