From 3e907aafe2de00bb40557072033966f205b4f6e1 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 24 Mar 2025 08:33:47 -0500 Subject: [PATCH 1/7] feat: Use RepoType in Repo and impl DefaultStorage --- src/dag.rs | 7 +- src/ipns/mod.rs | 1 + src/lib.rs | 95 +++++++----- src/p2p/behaviour.rs | 3 +- src/p2p/bitswap.rs | 13 +- src/p2p/bitswap/sessions.rs | 13 +- src/p2p/mod.rs | 3 +- src/refs.rs | 16 +- src/repo/blockstore/arc.rs | 42 ++++++ src/repo/blockstore/either.rs | 75 ++++++++++ src/repo/blockstore/flatfs.rs | 2 +- src/repo/blockstore/memory.rs | 1 + src/repo/blockstore/mod.rs | 3 + src/repo/datastore/arc.rs | 66 ++++++++ src/repo/datastore/either.rs | 124 +++++++++++++++ src/repo/datastore/flatfs.rs | 2 +- src/repo/datastore/memory.rs | 4 +- src/repo/datastore/mod.rs | 3 + src/repo/default_impl.rs | 268 +++++++++++++++++++++++++++++++++ src/repo/lock.rs | 38 ++++- src/repo/mod.rs | 273 +++++++++++++++++++++------------- src/task.rs | 8 +- src/unixfs/add.rs | 11 +- src/unixfs/cat.rs | 13 +- src/unixfs/get.rs | 11 +- src/unixfs/ls.rs | 12 +- 26 files changed, 916 insertions(+), 191 deletions(-) create mode 100644 src/repo/blockstore/arc.rs create mode 100644 src/repo/blockstore/either.rs create mode 100644 src/repo/datastore/arc.rs create mode 100644 src/repo/datastore/either.rs create mode 100644 src/repo/default_impl.rs diff --git a/src/dag.rs b/src/dag.rs index f5b90b626..ede625f6d 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -3,6 +3,7 @@ use crate::block::BlockCodec; use crate::error::Error; use crate::path::{IpfsPath, PathRoot, SlashedPath}; +use crate::repo::default_impl::DefaultStorage; use crate::repo::Repo; use crate::{Block, Ipfs}; use bytes::Bytes; @@ -179,11 +180,11 @@ impl RawResolveLocalError { #[derive(Clone, Debug)] pub struct IpldDag { ipfs: Option, - repo: Repo, + repo: Repo, } -impl From for IpldDag { - fn from(repo: Repo) -> Self { +impl From> for IpldDag { + fn from(repo: Repo) -> Self { IpldDag { ipfs: None, repo } } } diff --git a/src/ipns/mod.rs b/src/ipns/mod.rs index c9daa8df4..94944a394 100644 --- a/src/ipns/mod.rs +++ b/src/ipns/mod.rs @@ -5,6 +5,7 @@ use std::borrow::Borrow; use crate::p2p::DnsResolver; use crate::path::{IpfsPath, PathRoot}; +use crate::repo::DataStore; use crate::Ipfs; mod dnslink; diff --git a/src/lib.rs b/src/lib.rs index 18ae334dd..2cf801a84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,8 @@ use p2p::{ RelayConfig, RequestResponseConfig, SwarmConfig, TransportConfig, }; use repo::{ - BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin, + default_impl::DefaultStorage, BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, + RepoInsertPin, RepoRemovePin, }; use tracing::Span; @@ -87,8 +88,8 @@ pub use self::{ use async_rt::{AbortableJoinHandle, CommunicationTask}; use ipld_core::cid::Cid; use ipld_core::ipld::Ipld; -use std::borrow::Borrow; use std::convert::Infallible; +use std::{borrow::Borrow, path::PathBuf}; use std::{ collections::{BTreeSet, HashMap, HashSet}, fmt, @@ -182,7 +183,11 @@ struct IpfsOptions { /// /// It is **not** recommended to set this to IPFS_PATH without first at least backing up your /// existing repository. - pub ipfs_path: StorageType, + pub ipfs_path: Option, + + /// Name of the namespace used for indexeddb + #[cfg(target_arch="wasm32")] + pub namespace: Option, /// Nodes used as bootstrap peers. pub bootstrap: Vec, @@ -285,7 +290,9 @@ pub enum RepoProvider { impl Default for IpfsOptions { fn default() -> Self { Self { - ipfs_path: StorageType::Memory, + ipfs_path: None, + #[cfg(target_arch="wasm32")] + namespace: None, bootstrap: Default::default(), relay_server_config: Default::default(), kad_configuration: Either::Left(Default::default()), @@ -333,7 +340,7 @@ impl fmt::Debug for IpfsOptions { #[allow(clippy::type_complexity)] pub struct Ipfs { span: Span, - repo: Repo, + repo: Repo, key: Keypair, keystore: Keystore, identify_conf: IdentifyConfiguration, @@ -557,7 +564,7 @@ pub struct UninitializedIpfs + Send> { keys: Option, options: IpfsOptions, fdlimit: Option, - repo_handle: Option, + repo_handle: Repo, local_external_addr: bool, swarm_event: Option>, // record_validators: HashMap bool + Sync + Send>>, @@ -583,7 +590,7 @@ impl + Send> UninitializedIpfs { keys: None, options: Default::default(), fdlimit: None, - repo_handle: None, + repo_handle: Repo::new_memory(), // record_validators: Default::default(), record_key_validator: Default::default(), local_external_addr: false, @@ -604,10 +611,10 @@ impl + Send> UninitializedIpfs { } /// Set storage type for the repo. - pub fn set_storage_type(mut self, storage_type: StorageType) -> Self { - self.options.ipfs_path = storage_type; - self - } + // pub fn set_storage_type(mut self, storage_type: StorageType) -> Self { + // self.options.ipfs_path = storage_type; + // self + // } /// Adds a listening address pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self { @@ -792,7 +799,15 @@ impl + Send> UninitializedIpfs { #[cfg(not(target_arch = "wasm32"))] pub fn set_path>(mut self, path: P) -> Self { let path = path.as_ref().to_path_buf(); - self.options.ipfs_path = StorageType::Disk(path); + self.options.ipfs_path = Some(path); + self + } + + /// Sets a namespace + #[cfg(target_arch = "wasm32")] + pub fn set_namespace>(mut self, ns: S) -> Self { + let ns = ns.as_ref().to_string(); + self.options.namespace = Some(ns); self } @@ -853,8 +868,8 @@ impl + Send> UninitializedIpfs { } /// Set block and data repo - pub fn set_repo(mut self, repo: &Repo) -> Self { - self.repo_handle = Some(repo.clone()); + pub fn set_repo(mut self, repo: &Repo) -> Self { + self.repo_handle = Repo::clone(repo); self } @@ -940,23 +955,34 @@ impl + Send> UninitializedIpfs { // instruments the IpfsFuture, the background task. let swarm_span = tracing::trace_span!(parent: &root_span, "swarm"); - let repo = match repo_handle { - Some(repo) => { - if repo.is_online() { - anyhow::bail!("Repo is already initialized"); - } - repo - } - None => { - #[cfg(not(target_arch = "wasm32"))] - if let StorageType::Disk(path) = &options.ipfs_path { + let mut repo = repo_handle; + + if repo.is_online() { + anyhow::bail!("Repo is already initialized"); + } + + #[cfg(not(target_arch = "wasm32"))] + { + repo = match &options.ipfs_path { + Some(path) => { if !path.is_dir() { tokio::fs::create_dir_all(path).await?; } + Repo::::new_fs(path) } - Repo::new(&mut options.ipfs_path) - } - }; + None => repo, + }; + } + + #[cfg(target_arch = "wasm32")] + { + repo = match &options.namespace { + Some(ns) => { + Repo::::new_idb(Some(ns.clone())) + } + None => repo, + }; + } repo.init().instrument(init_span.clone()).await?; @@ -1039,7 +1065,7 @@ impl + Send> UninitializedIpfs { let gc_handle = gc_config.map(|config| { async_rt::task::spawn_abortable({ - let repo = repo.clone(); + let repo = Repo::clone(&repo); async move { let GCConfig { duration, trigger } = config; let use_config_timer = duration != Duration::ZERO; @@ -1176,7 +1202,7 @@ impl Ipfs { } /// Return an [`Repo`] to access the internal repo of the node - pub fn repo(&self) -> &Repo { + pub fn repo(&self) -> &Repo { &self.repo } @@ -1191,13 +1217,13 @@ impl Ipfs { } /// Puts a block into the ipfs repo. - pub fn put_block(&self, block: &Block) -> RepoPutBlock { + pub fn put_block(&self, block: &Block) -> RepoPutBlock { self.repo.put_block(block).span(self.span.clone()) } /// Retrieves a block from the local blockstore, or starts fetching from the network or join an /// already started fetch. - pub fn get_block(&self, cid: impl Borrow) -> RepoGetBlock { + pub fn get_block(&self, cid: impl Borrow) -> RepoGetBlock { self.repo.get_block(cid).span(self.span.clone()) } @@ -1239,7 +1265,7 @@ impl Ipfs { /// If a recursive `insert_pin` operation is interrupted because of a crash or the crash /// prevents from synchronizing the data store to disk, this will leave the system in an inconsistent /// state. The remedy is to re-pin recursive pins. - pub fn insert_pin(&self, cid: impl Borrow) -> RepoInsertPin { + pub fn insert_pin(&self, cid: impl Borrow) -> RepoInsertPin { self.repo().pin(cid).span(self.span.clone()) } @@ -1249,7 +1275,7 @@ impl Ipfs { /// /// Unpinning an indirectly pinned Cid is not possible other than through its recursively /// pinned tree roots. - pub fn remove_pin(&self, cid: impl Borrow) -> RepoRemovePin { + pub fn remove_pin(&self, cid: impl Borrow) -> RepoRemovePin { self.repo().remove_pin(cid).span(self.span.clone()) } @@ -2131,7 +2157,7 @@ impl Ipfs { } /// Fetches the block, and, if set, recursively walk the graph loading all the blocks to the blockstore. - pub fn fetch(&self, cid: &Cid) -> RepoFetch { + pub fn fetch(&self, cid: &Cid) -> RepoFetch { self.repo.fetch(cid).span(self.span.clone()) } @@ -3085,6 +3111,7 @@ pub use node::Node; /// Node module provides an easy to use interface used in `tests/`. mod node { + use super::*; /// Node encapsulates everything to setup a testing instance so that multi-node tests become diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index ad304c205..bcbeb04c9 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -9,6 +9,7 @@ use either::Either; use serde::{Deserialize, Serialize}; use crate::error::Error; +use crate::repo::default_impl::DefaultStorage; use crate::{IntoAddPeerOpt, IpfsOptions}; use crate::repo::Repo; @@ -362,7 +363,7 @@ where pub(crate) fn new( keypair: &Keypair, options: &IpfsOptions, - repo: &Repo, + repo: &Repo, custom: Option, ) -> Result<(Self, Option), Error> { let bootstrap = options.bootstrap.clone(); diff --git a/src/p2p/bitswap.rs b/src/p2p/bitswap.rs index b3a8dfe0f..fe7451a3d 100644 --- a/src/p2p/bitswap.rs +++ b/src/p2p/bitswap.rs @@ -35,7 +35,10 @@ mod bitswap_pb { } } -use crate::{repo::Repo, Block}; +use crate::{ + repo::{default_impl::DefaultStorage, Repo}, + Block, +}; use self::{ message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType}, @@ -62,14 +65,14 @@ pub struct Behaviour { events: VecDeque::ToSwarm, THandlerInEvent>>, connections: HashMap>, blacklist_connections: HashMap>, - store: Repo, + store: Repo, want_session: StreamMap, have_session: StreamMap, waker: Option, } impl Behaviour { - pub fn new(store: &Repo) -> Self { + pub fn new(store: &Repo) -> Self { Self { events: Default::default(), connections: Default::default(), @@ -559,7 +562,7 @@ impl NetworkBehaviour for Behaviour { mod test { use std::time::Duration; - use crate::block::BlockCodec; + use crate::{block::BlockCodec, repo::default_impl::DefaultStorage}; use futures::StreamExt; use ipld_core::cid::Cid; use libp2p::{ @@ -954,7 +957,7 @@ mod test { Ok(()) } - async fn build_swarm() -> (PeerId, Multiaddr, Swarm, Repo) { + async fn build_swarm() -> (PeerId, Multiaddr, Swarm, Repo) { let repo = Repo::new_memory(); let mut swarm = SwarmBuilder::with_new_identity() diff --git a/src/p2p/bitswap/sessions.rs b/src/p2p/bitswap/sessions.rs index 358cf99e1..507878b88 100644 --- a/src/p2p/bitswap/sessions.rs +++ b/src/p2p/bitswap/sessions.rs @@ -14,7 +14,10 @@ use ipld_core::cid::Cid; use libp2p::PeerId; use std::fmt::Debug; -use crate::{repo::Repo, Block}; +use crate::{ + repo::{default_impl::DefaultStorage, Repo}, + Block, +}; const CAP_THRESHOLD: usize = 100; @@ -91,7 +94,7 @@ pub struct WantSession { discovery: WantDiscovery, received: bool, waker: Option, - repo: Repo, + repo: Repo, state: WantSessionState, timeout: Option, discovery_timeout: Duration, @@ -100,7 +103,7 @@ pub struct WantSession { } impl WantSession { - pub fn new(repo: &Repo, cid: Cid, timeout: Option) -> Self { + pub fn new(repo: &Repo, cid: Cid, timeout: Option) -> Self { Self { cid, wants: Default::default(), @@ -540,13 +543,13 @@ pub struct HaveSession { want: HashMap, send_dont_have: HashSet, have: Option, - repo: Repo, + repo: Repo, waker: Option, state: HaveSessionState, } impl HaveSession { - pub fn new(repo: &Repo, cid: Cid) -> Self { + pub fn new(repo: &Repo, cid: Cid) -> Self { let mut session = Self { cid, want: HashMap::new(), diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 03cd635ce..b18277c13 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -1,5 +1,6 @@ //! P2P handling for IPFS nodes. use crate::error::Error; +use crate::repo::default_impl::DefaultStorage; use crate::repo::Repo; use crate::{IpfsOptions, TTransportFn}; use std::convert::TryInto; @@ -226,7 +227,7 @@ impl Default for SwarmConfig { pub(crate) fn create_swarm( keypair: &Keypair, options: &IpfsOptions, - repo: &Repo, + repo: &Repo, span: Span, (custom, custom_transport): (Option, Option), ) -> Result, Error> diff --git a/src/refs.rs b/src/refs.rs index b11451367..90d8a2497 100644 --- a/src/refs.rs +++ b/src/refs.rs @@ -2,6 +2,7 @@ use crate::block::BlockCodec; use crate::repo::Repo; +use crate::repo::RepoTypes; use async_stream::stream; use futures::stream::Stream; use ipld_core::{cid::Cid, ipld::Ipld}; @@ -107,13 +108,14 @@ impl IpldRefs { self } - pub fn refs_of_resolved<'a, MaybeOwned, Iter>( + pub fn refs_of_resolved<'a, S, MaybeOwned, Iter>( self, repo: MaybeOwned, iplds: Iter, ) -> impl Stream> + Send + 'a where - MaybeOwned: Borrow + Send + 'a, + S: RepoTypes, + MaybeOwned: Borrow> + Send + 'a, Iter: IntoIterator + Send + 'a, { iplds_refs_inner(repo, iplds, self) @@ -137,14 +139,15 @@ impl IpldRefs { /// /// Depending on how this function is called, the lifetime will be tied to the lifetime of given /// `&Ipfs` or `'static` when given ownership of `Ipfs`. -pub fn iplds_refs<'a, MaybeOwned, Iter>( +pub fn iplds_refs<'a, S, MaybeOwned, Iter>( repo: MaybeOwned, iplds: Iter, max_depth: Option, unique: bool, ) -> impl Stream> + Send + 'a where - MaybeOwned: Borrow + Send + 'a, + S: RepoTypes, + MaybeOwned: Borrow> + Send + 'a, Iter: IntoIterator + Send + 'a, { use futures::stream::TryStreamExt; @@ -165,13 +168,14 @@ where }) } -fn iplds_refs_inner<'a, MaybeOwned, Iter>( +fn iplds_refs_inner<'a, S, MaybeOwned, Iter>( repo: MaybeOwned, iplds: Iter, opts: IpldRefs, ) -> impl Stream> + Send + 'a where - MaybeOwned: Borrow + Send + 'a, + S: RepoTypes, + MaybeOwned: Borrow> + Send + 'a, Iter: IntoIterator, { let mut work = VecDeque::new(); diff --git a/src/repo/blockstore/arc.rs b/src/repo/blockstore/arc.rs new file mode 100644 index 000000000..3e9aac672 --- /dev/null +++ b/src/repo/blockstore/arc.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +use crate::repo::{BlockPut, BlockStore}; + +use crate::error::Error; +use crate::Block; + + +#[async_trait] +impl BlockStore for Arc { + async fn init(&self) -> Result<(), Error> { + (**self).init().await + } + async fn contains(&self, cid: &Cid) -> Result { + (**self).contains(cid).await + } + async fn get(&self, cid: &Cid) -> Result, Error> { + (**self).get(cid).await + } + async fn size(&self, cid: &[Cid]) -> Result, Error> { + (**self).size(cid).await + } + async fn total_size(&self) -> Result { + (**self).total_size().await + } + async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { + (**self).put(block).await + } + async fn remove(&self, cid: &Cid) -> Result<(), Error> { + (**self).remove(cid).await + } + async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { + (**self).remove_many(blocks).await + } + async fn list(&self) -> BoxStream<'static, Cid> { + (**self).list().await + } +} \ No newline at end of file diff --git a/src/repo/blockstore/either.rs b/src/repo/blockstore/either.rs new file mode 100644 index 000000000..4504430ba --- /dev/null +++ b/src/repo/blockstore/either.rs @@ -0,0 +1,75 @@ +use crate::error::Error; +use crate::{ + repo::{BlockPut, BlockStore}, + Block, +}; +use async_trait::async_trait; +use either::Either; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +#[async_trait] +impl BlockStore for Either { + async fn init(&self) -> Result<(), Error> { + match self { + Either::Left(ref blockstore) => blockstore.init().await, + Either::Right(ref blockstore) => blockstore.init().await, + } + } + + async fn contains(&self, cid: &Cid) -> Result { + match self { + Either::Left(ref blockstore) => blockstore.contains(cid).await, + Either::Right(ref blockstore) => blockstore.contains(cid).await, + } + } + + async fn get(&self, cid: &Cid) -> Result, Error> { + match self { + Either::Left(ref blockstore) => blockstore.get(cid).await, + Either::Right(ref blockstore) => blockstore.get(cid).await, + } + } + + async fn size(&self, cid: &[Cid]) -> Result, Error> { + match self { + Either::Left(ref blockstore) => blockstore.size(cid).await, + Either::Right(ref blockstore) => blockstore.size(cid).await, + } + } + + async fn total_size(&self) -> Result { + match self { + Either::Left(ref blockstore) => blockstore.total_size().await, + Either::Right(ref blockstore) => blockstore.total_size().await, + } + } + + async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { + match self { + Either::Left(ref blockstore) => blockstore.put(block).await, + Either::Right(ref blockstore) => blockstore.put(block).await, + } + } + + async fn remove(&self, cid: &Cid) -> Result<(), Error> { + match self { + Either::Left(ref blockstore) => blockstore.remove(cid).await, + Either::Right(ref blockstore) => blockstore.remove(cid).await, + } + } + + async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { + match self { + Either::Left(ref blockstore) => blockstore.remove_many(blocks).await, + Either::Right(ref blockstore) => blockstore.remove_many(blocks).await, + } + } + + async fn list(&self) -> BoxStream<'static, Cid> { + match self { + Either::Left(ref blockstore) => blockstore.list().await, + Either::Right(ref blockstore) => blockstore.list().await, + } + } +} diff --git a/src/repo/blockstore/flatfs.rs b/src/repo/blockstore/flatfs.rs index aaccbc536..47bff43f7 100644 --- a/src/repo/blockstore/flatfs.rs +++ b/src/repo/blockstore/flatfs.rs @@ -16,7 +16,7 @@ use tokio_stream::wrappers::ReadDirStream; /// File system backed block store. /// /// For information on path mangling, please see `block_path` and `filestem_to_block_cid`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FsBlockStore { inner: Arc>, } diff --git a/src/repo/blockstore/memory.rs b/src/repo/blockstore/memory.rs index 5d8a47f6e..f8867a1ac 100644 --- a/src/repo/blockstore/memory.rs +++ b/src/repo/blockstore/memory.rs @@ -16,6 +16,7 @@ use std::sync::Arc; /// Describes an in-memory block store. /// /// Blocks are stored as a `HashMap` of the `Cid` and `Block`. +#[derive(Clone)] pub struct MemBlockStore { inner: Arc>, } diff --git a/src/repo/blockstore/mod.rs b/src/repo/blockstore/mod.rs index fdaed1fc0..e88a827da 100644 --- a/src/repo/blockstore/mod.rs +++ b/src/repo/blockstore/mod.rs @@ -3,3 +3,6 @@ pub mod flatfs; #[cfg(target_arch = "wasm32")] pub mod idb; pub mod memory; + +pub mod either; +pub mod arc; \ No newline at end of file diff --git a/src/repo/datastore/arc.rs b/src/repo/datastore/arc.rs new file mode 100644 index 000000000..1446f6083 --- /dev/null +++ b/src/repo/datastore/arc.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; +use crate::repo::{DataStore, PinStore, References}; +use crate::error::Error; +use crate::{PinKind, PinMode}; + +#[async_trait] +impl DataStore for Arc { + async fn init(&self) -> Result<(), Error> { + (**self).init().await + } + + async fn contains(&self, key: &[u8]) -> Result { + (**self).contains(key).await + } + + async fn get(&self, key: &[u8]) -> Result>, Error> { + (**self).get(key).await + } + + async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + (**self).put(key, value).await + } + + async fn remove(&self, key: &[u8]) -> Result<(), Error> { + (**self).remove(key).await + } + + async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { + (**self).iter().await + } +} + +#[async_trait] +impl PinStore for Arc

{ + async fn is_pinned(&self, block: &Cid) -> Result { + (**self).is_pinned(block).await + } + + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { + (**self).insert_direct_pin(target).await + } + + async fn insert_recursive_pin(&self, target: &Cid, referenced: References<'_>) -> Result<(), Error> { + (**self).insert_recursive_pin(target, referenced).await + } + + async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { + (**self).remove_direct_pin(target).await + } + + async fn remove_recursive_pin(&self, target: &Cid, referenced: References<'_>) -> Result<(), Error> { + (**self).remove_recursive_pin(target, referenced).await + } + + async fn list(&self, mode: Option) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + (**self).list(mode).await + } + + async fn query(&self, ids: Vec, requirement: Option) -> Result)>, Error> { + (**self).query(ids, requirement).await + } +} \ No newline at end of file diff --git a/src/repo/datastore/either.rs b/src/repo/datastore/either.rs new file mode 100644 index 000000000..92b7116ff --- /dev/null +++ b/src/repo/datastore/either.rs @@ -0,0 +1,124 @@ +use async_trait::async_trait; +use either::Either; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +use crate::error::Error; +use crate::repo::{DataStore, PinStore, References}; +use crate::{PinKind, PinMode}; + +#[async_trait] +impl DataStore for Either { + async fn init(&self) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.init().await, + Either::Right(ref datastore) => datastore.init().await, + } + } + + async fn contains(&self, key: &[u8]) -> Result { + match self { + Either::Left(ref datastore) => datastore.contains(key).await, + Either::Right(ref datastore) => datastore.contains(key).await, + } + } + + async fn get(&self, key: &[u8]) -> Result>, Error> { + match self { + Either::Left(ref datastore) => datastore.get(key).await, + Either::Right(ref datastore) => datastore.get(key).await, + } + } + + async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.put(key, value).await, + Either::Right(ref datastore) => datastore.put(key, value).await, + } + } + + async fn remove(&self, key: &[u8]) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.remove(key).await, + Either::Right(ref datastore) => datastore.remove(key).await, + } + } + + async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { + match self { + Either::Left(ref datastore) => datastore.iter().await, + Either::Right(ref datastore) => datastore.iter().await, + } + } +} + +#[async_trait] +impl PinStore for Either { + async fn is_pinned(&self, block: &Cid) -> Result { + match self { + Either::Left(ref datastore) => datastore.is_pinned(block).await, + Either::Right(ref datastore) => datastore.is_pinned(block).await, + } + } + + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.insert_direct_pin(target).await, + Either::Right(ref datastore) => datastore.insert_direct_pin(target).await, + } + } + + async fn insert_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.insert_recursive_pin(target, referenced).await, + Either::Right(ref datastore) => { + datastore.insert_recursive_pin(target, referenced).await + } + } + } + + async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.remove_direct_pin(target).await, + Either::Right(ref datastore) => datastore.remove_direct_pin(target).await, + } + } + + async fn remove_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + match self { + Either::Left(ref datastore) => datastore.remove_recursive_pin(target, referenced).await, + Either::Right(ref datastore) => { + datastore.remove_recursive_pin(target, referenced).await + } + } + } + + async fn list( + &self, + mode: Option, + ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + match self { + Either::Left(ref datastore) => datastore.list(mode).await, + Either::Right(ref datastore) => datastore.list(mode).await, + } + } + + async fn query( + &self, + ids: Vec, + requirement: Option, + ) -> Result)>, Error> { + match self { + Either::Left(ref datastore) => datastore.query(ids, requirement).await, + Either::Right(ref datastore) => datastore.query(ids, requirement).await, + } + } +} diff --git a/src/repo/datastore/flatfs.rs b/src/repo/datastore/flatfs.rs index 135505124..bcacdd02d 100644 --- a/src/repo/datastore/flatfs.rs +++ b/src/repo/datastore/flatfs.rs @@ -20,7 +20,7 @@ use tokio_util::either::Either; /// their indirect descendants. Pin files are separated by their file extensions. /// /// When modifying, single lock is used. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FsDataStore { /// The base directory under which we have a sharded directory structure, and the individual /// blocks are stored under the shard. See unixfs/examples/cat.rs for read example. diff --git a/src/repo/datastore/memory.rs b/src/repo/datastore/memory.rs index b2fda2170..e4b235c82 100644 --- a/src/repo/datastore/memory.rs +++ b/src/repo/datastore/memory.rs @@ -14,9 +14,9 @@ use std::collections::HashMap; use std::sync::Arc; /// Describes an in-memory `DataStore`. -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct MemDataStore { - inner: Mutex, Vec>>, + inner: Arc, Vec>>>, // this could also be PinDocument however doing any serialization allows to see the required // error types easier pin: Arc, Vec>>>, diff --git a/src/repo/datastore/mod.rs b/src/repo/datastore/mod.rs index 84dc68ecf..95e32ed06 100644 --- a/src/repo/datastore/mod.rs +++ b/src/repo/datastore/mod.rs @@ -4,3 +4,6 @@ pub mod memory; #[cfg(target_arch = "wasm32")] pub mod idb; + +pub mod either; +pub mod arc; \ No newline at end of file diff --git a/src/repo/default_impl.rs b/src/repo/default_impl.rs new file mode 100644 index 000000000..b85bf2c93 --- /dev/null +++ b/src/repo/default_impl.rs @@ -0,0 +1,268 @@ +#[cfg(not(target_arch = "wasm32"))] +use crate::repo::blockstore::flatfs::FsBlockStore; +#[cfg(target_arch = "wasm32")] +use crate::repo::blockstore::idb::IdbBlockStore; +use crate::repo::blockstore::memory::MemBlockStore; +#[cfg(not(target_arch = "wasm32"))] +use crate::repo::datastore::flatfs::FsDataStore; +#[cfg(target_arch = "wasm32")] +use crate::repo::datastore::idb::IdbDataStore; +use crate::repo::datastore::memory::MemDataStore; +use crate::repo::{lock, RepoTypes}; +use crate::Block; + +#[cfg(target_arch = "wasm32")] +use std::sync::Arc; + +use async_trait::async_trait; +use either::Either; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +use crate::error::Error; + +use super::{ + BlockPut, BlockStore, DataStore, Lock, LockError, PinKind, PinMode, PinStore, References, +}; + +#[derive(Debug)] +#[cfg(not(target_arch = "wasm32"))] +pub struct DefaultStorage { + blockstore: Either, + datastore: Either, + lockfile: Either, +} + +#[derive(Debug)] +#[cfg(target_arch = "wasm32")] +pub struct DefaultStorage { + blockstore: Either>, + datastore: Either>, + lockfile: Either, +} + +impl Default for DefaultStorage { + fn default() -> Self { + Self { + blockstore: Either::Left(MemBlockStore::new(Default::default())), + datastore: Either::Left(MemDataStore::new(Default::default())), + lockfile: Either::Left(lock::MemLock), + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl DefaultStorage { + /// Set path to trigger persistent storage + #[allow(dead_code)] + pub(crate) fn set_path(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.blockstore = Either::Right(FsBlockStore::new(path.clone())); + self.datastore = Either::Right(FsDataStore::new(path.clone())); + self.lockfile = Either::Right(lock::FsLock::new(path.clone())); + } + + pub(crate) fn set_blockstore_path(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.blockstore = Either::Right(FsBlockStore::new(path.clone())); + } + + pub(crate) fn set_datastore_path(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.datastore = Either::Right(FsDataStore::new(path.clone())); + } + + pub(crate) fn set_lockfile(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.lockfile = Either::Right(lock::FsLock::new(path.clone())); + } + + #[allow(dead_code)] + pub(crate) fn remove_paths(&mut self) { + self.blockstore = Either::Left(MemBlockStore::new(Default::default())); + self.datastore = Either::Left(MemDataStore::new(Default::default())); + self.lockfile = Either::Left(lock::MemLock); + } +} + +#[cfg(target_arch = "wasm32")] +impl DefaultStorage { + /// Set path to trigger persistent storage + pub(crate) fn set_namespace(&mut self, namespace: impl Into>) { + let namespace = namespace.into(); + self.blockstore = Either::Right(Arc::new(IdbBlockStore::new(namespace.clone()))); + self.datastore = Either::Right(Arc::new(IdbDataStore::new(namespace.clone()))); + self.lockfile = Either::Right(lock::MemLock); + } + + #[allow(dead_code)] + pub(crate) fn remove_namespace(&mut self) { + self.blockstore = Either::Left(MemBlockStore::new(Default::default())); + self.datastore = Either::Left(MemDataStore::new(Default::default())); + self.lockfile = Either::Left(lock::MemLock); + } +} + +impl Clone for DefaultStorage { + fn clone(&self) -> Self { + Self { + blockstore: self.blockstore.clone(), + datastore: self.datastore.clone(), + lockfile: self.lockfile.clone(), + } + } +} + +impl RepoTypes for DefaultStorage { + type TBlockStore = DefaultStorage; + type TDataStore = DefaultStorage; + type TLock = DefaultStorage; + + fn blockstore(&self) -> &Self::TBlockStore { + self + } + + fn blockstore_mut(&mut self) -> &mut Self::TBlockStore { + self + } + + fn datastore(&self) -> &Self::TDataStore { + self + } + + fn datastore_mut(&mut self) -> &mut Self::TDataStore { + self + } + + fn lock(&self) -> &Self::TLock { + self + } + + fn lock_mut(&mut self) -> &mut Self::TLock { + self + } +} + +impl Unpin for DefaultStorage {} + +#[async_trait] +impl BlockStore for DefaultStorage { + async fn init(&self) -> Result<(), Error> { + self.blockstore.init().await + } + + async fn contains(&self, cid: &Cid) -> Result { + self.blockstore.contains(cid).await + } + + async fn get(&self, cid: &Cid) -> Result, Error> { + self.blockstore.get(cid).await + } + + async fn size(&self, cid: &[Cid]) -> Result, Error> { + self.blockstore.size(cid).await + } + + async fn total_size(&self) -> Result { + self.blockstore.total_size().await + } + + async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { + self.blockstore.put(block).await + } + + async fn remove(&self, cid: &Cid) -> Result<(), Error> { + self.blockstore.remove(cid).await + } + + async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { + self.blockstore.remove_many(blocks).await + } + + async fn list(&self) -> BoxStream<'static, Cid> { + self.blockstore.list().await + } +} + +#[async_trait] +impl DataStore for DefaultStorage { + async fn init(&self) -> Result<(), Error> { + self.datastore.init().await + } + + async fn contains(&self, key: &[u8]) -> Result { + self.datastore.contains(key).await + } + + async fn get(&self, key: &[u8]) -> Result>, Error> { + self.datastore.get(key).await + } + + async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + self.datastore.put(key, value).await + } + + async fn remove(&self, key: &[u8]) -> Result<(), Error> { + self.datastore.remove(key).await + } + + async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { + self.datastore.iter().await + } +} + +#[async_trait] +impl PinStore for DefaultStorage { + async fn is_pinned(&self, block: &Cid) -> Result { + self.datastore.is_pinned(block).await + } + + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { + self.datastore.insert_direct_pin(target).await + } + + async fn insert_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + self.datastore + .insert_recursive_pin(target, referenced) + .await + } + + async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { + self.datastore.remove_direct_pin(target).await + } + + async fn remove_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + self.datastore + .remove_recursive_pin(target, referenced) + .await + } + + async fn list( + &self, + mode: Option, + ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + self.datastore.list(mode).await + } + + async fn query( + &self, + ids: Vec, + requirement: Option, + ) -> Result)>, Error> { + self.datastore.query(ids, requirement).await + } +} + +impl Lock for DefaultStorage { + fn try_exclusive(&self) -> Result<(), LockError> { + self.lockfile.try_exclusive() + } +} diff --git a/src/repo/lock.rs b/src/repo/lock.rs index 2d238a34c..70d8de6c1 100644 --- a/src/repo/lock.rs +++ b/src/repo/lock.rs @@ -2,11 +2,22 @@ //! //! Consists of [`FsDataStore`] and [`FsBlockStore`]. +#[cfg(not(target_arch = "wasm32"))] +use std::sync::Arc; + +use either::Either; + use super::{Lock, LockError}; #[cfg(not(target_arch = "wasm32"))] -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct FsLock { + inner: Arc, +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug)] +struct InnerFsLock { file: parking_lot::Mutex>, path: std::path::PathBuf, state: parking_lot::Mutex, @@ -23,9 +34,11 @@ enum State { impl FsLock { pub fn new(path: std::path::PathBuf) -> Self { Self { - file: parking_lot::Mutex::new(None), - path, - state: parking_lot::Mutex::new(State::Unlocked), + inner: Arc::new(InnerFsLock { + file: parking_lot::Mutex::new(None), + path, + state: parking_lot::Mutex::new(State::Unlocked), + }), } } } @@ -41,18 +54,18 @@ impl Lock for FsLock { .write(true) .create(true) .truncate(true) - .open(&self.path)?; + .open(&self.inner.path)?; file.try_lock_exclusive()?; - *self.state.lock() = State::Exclusive; - *self.file.lock() = Some(file); + *self.inner.state.lock() = State::Exclusive; + *self.inner.file.lock() = Some(file); Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Copy, Debug, Default)] pub struct MemLock; impl Lock for MemLock { @@ -61,6 +74,15 @@ impl Lock for MemLock { } } +impl Lock for Either { + fn try_exclusive(&self) -> Result<(), LockError> { + match self { + Either::Left(lock) => lock.try_exclusive(), + Either::Right(lock) => lock.try_exclusive(), + } + } +} + #[cfg(test)] mod tests { use super::{FsLock, Lock}; diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 82a74be87..eddc44f58 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -1,8 +1,9 @@ //! Storage implementation(s) backing the [`crate::Ipfs`]. use crate::error::Error; -use crate::{Block, StorageType}; +use crate::Block; use async_trait::async_trait; use core::fmt::Debug; +use default_impl::DefaultStorage; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::future::{BoxFuture, Either}; use futures::sink::SinkExt; @@ -30,6 +31,8 @@ use tracing::{Instrument, Span}; #[cfg(test)] mod common_tests; +pub(crate) mod default_impl; + pub mod blockstore; pub mod datastore; pub mod lock; @@ -64,6 +67,10 @@ pub enum BlockRmError { NotFound(Cid), } +pub trait StoreOpt { + fn into_opt(self) -> Self; +} + /// This API is being discussed and evolved, which will likely lead to breakage. #[async_trait] pub trait BlockStore: Debug + Send + Sync { @@ -322,24 +329,76 @@ impl> PinKind { type SubscriptionsMap = HashMap>>>; +/// Represents the configuration of the Ipfs node, its backing blockstore and datastore. +pub trait StorageTypes: RepoTypes {} +impl StorageTypes for T {} + +pub trait RepoTypes: Clone + Send + Sync + 'static { + /// Describes a blockstore. + type TBlockStore: BlockStore; + /// Describes a datastore. + type TDataStore: DataStore; + type TLock: Lock; + + fn blockstore(&self) -> &Self::TBlockStore; + fn blockstore_mut(&mut self) -> &mut Self::TBlockStore; + fn datastore(&self) -> &Self::TDataStore; + fn datastore_mut(&mut self) -> &mut Self::TDataStore; + fn lock(&self) -> &Self::TLock; + fn lock_mut(&mut self) -> &mut Self::TLock; +} + +#[derive(Copy, Clone, Debug)] +pub struct Memory; + +impl RepoTypes for Memory { + type TBlockStore = blockstore::memory::MemBlockStore; + type TDataStore = datastore::memory::MemDataStore; + type TLock = lock::MemLock; + + fn blockstore(&self) -> &Self::TBlockStore { + unimplemented!() + } + + fn blockstore_mut(&mut self) -> &mut Self::TBlockStore { + todo!() + } + + fn datastore(&self) -> &Self::TDataStore { + todo!() + } + + fn datastore_mut(&mut self) -> &mut Self::TDataStore { + todo!() + } + + fn lock(&self) -> &Self::TLock { + todo!() + } + + fn lock_mut(&mut self) -> &mut Self::TLock { + todo!() + } +} + /// Describes a repo. /// Consolidates a blockstore, a datastore and a subscription registry. #[allow(clippy::type_complexity)] #[derive(Debug, Clone)] -pub struct Repo { - pub(crate) inner: Arc, +pub struct Repo { + pub(crate) inner: Arc>, } #[derive(Debug)] -pub(crate) struct RepoInner { +pub(crate) struct RepoInner { online: AtomicBool, initialized: AtomicBool, max_storage_size: AtomicUsize, - block_store: Box, - data_store: Box, + block_store: S::TBlockStore, + data_store: S::TDataStore, events: RwLock>>, pub(crate) subscriptions: Mutex, - lockfile: Box, + lockfile: S::TLock, pub(crate) gclock: tokio::sync::RwLock<()>, } @@ -361,50 +420,31 @@ pub enum RepoEvent { RemovedBlock(Cid), } -impl Repo { - pub fn new(repo_type: &mut StorageType) -> Self { - match repo_type { - StorageType::Memory => Repo::new_memory(), - #[cfg(not(target_arch = "wasm32"))] - StorageType::Disk(path) => Repo::new_fs(path), - #[cfg(target_arch = "wasm32")] - StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()), - StorageType::Custom { - blockstore, - datastore, - lock, - } => Repo::new_raw( - blockstore.take().expect("Requires blockstore"), - datastore.take().expect("Requires datastore"), - lock.take() - .expect("Requires lockfile for data and block store"), - ), - } - } - - pub fn new_raw( - block_store: Box, - data_store: Box, - lockfile: Box, - ) -> Self { - let inner = RepoInner { - initialized: AtomicBool::default(), - online: AtomicBool::default(), - block_store, - data_store, - events: Default::default(), - subscriptions: Default::default(), - lockfile, - max_storage_size: Default::default(), - gclock: Default::default(), - }; - Repo { - inner: Arc::new(inner), - } - } +impl Repo { + // pub fn new(repo_type: &mut StorageType) -> Self { + // match repo_type { + // StorageType::Memory => Repo::new_memory(), + // #[cfg(not(target_arch = "wasm32"))] + // StorageType::Disk(path) => Repo::new_fs(path), + // #[cfg(target_arch = "wasm32")] + // StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()), + // StorageType::Custom { + // blockstore, + // datastore, + // lock, + // } => Repo::new_raw( + // blockstore.take().expect("Requires blockstore"), + // datastore.take().expect("Requires datastore"), + // lock.take() + // .expect("Requires lockfile for data and block store"), + // ), + // } + // } #[cfg(not(target_arch = "wasm32"))] pub fn new_fs(path: impl AsRef) -> Self { + let mut default = DefaultStorage::default(); + let path = path.as_ref().to_path_buf(); let mut blockstore_path = path.clone(); let mut datastore_path = path.clone(); @@ -413,25 +453,46 @@ impl Repo { datastore_path.push("datastore"); lockfile_path.push("repo_lock"); - let block_store = Box::new(blockstore::flatfs::FsBlockStore::new(blockstore_path)); - let data_store = Box::new(datastore::flatfs::FsDataStore::new(datastore_path)); - let lockfile = Box::new(lock::FsLock::new(lockfile_path)); - Self::new_raw(block_store, data_store, lockfile) + default.set_blockstore_path(blockstore_path); + default.set_datastore_path(datastore_path); + default.set_lockfile(lockfile_path); + + Self::new_raw(default.clone(), default.clone(), default) } pub fn new_memory() -> Self { - let block_store = Box::new(blockstore::memory::MemBlockStore::new(Default::default())); - let data_store = Box::new(datastore::memory::MemDataStore::new(Default::default())); - let lockfile = Box::new(lock::MemLock); - Self::new_raw(block_store, data_store, lockfile) + let default = DefaultStorage::default(); + Self::new_raw(default.clone(), default.clone(), default) } #[cfg(target_arch = "wasm32")] pub fn new_idb(namespace: Option) -> Self { - let block_store = Box::new(blockstore::idb::IdbBlockStore::new(namespace.clone())); - let data_store = Box::new(datastore::idb::IdbDataStore::new(namespace)); - let lockfile = Box::new(lock::MemLock); - Self::new_raw(block_store, data_store, lockfile) + let mut default = DefaultStorage::default(); + default.set_namespace(namespace); + Self::new_raw(default.clone(), default.clone(), default) + } +} + +impl Repo { + pub fn new_raw( + block_store: S::TBlockStore, + data_store: S::TDataStore, + lockfile: S::TLock, + ) -> Self { + let inner = RepoInner { + initialized: AtomicBool::default(), + online: AtomicBool::default(), + block_store, + data_store, + events: Default::default(), + subscriptions: Default::default(), + lockfile, + max_storage_size: Default::default(), + gclock: Default::default(), + }; + Repo { + inner: Arc::new(inner), + } } pub fn set_max_storage_size(&self, size: usize) { @@ -587,22 +648,22 @@ impl Repo { } /// Puts a block into the block store. - pub fn put_block(&self, block: &Block) -> RepoPutBlock { + pub fn put_block(&self, block: &Block) -> RepoPutBlock { RepoPutBlock::new(self, block).broadcast_on_new_block(true) } /// Retrives a block from the block store, or starts fetching it from the network and awaits /// until it has been fetched. #[inline] - pub fn get_block>(&self, cid: C) -> RepoGetBlock { - RepoGetBlock::new(self.clone(), cid) + pub fn get_block>(&self, cid: C) -> RepoGetBlock { + RepoGetBlock::new(Repo::clone(self), cid) } /// Retrives a set of blocks from the block store, or starts fetching them from the network and awaits /// until it has been fetched. #[inline] - pub fn get_blocks(&self, cids: impl IntoIterator>) -> RepoGetBlocks { - RepoGetBlocks::new(self.clone()).blocks(cids) + pub fn get_blocks(&self, cids: impl IntoIterator>) -> RepoGetBlocks { + RepoGetBlocks::new(Repo::clone(self)).blocks(cids) } /// Get the size of listed blocks @@ -720,8 +781,8 @@ impl Repo { /// /// Recursively pinned Cids cannot be re-pinned non-recursively but non-recursively pinned Cids /// can be "upgraded to" being recursively pinned. - pub fn pin>(&self, cid: C) -> RepoInsertPin { - RepoInsertPin::new(self.clone(), cid) + pub fn pin>(&self, cid: C) -> RepoInsertPin { + RepoInsertPin::new(Repo::clone(self), cid) } /// Unpins a given Cid recursively or only directly. @@ -730,12 +791,12 @@ impl Repo { /// /// Unpinning an indirectly pinned Cid is not possible other than through its recursively /// pinned tree roots. - pub fn remove_pin>(&self, cid: C) -> RepoRemovePin { - RepoRemovePin::new(self.clone(), cid) + pub fn remove_pin>(&self, cid: C) -> RepoRemovePin { + RepoRemovePin::new(Repo::clone(self), cid) } - pub fn fetch>(&self, cid: C) -> RepoFetch { - RepoFetch::new(self.clone(), cid) + pub fn fetch>(&self, cid: C) -> RepoFetch { + RepoFetch::new(Repo::clone(self), cid) } /// Pins a given Cid recursively or directly (non-recursively). @@ -845,7 +906,7 @@ pub struct GCGuard<'a> { _g: RwLockReadGuard<'a, ()>, } -impl Repo { +impl Repo { /// Hold a guard to prevent GC from running until this guard has dropped /// Note: Until this guard drops, the GC task, if enabled, would not perform any cleanup. /// If the GC task is running, this guard will await until GC finishes @@ -854,17 +915,17 @@ impl Repo { GCGuard { _g } } - pub fn data_store(&self) -> &dyn DataStore { - &*self.inner.data_store + pub fn data_store(&self) -> &S::TDataStore { + &self.inner.data_store } } -pub struct RepoGetBlock { - instance: RepoGetBlocks, +pub struct RepoGetBlock { + instance: RepoGetBlocks, } -impl RepoGetBlock { - pub fn new(repo: Repo, cid: impl Borrow) -> Self { +impl RepoGetBlock { + pub fn new(repo: Repo, cid: impl Borrow) -> Self { let instance = RepoGetBlocks::new(repo).block(cid); Self { instance } } @@ -902,7 +963,7 @@ impl RepoGetBlock { } } -impl Future for RepoGetBlock { +impl Future for RepoGetBlock { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut self; @@ -913,8 +974,8 @@ impl Future for RepoGetBlock { } } -pub struct RepoGetBlocks { - repo: Option, +pub struct RepoGetBlocks { + repo: Option>, cids: IndexSet, providers: IndexSet, local: bool, @@ -923,8 +984,8 @@ pub struct RepoGetBlocks { stream: Option>>, } -impl RepoGetBlocks { - pub fn new(repo: Repo) -> Self { +impl RepoGetBlocks { + pub fn new(repo: Repo) -> Self { Self { repo: Some(repo), cids: IndexSet::new(), @@ -977,7 +1038,7 @@ impl RepoGetBlocks { } } -impl Stream for RepoGetBlocks { +impl Stream for RepoGetBlocks { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -1101,7 +1162,7 @@ impl Stream for RepoGetBlocks { } } -impl IntoFuture for RepoGetBlocks { +impl IntoFuture for RepoGetBlocks { type Output = Result, Error>; type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { @@ -1113,18 +1174,18 @@ impl IntoFuture for RepoGetBlocks { } } -pub struct RepoPutBlock { - repo: Repo, +pub struct RepoPutBlock { + repo: Repo, block: Option, span: Option, broadcast_on_new_block: bool, } -impl RepoPutBlock { - fn new(repo: &Repo, block: &Block) -> Self { +impl RepoPutBlock { + fn new(repo: &Repo, block: &Block) -> Self { let block = Some(block.clone()); Self { - repo: repo.clone(), + repo: Repo::clone(repo), block, span: None, broadcast_on_new_block: true, @@ -1142,7 +1203,7 @@ impl RepoPutBlock { } } -impl IntoFuture for RepoPutBlock { +impl IntoFuture for RepoPutBlock { type IntoFuture = BoxFuture<'static, Self::Output>; type Output = Result; fn into_future(mut self) -> Self::IntoFuture { @@ -1175,8 +1236,8 @@ impl IntoFuture for RepoPutBlock { } } -pub struct RepoFetch { - repo: Repo, +pub struct RepoFetch { + repo: Repo, cid: Cid, span: Option, providers: Vec, @@ -1185,8 +1246,8 @@ pub struct RepoFetch { refs: crate::refs::IpldRefs, } -impl RepoFetch { - pub fn new>(repo: Repo, cid: C) -> Self { +impl RepoFetch { + pub fn new>(repo: Repo, cid: C) -> Self { let cid = cid.borrow(); Self { repo, @@ -1245,7 +1306,7 @@ impl RepoFetch { } } -impl IntoFuture for RepoFetch { +impl IntoFuture for RepoFetch { type Output = Result<(), Error>; type IntoFuture = BoxFuture<'static, Self::Output>; @@ -1290,8 +1351,8 @@ impl IntoFuture for RepoFetch { } } -pub struct RepoInsertPin { - repo: Repo, +pub struct RepoInsertPin { + repo: Repo, cid: Cid, span: Option, providers: Vec, @@ -1301,8 +1362,8 @@ pub struct RepoInsertPin { refs: crate::refs::IpldRefs, } -impl RepoInsertPin { - pub fn new>(repo: Repo, cid: C) -> Self { +impl RepoInsertPin { + pub fn new>(repo: Repo, cid: C) -> Self { let cid = cid.borrow(); Self { repo, @@ -1378,7 +1439,7 @@ impl RepoInsertPin { } } -impl IntoFuture for RepoInsertPin { +impl IntoFuture for RepoInsertPin { type Output = Result<(), Error>; type IntoFuture = BoxFuture<'static, Self::Output>; @@ -1425,16 +1486,16 @@ impl IntoFuture for RepoInsertPin { } } -pub struct RepoRemovePin { - repo: Repo, +pub struct RepoRemovePin { + repo: Repo, cid: Cid, span: Option, recursive: bool, refs: crate::refs::IpldRefs, } -impl RepoRemovePin { - pub fn new>(repo: Repo, cid: C) -> Self { +impl RepoRemovePin { + pub fn new>(repo: Repo, cid: C) -> Self { let cid = cid.borrow(); Self { repo, @@ -1458,7 +1519,7 @@ impl RepoRemovePin { } } -impl IntoFuture for RepoRemovePin { +impl IntoFuture for RepoRemovePin { type Output = Result<(), Error>; type IntoFuture = BoxFuture<'static, Self::Output>; diff --git a/src/task.rs b/src/task.rs index f5091396c..d0e2a0230 100644 --- a/src/task.rs +++ b/src/task.rs @@ -10,7 +10,7 @@ use futures::{ }; use pollable_map::stream::optional::OptionalStream; -use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent}; +use crate::{p2p::MultiaddrExt, repo::default_impl::DefaultStorage, Channel, InnerPubsubEvent}; use crate::{ConnectionEvents, PeerConnectionEvents, TSwarmEvent}; use crate::{config::BOOTSTRAP_NODES, IpfsEvent, TSwarmEventFn}; @@ -65,7 +65,7 @@ pub struct IpfsTask> { pub listening_addresses: HashMap>, pub provider_stream: HashMap>, pub record_stream: HashMap>, - pub repo: Repo, + pub repo: Repo, pub kad_subscriptions: HashMap>, pub dht_peer_lookup: HashMap>>, pub bootstraps: HashSet, @@ -92,7 +92,7 @@ pub struct IpfsTask> { } impl> IpfsTask { - pub fn new(swarm: TSwarm, repo: &Repo, event_capacity: usize) -> Self { + pub fn new(swarm: TSwarm, repo: &Repo, event_capacity: usize) -> Self { IpfsTask { repo_events: OptionalStream::default(), from_facade: OptionalStream::default(), @@ -106,7 +106,7 @@ impl> IpfsTask { bitswap_cancellable: Default::default(), repo: repo.clone(), bootstraps: Default::default(), - swarm_event: Default::default(), + swarm_event: None, timer: Default::default(), relay_listener: Default::default(), local_external_addr: false, diff --git a/src/unixfs/add.rs b/src/unixfs/add.rs index 7ebdf0dee..c0f2ada21 100644 --- a/src/unixfs/add.rs +++ b/src/unixfs/add.rs @@ -1,6 +1,9 @@ use std::task::{Context, Poll}; -use crate::{repo::Repo, Block}; +use crate::{ + repo::{default_impl::DefaultStorage, Repo}, + Block, +}; use bytes::Bytes; use either::Either; #[allow(unused_imports)] @@ -47,7 +50,7 @@ impl From<&Path> for AddOpt { #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsAdd { - core: Option>, + core: Option>>, opt: Option, span: Span, chunk: Chunker, @@ -62,11 +65,11 @@ impl UnixfsAdd { Self::with_either(Either::Left(ipfs.clone()), opt) } - pub fn with_repo(repo: &Repo, opt: impl Into) -> Self { + pub fn with_repo(repo: &Repo, opt: impl Into) -> Self { Self::with_either(Either::Right(repo.clone()), opt) } - fn with_either(core: Either, opt: impl Into) -> Self { + fn with_either(core: Either>, opt: impl Into) -> Self { let opt = opt.into(); Self { core: Some(core), diff --git a/src/unixfs/cat.rs b/src/unixfs/cat.rs index ea8d39b98..59369c963 100644 --- a/src/unixfs/cat.rs +++ b/src/unixfs/cat.rs @@ -1,3 +1,4 @@ +use crate::repo::default_impl::DefaultStorage; use crate::{dag::IpldDag, repo::Repo, Block, Ipfs}; use async_stream::try_stream; use bytes::Bytes; @@ -23,7 +24,7 @@ use super::TraversalFailed; /// Returns a stream of bytes on the file pointed with the Cid. #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsCat { - core: Option>, + core: Option>>, span: Span, length: Option, starting_point: Option, @@ -39,11 +40,17 @@ impl UnixfsCat { Self::with_either(Either::Left(ipfs.clone()), starting_point) } - pub fn with_repo(repo: &Repo, starting_point: impl Into) -> Self { + pub fn with_repo( + repo: &Repo, + starting_point: impl Into, + ) -> Self { Self::with_either(Either::Right(repo.clone()), starting_point) } - fn with_either(core: Either, starting_point: impl Into) -> Self { + fn with_either( + core: Either>, + starting_point: impl Into, + ) -> Self { let starting_point = starting_point.into(); Self { core: Some(core), diff --git a/src/unixfs/get.rs b/src/unixfs/get.rs index 5c1dab642..1db23c742 100644 --- a/src/unixfs/get.rs +++ b/src/unixfs/get.rs @@ -15,6 +15,7 @@ use std::{ use tokio::io::AsyncWriteExt; use tracing::{Instrument, Span}; +use crate::repo::default_impl::DefaultStorage; use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath}; #[allow(unused_imports)] @@ -22,7 +23,7 @@ use super::{TraversalFailed, UnixfsStatus}; #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsGet { - core: Option>, + core: Option>>, dest: PathBuf, span: Span, path: Option, @@ -37,12 +38,16 @@ impl UnixfsGet { Self::with_either(Either::Left(ipfs.clone()), path, dest) } - pub fn with_repo(repo: &Repo, path: impl Into, dest: impl AsRef) -> Self { + pub fn with_repo( + repo: &Repo, + path: impl Into, + dest: impl AsRef, + ) -> Self { Self::with_either(Either::Right(repo.clone()), path, dest) } fn with_either( - core: Either, + core: Either>, path: impl Into, dest: impl AsRef, ) -> Self { diff --git a/src/unixfs/ls.rs b/src/unixfs/ls.rs index 7a4cfca89..434e0a316 100644 --- a/src/unixfs/ls.rs +++ b/src/unixfs/ls.rs @@ -12,7 +12,11 @@ use std::task::Context; use std::{task::Poll, time::Duration}; use tracing::{Instrument, Span}; -use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath}; +use crate::{ + dag::IpldDag, + repo::{default_impl::DefaultStorage, Repo}, + Ipfs, IpfsPath, +}; #[derive(Debug)] pub enum Entry { @@ -24,7 +28,7 @@ pub enum Entry { #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsLs { - core: Option>, + core: Option>>, span: Span, path: Option, providers: Vec, @@ -38,11 +42,11 @@ impl UnixfsLs { Self::with_either(Either::Left(ipfs.clone()), path) } - pub fn with_repo(repo: &Repo, path: impl Into) -> Self { + pub fn with_repo(repo: &Repo, path: impl Into) -> Self { Self::with_either(Either::Right(repo.clone()), path) } - fn with_either(core: Either, path: impl Into) -> Self { + fn with_either(core: Either>, path: impl Into) -> Self { let path = path.into(); Self { core: Some(core), From 41bc9883806c4733ed6973946ceb097b25cf1a99 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 24 Mar 2025 09:13:26 -0500 Subject: [PATCH 2/7] chore: remove generic type from set_namespace --- src/lib.rs | 65 +++++++----------------------------------------------- 1 file changed, 8 insertions(+), 57 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2cf801a84..90739cb18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,8 +61,7 @@ use p2p::{ RelayConfig, RequestResponseConfig, SwarmConfig, TransportConfig, }; use repo::{ - default_impl::DefaultStorage, BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, - RepoInsertPin, RepoRemovePin, + default_impl::DefaultStorage, GCConfig, GCTrigger, RepoFetch, RepoInsertPin, RepoRemovePin, }; use tracing::Span; @@ -127,51 +126,6 @@ use libp2p::{request_response::InboundRequestId, swarm::dial_opts::PeerCondition pub use libp2p_connection_limits::ConnectionLimits; use serde::Serialize; -#[allow(dead_code)] -#[deprecated(note = "Use `StoreageType` instead")] -type StoragePath = StorageType; - -#[derive(Default, Debug)] -pub enum StorageType { - #[cfg(not(target_arch = "wasm32"))] - Disk(std::path::PathBuf), - #[default] - Memory, - #[cfg(target_arch = "wasm32")] - IndexedDb { namespace: Option }, - Custom { - blockstore: Option>, - datastore: Option>, - lock: Option>, - }, -} - -impl PartialEq for StorageType { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - #[cfg(not(target_arch = "wasm32"))] - (StorageType::Disk(left_path), StorageType::Disk(right_path)) => { - left_path.eq(right_path) - } - #[cfg(target_arch = "wasm32")] - ( - StorageType::IndexedDb { namespace: left }, - StorageType::IndexedDb { namespace: right }, - ) => left.eq(right), - (StorageType::Memory, StorageType::Memory) => true, - (StorageType::Custom { .. }, StorageType::Custom { .. }) => { - //Do we really care if they equal? - //TODO: Possibly implement PartialEq/Eq for the traits so we could make sure - // that they do or dont eq each other. For now this will always be true - true - } - _ => false, - } - } -} - -impl Eq for StorageType {} - /// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`]. struct IpfsOptions { /// The path of the ipfs repo (blockstore and datastore). @@ -185,9 +139,9 @@ struct IpfsOptions { /// existing repository. pub ipfs_path: Option, - /// Name of the namespace used for indexeddb - #[cfg(target_arch="wasm32")] - pub namespace: Option, + /// Enables and supply a name of the namespace used for indexeddb + #[cfg(target_arch = "wasm32")] + pub namespace: Option>, /// Nodes used as bootstrap peers. pub bootstrap: Vec, @@ -291,7 +245,7 @@ impl Default for IpfsOptions { fn default() -> Self { Self { ipfs_path: None, - #[cfg(target_arch="wasm32")] + #[cfg(target_arch = "wasm32")] namespace: None, bootstrap: Default::default(), relay_server_config: Default::default(), @@ -805,8 +759,7 @@ impl + Send> UninitializedIpfs { /// Sets a namespace #[cfg(target_arch = "wasm32")] - pub fn set_namespace>(mut self, ns: S) -> Self { - let ns = ns.as_ref().to_string(); + pub fn set_namespace(mut self, ns: Option) -> Self { self.options.namespace = Some(ns); self } @@ -976,10 +929,8 @@ impl + Send> UninitializedIpfs { #[cfg(target_arch = "wasm32")] { - repo = match &options.namespace { - Some(ns) => { - Repo::::new_idb(Some(ns.clone())) - } + repo = match options.namespace.take() { + Some(ns) => Repo::::new_idb(ns), None => repo, }; } From 62cb1c8e2fd4c46667d7e181bb49b5863dc16c76 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 24 Mar 2025 09:36:12 -0500 Subject: [PATCH 3/7] chore: fmt --- src/repo/blockstore/arc.rs | 3 +-- src/repo/blockstore/mod.rs | 2 +- src/repo/datastore/arc.rs | 33 ++++++++++++++++++++++++--------- src/repo/datastore/mod.rs | 2 +- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/repo/blockstore/arc.rs b/src/repo/blockstore/arc.rs index 3e9aac672..44015127a 100644 --- a/src/repo/blockstore/arc.rs +++ b/src/repo/blockstore/arc.rs @@ -9,7 +9,6 @@ use crate::repo::{BlockPut, BlockStore}; use crate::error::Error; use crate::Block; - #[async_trait] impl BlockStore for Arc { async fn init(&self) -> Result<(), Error> { @@ -39,4 +38,4 @@ impl BlockStore for Arc { async fn list(&self) -> BoxStream<'static, Cid> { (**self).list().await } -} \ No newline at end of file +} diff --git a/src/repo/blockstore/mod.rs b/src/repo/blockstore/mod.rs index e88a827da..e4842b0c9 100644 --- a/src/repo/blockstore/mod.rs +++ b/src/repo/blockstore/mod.rs @@ -4,5 +4,5 @@ pub mod flatfs; pub mod idb; pub mod memory; +pub mod arc; pub mod either; -pub mod arc; \ No newline at end of file diff --git a/src/repo/datastore/arc.rs b/src/repo/datastore/arc.rs index 1446f6083..a6e6f63b2 100644 --- a/src/repo/datastore/arc.rs +++ b/src/repo/datastore/arc.rs @@ -1,11 +1,11 @@ use std::sync::Arc; +use crate::error::Error; +use crate::repo::{DataStore, PinStore, References}; +use crate::{PinKind, PinMode}; use async_trait::async_trait; use futures::stream::BoxStream; use ipld_core::cid::Cid; -use crate::repo::{DataStore, PinStore, References}; -use crate::error::Error; -use crate::{PinKind, PinMode}; #[async_trait] impl DataStore for Arc { @@ -39,12 +39,16 @@ impl PinStore for Arc

{ async fn is_pinned(&self, block: &Cid) -> Result { (**self).is_pinned(block).await } - + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { (**self).insert_direct_pin(target).await } - async fn insert_recursive_pin(&self, target: &Cid, referenced: References<'_>) -> Result<(), Error> { + async fn insert_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { (**self).insert_recursive_pin(target, referenced).await } @@ -52,15 +56,26 @@ impl PinStore for Arc

{ (**self).remove_direct_pin(target).await } - async fn remove_recursive_pin(&self, target: &Cid, referenced: References<'_>) -> Result<(), Error> { + async fn remove_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { (**self).remove_recursive_pin(target, referenced).await } - async fn list(&self, mode: Option) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + async fn list( + &self, + mode: Option, + ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { (**self).list(mode).await } - async fn query(&self, ids: Vec, requirement: Option) -> Result)>, Error> { + async fn query( + &self, + ids: Vec, + requirement: Option, + ) -> Result)>, Error> { (**self).query(ids, requirement).await } -} \ No newline at end of file +} diff --git a/src/repo/datastore/mod.rs b/src/repo/datastore/mod.rs index 95e32ed06..443dc6d98 100644 --- a/src/repo/datastore/mod.rs +++ b/src/repo/datastore/mod.rs @@ -5,5 +5,5 @@ pub mod memory; #[cfg(target_arch = "wasm32")] pub mod idb; +pub mod arc; pub mod either; -pub mod arc; \ No newline at end of file From ad8af1345f926884e6482d866ec4f0097ddea636 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 24 Mar 2025 10:05:43 -0500 Subject: [PATCH 4/7] chore: remove redundant functions --- src/repo/default_impl.rs | 24 ------------------------ src/repo/mod.rs | 39 --------------------------------------- 2 files changed, 63 deletions(-) diff --git a/src/repo/default_impl.rs b/src/repo/default_impl.rs index b85bf2c93..d23940a63 100644 --- a/src/repo/default_impl.rs +++ b/src/repo/default_impl.rs @@ -117,30 +117,6 @@ impl RepoTypes for DefaultStorage { type TBlockStore = DefaultStorage; type TDataStore = DefaultStorage; type TLock = DefaultStorage; - - fn blockstore(&self) -> &Self::TBlockStore { - self - } - - fn blockstore_mut(&mut self) -> &mut Self::TBlockStore { - self - } - - fn datastore(&self) -> &Self::TDataStore { - self - } - - fn datastore_mut(&mut self) -> &mut Self::TDataStore { - self - } - - fn lock(&self) -> &Self::TLock { - self - } - - fn lock_mut(&mut self) -> &mut Self::TLock { - self - } } impl Unpin for DefaultStorage {} diff --git a/src/repo/mod.rs b/src/repo/mod.rs index eddc44f58..385725b42 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -339,47 +339,8 @@ pub trait RepoTypes: Clone + Send + Sync + 'static { /// Describes a datastore. type TDataStore: DataStore; type TLock: Lock; - - fn blockstore(&self) -> &Self::TBlockStore; - fn blockstore_mut(&mut self) -> &mut Self::TBlockStore; - fn datastore(&self) -> &Self::TDataStore; - fn datastore_mut(&mut self) -> &mut Self::TDataStore; - fn lock(&self) -> &Self::TLock; - fn lock_mut(&mut self) -> &mut Self::TLock; } -#[derive(Copy, Clone, Debug)] -pub struct Memory; - -impl RepoTypes for Memory { - type TBlockStore = blockstore::memory::MemBlockStore; - type TDataStore = datastore::memory::MemDataStore; - type TLock = lock::MemLock; - - fn blockstore(&self) -> &Self::TBlockStore { - unimplemented!() - } - - fn blockstore_mut(&mut self) -> &mut Self::TBlockStore { - todo!() - } - - fn datastore(&self) -> &Self::TDataStore { - todo!() - } - - fn datastore_mut(&mut self) -> &mut Self::TDataStore { - todo!() - } - - fn lock(&self) -> &Self::TLock { - todo!() - } - - fn lock_mut(&mut self) -> &mut Self::TLock { - todo!() - } -} /// Describes a repo. /// Consolidates a blockstore, a datastore and a subscription registry. From fe1b2f092fd842fc3030c7a225acfc9457a62dd7 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 24 Mar 2025 10:16:24 -0500 Subject: [PATCH 5/7] chore: remove async-trait from traits --- src/repo/blockstore/arc.rs | 2 - src/repo/blockstore/either.rs | 2 - src/repo/blockstore/flatfs.rs | 2 - src/repo/blockstore/idb.rs | 2 - src/repo/blockstore/memory.rs | 2 - src/repo/datastore/arc.rs | 3 -- src/repo/datastore/either.rs | 3 -- src/repo/datastore/flatfs.rs | 5 +-- src/repo/datastore/idb.rs | 4 -- src/repo/datastore/memory.rs | 3 -- src/repo/default_impl.rs | 4 -- src/repo/mod.rs | 75 +++++++++++++++-------------------- 12 files changed, 33 insertions(+), 74 deletions(-) diff --git a/src/repo/blockstore/arc.rs b/src/repo/blockstore/arc.rs index 44015127a..98dea3f17 100644 --- a/src/repo/blockstore/arc.rs +++ b/src/repo/blockstore/arc.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use futures::stream::BoxStream; use ipld_core::cid::Cid; @@ -9,7 +8,6 @@ use crate::repo::{BlockPut, BlockStore}; use crate::error::Error; use crate::Block; -#[async_trait] impl BlockStore for Arc { async fn init(&self) -> Result<(), Error> { (**self).init().await diff --git a/src/repo/blockstore/either.rs b/src/repo/blockstore/either.rs index 4504430ba..c6bfe4423 100644 --- a/src/repo/blockstore/either.rs +++ b/src/repo/blockstore/either.rs @@ -3,12 +3,10 @@ use crate::{ repo::{BlockPut, BlockStore}, Block, }; -use async_trait::async_trait; use either::Either; use futures::stream::BoxStream; use ipld_core::cid::Cid; -#[async_trait] impl BlockStore for Either { async fn init(&self) -> Result<(), Error> { match self { diff --git a/src/repo/blockstore/flatfs.rs b/src/repo/blockstore/flatfs.rs index 47bff43f7..c9ebec02e 100644 --- a/src/repo/blockstore/flatfs.rs +++ b/src/repo/blockstore/flatfs.rs @@ -2,7 +2,6 @@ use crate::error::Error; use crate::repo::paths::{block_path, filestem_to_block_cid}; use crate::repo::{BlockPut, BlockStore}; use crate::Block; -use async_trait::async_trait; use futures::stream::{self, BoxStream}; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use ipld_core::cid::Cid; @@ -34,7 +33,6 @@ impl FsBlockStore { } } -#[async_trait] impl BlockStore for FsBlockStore { async fn init(&self) -> Result<(), Error> { let inner = &*self.inner.read().await; diff --git a/src/repo/blockstore/idb.rs b/src/repo/blockstore/idb.rs index 139680495..bd68249b3 100644 --- a/src/repo/blockstore/idb.rs +++ b/src/repo/blockstore/idb.rs @@ -4,7 +4,6 @@ use crate::{ repo::{BlockPut, BlockStore}, Block, Error, }; -use async_trait::async_trait; use futures::{channel::oneshot, stream::BoxStream, SinkExt, StreamExt}; use idb::{Database, DatabaseEvent, Factory, ObjectStoreParams, TransactionMode}; use ipld_core::cid::Cid; @@ -41,7 +40,6 @@ impl IdbBlockStore { } } -#[async_trait] impl BlockStore for IdbBlockStore { async fn init(&self) -> Result<(), Error> { let factory = self.factory.clone(); diff --git a/src/repo/blockstore/memory.rs b/src/repo/blockstore/memory.rs index f8867a1ac..a3a3c26cc 100644 --- a/src/repo/blockstore/memory.rs +++ b/src/repo/blockstore/memory.rs @@ -2,7 +2,6 @@ use crate::error::Error; use crate::repo::{BlockPut, BlockStore}; use crate::Block; -use async_trait::async_trait; use futures::stream::{self, BoxStream}; use futures::StreamExt; use ipld_core::cid::Cid; @@ -43,7 +42,6 @@ impl MemBlockStore { } } -#[async_trait] impl BlockStore for MemBlockStore { async fn init(&self) -> Result<(), Error> { Ok(()) diff --git a/src/repo/datastore/arc.rs b/src/repo/datastore/arc.rs index a6e6f63b2..832c13ee2 100644 --- a/src/repo/datastore/arc.rs +++ b/src/repo/datastore/arc.rs @@ -3,11 +3,9 @@ use std::sync::Arc; use crate::error::Error; use crate::repo::{DataStore, PinStore, References}; use crate::{PinKind, PinMode}; -use async_trait::async_trait; use futures::stream::BoxStream; use ipld_core::cid::Cid; -#[async_trait] impl DataStore for Arc { async fn init(&self) -> Result<(), Error> { (**self).init().await @@ -34,7 +32,6 @@ impl DataStore for Arc { } } -#[async_trait] impl PinStore for Arc

{ async fn is_pinned(&self, block: &Cid) -> Result { (**self).is_pinned(block).await diff --git a/src/repo/datastore/either.rs b/src/repo/datastore/either.rs index 92b7116ff..c84591ce1 100644 --- a/src/repo/datastore/either.rs +++ b/src/repo/datastore/either.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use either::Either; use futures::stream::BoxStream; use ipld_core::cid::Cid; @@ -7,7 +6,6 @@ use crate::error::Error; use crate::repo::{DataStore, PinStore, References}; use crate::{PinKind, PinMode}; -#[async_trait] impl DataStore for Either { async fn init(&self) -> Result<(), Error> { match self { @@ -52,7 +50,6 @@ impl DataStore for Either { } } -#[async_trait] impl PinStore for Either { async fn is_pinned(&self, block: &Cid) -> Result { match self { diff --git a/src/repo/datastore/flatfs.rs b/src/repo/datastore/flatfs.rs index bcacdd02d..77803ed6b 100644 --- a/src/repo/datastore/flatfs.rs +++ b/src/repo/datastore/flatfs.rs @@ -2,7 +2,6 @@ use crate::error::Error; use crate::repo::paths::{filestem_to_pin_cid, pin_path}; use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore, References}; -use async_trait::async_trait; use core::convert::TryFrom; use futures::stream::{BoxStream, TryStreamExt}; use futures::StreamExt; @@ -176,7 +175,7 @@ fn build_kv, P: AsRef>( /// The column operations are all unimplemented pending at least downscoping of the /// DataStore trait itself. -#[async_trait] + impl DataStore for FsDataStore { async fn init(&self) -> Result<(), Error> { // Although `pins` directory is created when inserting a data, is it not created when there are any attempts at listing the pins (thus causing to fail) @@ -213,7 +212,7 @@ impl DataStore for FsDataStore { // PinStore is a trait from ipfs::repo implemented on FsDataStore defined at ipfs::repo::fs or // parent module. -#[async_trait] + impl PinStore for FsDataStore { async fn is_pinned(&self, cid: &Cid) -> Result { let path = pin_path(self.path.join("pins"), cid); diff --git a/src/repo/datastore/idb.rs b/src/repo/datastore/idb.rs index dab549c64..552d04091 100644 --- a/src/repo/datastore/idb.rs +++ b/src/repo/datastore/idb.rs @@ -1,7 +1,5 @@ use std::{collections::BTreeSet, rc::Rc, str::FromStr, sync::OnceLock}; -use async_trait::async_trait; - use crate::{ repo::{DataStore, PinModeRequirement, PinStore, References}, Error, PinKind, PinMode, @@ -44,7 +42,6 @@ impl IdbDataStore { } } -#[async_trait] impl DataStore for IdbDataStore { async fn init(&self) -> Result<(), Error> { let factory = self.factory.clone(); @@ -232,7 +229,6 @@ impl DataStore for IdbDataStore { // in the transactional parts of the [`Infallible`] is used to signal there is no additional // custom error, not that the transaction was infallible in itself. -#[async_trait] impl PinStore for IdbDataStore { async fn is_pinned(&self, cid: &Cid) -> Result { let cid = cid.to_owned(); diff --git a/src/repo/datastore/memory.rs b/src/repo/datastore/memory.rs index e4b235c82..b683b74ff 100644 --- a/src/repo/datastore/memory.rs +++ b/src/repo/datastore/memory.rs @@ -1,6 +1,5 @@ use crate::error::Error; use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore}; -use async_trait::async_trait; use futures::StreamExt; use ipld_core::cid::{self, Cid}; use std::path::PathBuf; @@ -105,7 +104,6 @@ impl MemDataStore { } } -#[async_trait] impl PinStore for MemDataStore { async fn is_pinned(&self, block: &Cid) -> Result { let key = block.to_bytes(); @@ -312,7 +310,6 @@ impl PinStore for MemDataStore { } } -#[async_trait] impl DataStore for MemDataStore { async fn init(&self) -> Result<(), Error> { Ok(()) diff --git a/src/repo/default_impl.rs b/src/repo/default_impl.rs index d23940a63..0597a7475 100644 --- a/src/repo/default_impl.rs +++ b/src/repo/default_impl.rs @@ -14,7 +14,6 @@ use crate::Block; #[cfg(target_arch = "wasm32")] use std::sync::Arc; -use async_trait::async_trait; use either::Either; use futures::stream::BoxStream; use ipld_core::cid::Cid; @@ -121,7 +120,6 @@ impl RepoTypes for DefaultStorage { impl Unpin for DefaultStorage {} -#[async_trait] impl BlockStore for DefaultStorage { async fn init(&self) -> Result<(), Error> { self.blockstore.init().await @@ -160,7 +158,6 @@ impl BlockStore for DefaultStorage { } } -#[async_trait] impl DataStore for DefaultStorage { async fn init(&self) -> Result<(), Error> { self.datastore.init().await @@ -187,7 +184,6 @@ impl DataStore for DefaultStorage { } } -#[async_trait] impl PinStore for DefaultStorage { async fn is_pinned(&self, block: &Cid) -> Result { self.datastore.is_pinned(block).await diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 385725b42..ee1ba613e 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -1,7 +1,6 @@ //! Storage implementation(s) backing the [`crate::Ipfs`]. use crate::error::Error; use crate::Block; -use async_trait::async_trait; use core::fmt::Debug; use default_impl::DefaultStorage; use futures::channel::mpsc::{channel, Receiver, Sender}; @@ -72,50 +71,45 @@ pub trait StoreOpt { } /// This API is being discussed and evolved, which will likely lead to breakage. -#[async_trait] pub trait BlockStore: Debug + Send + Sync { - async fn init(&self) -> Result<(), Error>; + fn init(&self) -> impl Future> + Send; - #[deprecated] - async fn open(&self) -> Result<(), Error> { - Ok(()) - } /// Returns whether a block is present in the blockstore. - async fn contains(&self, cid: &Cid) -> Result; + fn contains(&self, cid: &Cid) -> impl Future> + Send; /// Returns a block from the blockstore. - async fn get(&self, cid: &Cid) -> Result, Error>; + fn get(&self, cid: &Cid) -> impl Future, Error>> + Send; /// Get the size of a single block - async fn size(&self, cid: &[Cid]) -> Result, Error>; + fn size(&self, cid: &[Cid]) -> impl Future, Error>> + Send; /// Get a total size of the block store - async fn total_size(&self) -> Result; + fn total_size(&self) -> impl Future> + Send; /// Inserts a block in the blockstore. - async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error>; + fn put(&self, block: &Block) -> impl Future> + Send; /// Removes a block from the blockstore. - async fn remove(&self, cid: &Cid) -> Result<(), Error>; + fn remove(&self, cid: &Cid) -> impl Future> + Send; /// Remove multiple blocks from the blockstore - async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid>; + fn remove_many( + &self, + blocks: BoxStream<'static, Cid>, + ) -> impl Future> + Send; /// Returns a list of the blocks (Cids), in the blockstore. - async fn list(&self) -> BoxStream<'static, Cid>; + fn list(&self) -> impl Future> + Send; } -#[async_trait] /// Generic layer of abstraction for a key-value data store. pub trait DataStore: PinStore + Debug + Send + Sync { - async fn init(&self) -> Result<(), Error>; - #[deprecated] - async fn open(&self) -> Result<(), Error> { - Ok(()) - } + fn init(&self) -> impl Future> + Send; /// Checks if a key is present in the datastore. - async fn contains(&self, key: &[u8]) -> Result; + fn contains(&self, key: &[u8]) -> impl Future> + Send; /// Returns the value associated with a key from the datastore. - async fn get(&self, key: &[u8]) -> Result>, Error>; + fn get(&self, key: &[u8]) -> impl Future>, Error>> + Send; /// Puts the value under the key in the datastore. - async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>; + fn put(&self, key: &[u8], value: &[u8]) -> impl Future> + Send; /// Removes a key-value pair from the datastore. - async fn remove(&self, key: &[u8]) -> Result<(), Error>; + fn remove(&self, key: &[u8]) -> impl Future> + Send; /// Iterate over the k/v of the datastore - async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec, Vec)>; + fn iter( + &self, + ) -> impl Future, Vec)>> + Send; } #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -201,43 +195,37 @@ pub trait Lock: Debug + Send + Sync { type References<'a> = BoxStream<'a, Result>; -#[async_trait] pub trait PinStore: Debug + Send + Sync { - async fn is_pinned(&self, block: &Cid) -> Result; + fn is_pinned(&self, block: &Cid) -> impl Future> + Send; - async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error>; + fn insert_direct_pin(&self, target: &Cid) -> impl Future> + Send; - async fn insert_recursive_pin( + fn insert_recursive_pin( &self, target: &Cid, referenced: References<'_>, - ) -> Result<(), Error>; + ) -> impl Future> + Send; - async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error>; + fn remove_direct_pin(&self, target: &Cid) -> impl Future> + Send; - async fn remove_recursive_pin( + fn remove_recursive_pin( &self, target: &Cid, referenced: References<'_>, - ) -> Result<(), Error>; + ) -> impl Future> + Send; - async fn list( + fn list( &self, mode: Option, - ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>; + ) -> impl Future>> + Send; - // here we should have resolved ids - // go-ipfs: doesnt start fetching the paths - // js-ipfs: starts fetching paths - // FIXME: there should probably be an additional Result<$inner, Error> here; the per pin error - // is serde OR cid::Error. /// Returns error if any of the ids isn't pinned in the required type, otherwise returns /// the pin details if all of the cids are pinned in one way or the another. - async fn query( + fn query( &self, ids: Vec, requirement: Option, - ) -> Result)>, Error>; + ) -> impl Future)>, Error>> + Send; } /// `PinMode` is the description of pin type for quering purposes. @@ -341,7 +329,6 @@ pub trait RepoTypes: Clone + Send + Sync + 'static { type TLock: Lock; } - /// Describes a repo. /// Consolidates a blockstore, a datastore and a subscription registry. #[allow(clippy::type_complexity)] From 13ac608baeed654322f87bc3e173988229bc4a48 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 24 Mar 2025 10:41:03 -0500 Subject: [PATCH 6/7] chore: remove redundant ref keywords --- src/repo/blockstore/either.rs | 36 ++++++++++++------------ src/repo/datastore/either.rs | 52 +++++++++++++++++------------------ 2 files changed, 44 insertions(+), 44 deletions(-) diff --git a/src/repo/blockstore/either.rs b/src/repo/blockstore/either.rs index c6bfe4423..accbaba2f 100644 --- a/src/repo/blockstore/either.rs +++ b/src/repo/blockstore/either.rs @@ -10,64 +10,64 @@ use ipld_core::cid::Cid; impl BlockStore for Either { async fn init(&self) -> Result<(), Error> { match self { - Either::Left(ref blockstore) => blockstore.init().await, - Either::Right(ref blockstore) => blockstore.init().await, + Either::Left(blockstore) => blockstore.init().await, + Either::Right(blockstore) => blockstore.init().await, } } async fn contains(&self, cid: &Cid) -> Result { match self { - Either::Left(ref blockstore) => blockstore.contains(cid).await, - Either::Right(ref blockstore) => blockstore.contains(cid).await, + Either::Left(blockstore) => blockstore.contains(cid).await, + Either::Right(blockstore) => blockstore.contains(cid).await, } } async fn get(&self, cid: &Cid) -> Result, Error> { match self { - Either::Left(ref blockstore) => blockstore.get(cid).await, - Either::Right(ref blockstore) => blockstore.get(cid).await, + Either::Left(blockstore) => blockstore.get(cid).await, + Either::Right(blockstore) => blockstore.get(cid).await, } } async fn size(&self, cid: &[Cid]) -> Result, Error> { match self { - Either::Left(ref blockstore) => blockstore.size(cid).await, - Either::Right(ref blockstore) => blockstore.size(cid).await, + Either::Left(blockstore) => blockstore.size(cid).await, + Either::Right(blockstore) => blockstore.size(cid).await, } } async fn total_size(&self) -> Result { match self { - Either::Left(ref blockstore) => blockstore.total_size().await, - Either::Right(ref blockstore) => blockstore.total_size().await, + Either::Left(blockstore) => blockstore.total_size().await, + Either::Right(blockstore) => blockstore.total_size().await, } } async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { match self { - Either::Left(ref blockstore) => blockstore.put(block).await, - Either::Right(ref blockstore) => blockstore.put(block).await, + Either::Left(blockstore) => blockstore.put(block).await, + Either::Right(blockstore) => blockstore.put(block).await, } } async fn remove(&self, cid: &Cid) -> Result<(), Error> { match self { - Either::Left(ref blockstore) => blockstore.remove(cid).await, - Either::Right(ref blockstore) => blockstore.remove(cid).await, + Either::Left(blockstore) => blockstore.remove(cid).await, + Either::Right(blockstore) => blockstore.remove(cid).await, } } async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { match self { - Either::Left(ref blockstore) => blockstore.remove_many(blocks).await, - Either::Right(ref blockstore) => blockstore.remove_many(blocks).await, + Either::Left(blockstore) => blockstore.remove_many(blocks).await, + Either::Right(blockstore) => blockstore.remove_many(blocks).await, } } async fn list(&self) -> BoxStream<'static, Cid> { match self { - Either::Left(ref blockstore) => blockstore.list().await, - Either::Right(ref blockstore) => blockstore.list().await, + Either::Left(blockstore) => blockstore.list().await, + Either::Right(blockstore) => blockstore.list().await, } } } diff --git a/src/repo/datastore/either.rs b/src/repo/datastore/either.rs index c84591ce1..863659dc5 100644 --- a/src/repo/datastore/either.rs +++ b/src/repo/datastore/either.rs @@ -9,43 +9,43 @@ use crate::{PinKind, PinMode}; impl DataStore for Either { async fn init(&self) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.init().await, - Either::Right(ref datastore) => datastore.init().await, + Either::Left(datastore) => datastore.init().await, + Either::Right(datastore) => datastore.init().await, } } async fn contains(&self, key: &[u8]) -> Result { match self { - Either::Left(ref datastore) => datastore.contains(key).await, - Either::Right(ref datastore) => datastore.contains(key).await, + Either::Left(datastore) => datastore.contains(key).await, + Either::Right(datastore) => datastore.contains(key).await, } } async fn get(&self, key: &[u8]) -> Result>, Error> { match self { - Either::Left(ref datastore) => datastore.get(key).await, - Either::Right(ref datastore) => datastore.get(key).await, + Either::Left(datastore) => datastore.get(key).await, + Either::Right(datastore) => datastore.get(key).await, } } async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.put(key, value).await, - Either::Right(ref datastore) => datastore.put(key, value).await, + Either::Left(datastore) => datastore.put(key, value).await, + Either::Right(datastore) => datastore.put(key, value).await, } } async fn remove(&self, key: &[u8]) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.remove(key).await, - Either::Right(ref datastore) => datastore.remove(key).await, + Either::Left(datastore) => datastore.remove(key).await, + Either::Right(datastore) => datastore.remove(key).await, } } async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { match self { - Either::Left(ref datastore) => datastore.iter().await, - Either::Right(ref datastore) => datastore.iter().await, + Either::Left(datastore) => datastore.iter().await, + Either::Right(datastore) => datastore.iter().await, } } } @@ -53,15 +53,15 @@ impl DataStore for Either { impl PinStore for Either { async fn is_pinned(&self, block: &Cid) -> Result { match self { - Either::Left(ref datastore) => datastore.is_pinned(block).await, - Either::Right(ref datastore) => datastore.is_pinned(block).await, + Either::Left(datastore) => datastore.is_pinned(block).await, + Either::Right(datastore) => datastore.is_pinned(block).await, } } async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.insert_direct_pin(target).await, - Either::Right(ref datastore) => datastore.insert_direct_pin(target).await, + Either::Left(datastore) => datastore.insert_direct_pin(target).await, + Either::Right(datastore) => datastore.insert_direct_pin(target).await, } } @@ -71,8 +71,8 @@ impl PinStore for Either { referenced: References<'_>, ) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.insert_recursive_pin(target, referenced).await, - Either::Right(ref datastore) => { + Either::Left(datastore) => datastore.insert_recursive_pin(target, referenced).await, + Either::Right(datastore) => { datastore.insert_recursive_pin(target, referenced).await } } @@ -80,8 +80,8 @@ impl PinStore for Either { async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.remove_direct_pin(target).await, - Either::Right(ref datastore) => datastore.remove_direct_pin(target).await, + Either::Left(datastore) => datastore.remove_direct_pin(target).await, + Either::Right(datastore) => datastore.remove_direct_pin(target).await, } } @@ -91,8 +91,8 @@ impl PinStore for Either { referenced: References<'_>, ) -> Result<(), Error> { match self { - Either::Left(ref datastore) => datastore.remove_recursive_pin(target, referenced).await, - Either::Right(ref datastore) => { + Either::Left(datastore) => datastore.remove_recursive_pin(target, referenced).await, + Either::Right(datastore) => { datastore.remove_recursive_pin(target, referenced).await } } @@ -103,8 +103,8 @@ impl PinStore for Either { mode: Option, ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { match self { - Either::Left(ref datastore) => datastore.list(mode).await, - Either::Right(ref datastore) => datastore.list(mode).await, + Either::Left(datastore) => datastore.list(mode).await, + Either::Right(datastore) => datastore.list(mode).await, } } @@ -114,8 +114,8 @@ impl PinStore for Either { requirement: Option, ) -> Result)>, Error> { match self { - Either::Left(ref datastore) => datastore.query(ids, requirement).await, - Either::Right(ref datastore) => datastore.query(ids, requirement).await, + Either::Left(datastore) => datastore.query(ids, requirement).await, + Either::Right(datastore) => datastore.query(ids, requirement).await, } } } From 13ea31258f7f55955e4d22b0d4446c7c8ac32d37 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 7 Apr 2025 19:42:27 -0400 Subject: [PATCH 7/7] chore: update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdf3f4f8a..5360e41fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - chore: use async-rt in place of rt utils. [PR 362](https://github.com/dariusc93/rust-ipfs/pull/362) - feat: Implement configuration for connecting to IPFS Private Network. [PR 398](https://github.com/dariusc93/rust-ipfs/pull/398) - refactor: use CommunicationTask to handle sending message to IpfsTask. [PR 404](https://github.com/dariusc93/rust-ipfs/pull/404) +- feat: Use RepoType in Repo and impl DefaultStorage. [PR 414](https://github.com/dariusc93/rust-ipfs/pull/414) # 0.14.1 - fix: remove expect when session failed to get next block.