Skip to content

Commit 5aa3fb6

Browse files
authored
refactor(iroh): Modularize protocol (#2454)
## Description Uses a newly created macro crate, https://github.com/n0-computer/nested-enum-utils, to allow from/to conversions for deeply nested enums. This lets us split the protocol enum into multiple parts without changing anything about the basic concept of how things are handled. ## Breaking Changes There should be none, since the protocol is private. ## Notes & open questions Obvious next steps would be: - [x] define the subsystem-specific messages in submodules - [x] publish the macro crate - [x] (maybe) split up the handling of the different subsystems - [x] (maybe) remove the subsystem prefixes from the messages Not-so-obvious next steps (maybe a next PR): - Allow moving the subsystem protocols to different crates. This is difficult because you cannot set the target when generating the conversions. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent bc0b397 commit 5aa3fb6

18 files changed

+1607
-1449
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ iroh-base = { version = "0.19.0", path = "../iroh-base", features = ["key"] }
3131
iroh-io = { version = "0.6.0", features = ["stats"] }
3232
iroh-metrics = { version = "0.19.0", path = "../iroh-metrics", optional = true }
3333
iroh-net = { version = "0.19.0", path = "../iroh-net" }
34+
nested_enum_utils = "0.1.0"
3435
num_cpus = { version = "1.15.0" }
3536
portable-atomic = "1"
3637
iroh-docs = { version = "0.19.0", path = "../iroh-docs" }

iroh/src/client/authors.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use futures_lite::{stream::StreamExt, Stream};
55
use iroh_docs::{Author, AuthorId};
66
use ref_cast::RefCast;
77

8-
use crate::rpc_protocol::{
9-
AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest,
10-
AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
8+
use crate::rpc_protocol::authors::{
9+
CreateRequest, DeleteRequest, ExportRequest, GetDefaultRequest, ImportRequest, ListRequest,
10+
SetDefaultRequest,
1111
};
1212

1313
use super::{flatten, RpcClient};
@@ -27,7 +27,7 @@ impl Client {
2727
///
2828
/// If you need only a single author, use [`Self::default`].
2929
pub async fn create(&self) -> Result<AuthorId> {
30-
let res = self.rpc.rpc(AuthorCreateRequest).await??;
30+
let res = self.rpc.rpc(CreateRequest).await??;
3131
Ok(res.author_id)
3232
}
3333

@@ -38,7 +38,7 @@ impl Client {
3838
///
3939
/// The default author can be set with [`Self::set_default`].
4040
pub async fn default(&self) -> Result<AuthorId> {
41-
let res = self.rpc.rpc(AuthorGetDefaultRequest).await??;
41+
let res = self.rpc.rpc(GetDefaultRequest).await??;
4242
Ok(res.author_id)
4343
}
4444

@@ -49,31 +49,29 @@ impl Client {
4949
/// On a persistent node, the author id will be saved to a file in the data directory and
5050
/// reloaded after a restart.
5151
pub async fn set_default(&self, author_id: AuthorId) -> Result<()> {
52-
self.rpc
53-
.rpc(AuthorSetDefaultRequest { author_id })
54-
.await??;
52+
self.rpc.rpc(SetDefaultRequest { author_id }).await??;
5553
Ok(())
5654
}
5755

5856
/// List document authors for which we have a secret key.
5957
pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
60-
let stream = self.rpc.server_streaming(AuthorListRequest {}).await?;
58+
let stream = self.rpc.server_streaming(ListRequest {}).await?;
6159
Ok(flatten(stream).map(|res| res.map(|res| res.author_id)))
6260
}
6361

6462
/// Export the given author.
6563
///
6664
/// Warning: This contains sensitive data.
6765
pub async fn export(&self, author: AuthorId) -> Result<Option<Author>> {
68-
let res = self.rpc.rpc(AuthorExportRequest { author }).await??;
66+
let res = self.rpc.rpc(ExportRequest { author }).await??;
6967
Ok(res.author)
7068
}
7169

7270
/// Import the given author.
7371
///
7472
/// Warning: This contains sensitive data.
7573
pub async fn import(&self, author: Author) -> Result<()> {
76-
self.rpc.rpc(AuthorImportRequest { author }).await??;
74+
self.rpc.rpc(ImportRequest { author }).await??;
7775
Ok(())
7876
}
7977

@@ -83,7 +81,7 @@ impl Client {
8381
///
8482
/// Returns an error if attempting to delete the default author.
8583
pub async fn delete(&self, author: AuthorId) -> Result<()> {
86-
self.rpc.rpc(AuthorDeleteRequest { author }).await??;
84+
self.rpc.rpc(DeleteRequest { author }).await??;
8785
Ok(())
8886
}
8987
}

iroh/src/client/blobs.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use iroh_blobs::{
2020
format::collection::{Collection, SimpleStore},
2121
get::db::DownloadProgress as BytesDownloadProgress,
2222
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
23+
util::SetTagOption,
2324
BlobFormat, Hash, Tag,
2425
};
2526
use iroh_net::NodeAddr;
@@ -31,12 +32,13 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
3132
use tokio_util::io::{ReaderStream, StreamReader};
3233
use tracing::warn;
3334

34-
use crate::rpc_protocol::{
35-
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
36-
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
37-
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
38-
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption,
35+
use crate::rpc_protocol::blobs::{
36+
AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest,
37+
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest,
38+
ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse,
39+
ValidateRequest,
3940
};
41+
use crate::rpc_protocol::node::StatusRequest;
4042

4143
use super::{flatten, tags, Iroh, RpcClient};
4244

@@ -110,7 +112,7 @@ impl Client {
110112
) -> Result<AddProgress> {
111113
let stream = self
112114
.rpc
113-
.server_streaming(BlobAddPathRequest {
115+
.server_streaming(AddPathRequest {
114116
path,
115117
in_place,
116118
tag,
@@ -158,12 +160,12 @@ impl Client {
158160
input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
159161
tag: SetTagOption,
160162
) -> anyhow::Result<AddProgress> {
161-
let (mut sink, progress) = self.rpc.bidi(BlobAddStreamRequest { tag }).await?;
163+
let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?;
162164
let mut input = input.map(|chunk| match chunk {
163-
Ok(chunk) => Ok(BlobAddStreamUpdate::Chunk(chunk)),
165+
Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)),
164166
Err(err) => {
165167
warn!("Abort send, reason: failed to read from source stream: {err:?}");
166-
Ok(BlobAddStreamUpdate::Abort)
168+
Ok(AddStreamUpdate::Abort)
167169
}
168170
});
169171
tokio::spawn(async move {
@@ -205,7 +207,7 @@ impl Client {
205207
) -> Result<impl Stream<Item = Result<ValidateProgress>>> {
206208
let stream = self
207209
.rpc
208-
.server_streaming(BlobValidateRequest { repair })
210+
.server_streaming(ValidateRequest { repair })
209211
.await?;
210212
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
211213
}
@@ -219,7 +221,7 @@ impl Client {
219221
) -> Result<impl Stream<Item = Result<ConsistencyCheckProgress>>> {
220222
let stream = self
221223
.rpc
222-
.server_streaming(BlobConsistencyCheckRequest { repair })
224+
.server_streaming(ConsistencyCheckRequest { repair })
223225
.await?;
224226
Ok(stream.map(|r| r.map_err(anyhow::Error::from)))
225227
}
@@ -266,7 +268,7 @@ impl Client {
266268
} = opts;
267269
let stream = self
268270
.rpc
269-
.server_streaming(BlobDownloadRequest {
271+
.server_streaming(DownloadRequest {
270272
hash,
271273
format,
272274
nodes,
@@ -295,7 +297,7 @@ impl Client {
295297
format: ExportFormat,
296298
mode: ExportMode,
297299
) -> Result<ExportProgress> {
298-
let req = BlobExportRequest {
300+
let req = ExportRequest {
299301
hash,
300302
path: destination,
301303
format,
@@ -309,13 +311,13 @@ impl Client {
309311

310312
/// List all complete blobs.
311313
pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobInfo>>> {
312-
let stream = self.rpc.server_streaming(BlobListRequest).await?;
314+
let stream = self.rpc.server_streaming(ListRequest).await?;
313315
Ok(flatten(stream))
314316
}
315317

316318
/// List all incomplete (partial) blobs.
317319
pub async fn list_incomplete(&self) -> Result<impl Stream<Item = Result<IncompleteBlobInfo>>> {
318-
let stream = self.rpc.server_streaming(BlobListIncompleteRequest).await?;
320+
let stream = self.rpc.server_streaming(ListIncompleteRequest).await?;
319321
Ok(flatten(stream))
320322
}
321323

@@ -354,7 +356,7 @@ impl Client {
354356

355357
/// Delete a blob.
356358
pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
357-
self.rpc.rpc(BlobDeleteBlobRequest { hash }).await??;
359+
self.rpc.rpc(DeleteRequest { hash }).await??;
358360
Ok(())
359361
}
360362

@@ -365,7 +367,7 @@ impl Client {
365367
blob_format: BlobFormat,
366368
addr_options: AddrInfoOptions,
367369
) -> Result<BlobTicket> {
368-
let mut addr = self.rpc.rpc(NodeStatusRequest).await??.addr;
370+
let mut addr = self.rpc.rpc(StatusRequest).await??.addr;
369371
addr.apply_options(addr_options);
370372
let ticket = BlobTicket::new(addr, hash, blob_format).expect("correct ticket");
371373

@@ -791,19 +793,19 @@ impl Reader {
791793
len: Option<usize>,
792794
) -> anyhow::Result<Self> {
793795
let stream = rpc
794-
.server_streaming(BlobReadAtRequest { hash, offset, len })
796+
.server_streaming(ReadAtRequest { hash, offset, len })
795797
.await?;
796798
let mut stream = flatten(stream);
797799

798800
let (size, is_complete) = match stream.next().await {
799-
Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
801+
Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
800802
Some(Err(err)) => return Err(err),
801803
Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")),
802804
None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")),
803805
};
804806

805807
let stream = stream.map(|item| match item {
806-
Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
808+
Ok(ReadAtResponse::Data { chunk }) => Ok(chunk),
807809
Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
808810
Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
809811
});

0 commit comments

Comments
 (0)