diff --git a/CHANGELOG.md b/CHANGELOG.md index cdf3f4f8a..5360e41fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - chore: use async-rt in place of rt utils. [PR 362](https://github.com/dariusc93/rust-ipfs/pull/362) - feat: Implement configuration for connecting to IPFS Private Network. [PR 398](https://github.com/dariusc93/rust-ipfs/pull/398) - refactor: use CommunicationTask to handle sending message to IpfsTask. [PR 404](https://github.com/dariusc93/rust-ipfs/pull/404) +- feat: Use RepoType in Repo and impl DefaultStorage. [PR 414](https://github.com/dariusc93/rust-ipfs/pull/414) # 0.14.1 - fix: remove expect when session failed to get next block. diff --git a/src/dag.rs b/src/dag.rs index f5b90b626..ede625f6d 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -3,6 +3,7 @@ use crate::block::BlockCodec; use crate::error::Error; use crate::path::{IpfsPath, PathRoot, SlashedPath}; +use crate::repo::default_impl::DefaultStorage; use crate::repo::Repo; use crate::{Block, Ipfs}; use bytes::Bytes; @@ -179,11 +180,11 @@ impl RawResolveLocalError { #[derive(Clone, Debug)] pub struct IpldDag { ipfs: Option, - repo: Repo, + repo: Repo, } -impl From for IpldDag { - fn from(repo: Repo) -> Self { +impl From> for IpldDag { + fn from(repo: Repo) -> Self { IpldDag { ipfs: None, repo } } } diff --git a/src/ipns/mod.rs b/src/ipns/mod.rs index c9daa8df4..94944a394 100644 --- a/src/ipns/mod.rs +++ b/src/ipns/mod.rs @@ -5,6 +5,7 @@ use std::borrow::Borrow; use crate::p2p::DnsResolver; use crate::path::{IpfsPath, PathRoot}; +use crate::repo::DataStore; use crate::Ipfs; mod dnslink; diff --git a/src/lib.rs b/src/lib.rs index 18ae334dd..90739cb18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,7 @@ use p2p::{ RelayConfig, RequestResponseConfig, SwarmConfig, TransportConfig, }; use repo::{ - BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin, + default_impl::DefaultStorage, GCConfig, GCTrigger, RepoFetch, RepoInsertPin, RepoRemovePin, }; use tracing::Span; @@ -87,8 +87,8 @@ pub use self::{ use async_rt::{AbortableJoinHandle, CommunicationTask}; use ipld_core::cid::Cid; use ipld_core::ipld::Ipld; -use std::borrow::Borrow; use std::convert::Infallible; +use std::{borrow::Borrow, path::PathBuf}; use std::{ collections::{BTreeSet, HashMap, HashSet}, fmt, @@ -126,51 +126,6 @@ use libp2p::{request_response::InboundRequestId, swarm::dial_opts::PeerCondition pub use libp2p_connection_limits::ConnectionLimits; use serde::Serialize; -#[allow(dead_code)] -#[deprecated(note = "Use `StoreageType` instead")] -type StoragePath = StorageType; - -#[derive(Default, Debug)] -pub enum StorageType { - #[cfg(not(target_arch = "wasm32"))] - Disk(std::path::PathBuf), - #[default] - Memory, - #[cfg(target_arch = "wasm32")] - IndexedDb { namespace: Option }, - Custom { - blockstore: Option>, - datastore: Option>, - lock: Option>, - }, -} - -impl PartialEq for StorageType { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - #[cfg(not(target_arch = "wasm32"))] - (StorageType::Disk(left_path), StorageType::Disk(right_path)) => { - left_path.eq(right_path) - } - #[cfg(target_arch = "wasm32")] - ( - StorageType::IndexedDb { namespace: left }, - StorageType::IndexedDb { namespace: right }, - ) => left.eq(right), - (StorageType::Memory, StorageType::Memory) => true, - (StorageType::Custom { .. }, StorageType::Custom { .. }) => { - //Do we really care if they equal? - //TODO: Possibly implement PartialEq/Eq for the traits so we could make sure - // that they do or dont eq each other. For now this will always be true - true - } - _ => false, - } - } -} - -impl Eq for StorageType {} - /// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`]. struct IpfsOptions { /// The path of the ipfs repo (blockstore and datastore). @@ -182,7 +137,11 @@ struct IpfsOptions { /// /// It is **not** recommended to set this to IPFS_PATH without first at least backing up your /// existing repository. - pub ipfs_path: StorageType, + pub ipfs_path: Option, + + /// Enables and supply a name of the namespace used for indexeddb + #[cfg(target_arch = "wasm32")] + pub namespace: Option>, /// Nodes used as bootstrap peers. pub bootstrap: Vec, @@ -285,7 +244,9 @@ pub enum RepoProvider { impl Default for IpfsOptions { fn default() -> Self { Self { - ipfs_path: StorageType::Memory, + ipfs_path: None, + #[cfg(target_arch = "wasm32")] + namespace: None, bootstrap: Default::default(), relay_server_config: Default::default(), kad_configuration: Either::Left(Default::default()), @@ -333,7 +294,7 @@ impl fmt::Debug for IpfsOptions { #[allow(clippy::type_complexity)] pub struct Ipfs { span: Span, - repo: Repo, + repo: Repo, key: Keypair, keystore: Keystore, identify_conf: IdentifyConfiguration, @@ -557,7 +518,7 @@ pub struct UninitializedIpfs + Send> { keys: Option, options: IpfsOptions, fdlimit: Option, - repo_handle: Option, + repo_handle: Repo, local_external_addr: bool, swarm_event: Option>, // record_validators: HashMap bool + Sync + Send>>, @@ -583,7 +544,7 @@ impl + Send> UninitializedIpfs { keys: None, options: Default::default(), fdlimit: None, - repo_handle: None, + repo_handle: Repo::new_memory(), // record_validators: Default::default(), record_key_validator: Default::default(), local_external_addr: false, @@ -604,10 +565,10 @@ impl + Send> UninitializedIpfs { } /// Set storage type for the repo. - pub fn set_storage_type(mut self, storage_type: StorageType) -> Self { - self.options.ipfs_path = storage_type; - self - } + // pub fn set_storage_type(mut self, storage_type: StorageType) -> Self { + // self.options.ipfs_path = storage_type; + // self + // } /// Adds a listening address pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self { @@ -792,7 +753,14 @@ impl + Send> UninitializedIpfs { #[cfg(not(target_arch = "wasm32"))] pub fn set_path>(mut self, path: P) -> Self { let path = path.as_ref().to_path_buf(); - self.options.ipfs_path = StorageType::Disk(path); + self.options.ipfs_path = Some(path); + self + } + + /// Sets a namespace + #[cfg(target_arch = "wasm32")] + pub fn set_namespace(mut self, ns: Option) -> Self { + self.options.namespace = Some(ns); self } @@ -853,8 +821,8 @@ impl + Send> UninitializedIpfs { } /// Set block and data repo - pub fn set_repo(mut self, repo: &Repo) -> Self { - self.repo_handle = Some(repo.clone()); + pub fn set_repo(mut self, repo: &Repo) -> Self { + self.repo_handle = Repo::clone(repo); self } @@ -940,23 +908,32 @@ impl + Send> UninitializedIpfs { // instruments the IpfsFuture, the background task. let swarm_span = tracing::trace_span!(parent: &root_span, "swarm"); - let repo = match repo_handle { - Some(repo) => { - if repo.is_online() { - anyhow::bail!("Repo is already initialized"); - } - repo - } - None => { - #[cfg(not(target_arch = "wasm32"))] - if let StorageType::Disk(path) = &options.ipfs_path { + let mut repo = repo_handle; + + if repo.is_online() { + anyhow::bail!("Repo is already initialized"); + } + + #[cfg(not(target_arch = "wasm32"))] + { + repo = match &options.ipfs_path { + Some(path) => { if !path.is_dir() { tokio::fs::create_dir_all(path).await?; } + Repo::::new_fs(path) } - Repo::new(&mut options.ipfs_path) - } - }; + None => repo, + }; + } + + #[cfg(target_arch = "wasm32")] + { + repo = match options.namespace.take() { + Some(ns) => Repo::::new_idb(ns), + None => repo, + }; + } repo.init().instrument(init_span.clone()).await?; @@ -1039,7 +1016,7 @@ impl + Send> UninitializedIpfs { let gc_handle = gc_config.map(|config| { async_rt::task::spawn_abortable({ - let repo = repo.clone(); + let repo = Repo::clone(&repo); async move { let GCConfig { duration, trigger } = config; let use_config_timer = duration != Duration::ZERO; @@ -1176,7 +1153,7 @@ impl Ipfs { } /// Return an [`Repo`] to access the internal repo of the node - pub fn repo(&self) -> &Repo { + pub fn repo(&self) -> &Repo { &self.repo } @@ -1191,13 +1168,13 @@ impl Ipfs { } /// Puts a block into the ipfs repo. - pub fn put_block(&self, block: &Block) -> RepoPutBlock { + pub fn put_block(&self, block: &Block) -> RepoPutBlock { self.repo.put_block(block).span(self.span.clone()) } /// Retrieves a block from the local blockstore, or starts fetching from the network or join an /// already started fetch. - pub fn get_block(&self, cid: impl Borrow) -> RepoGetBlock { + pub fn get_block(&self, cid: impl Borrow) -> RepoGetBlock { self.repo.get_block(cid).span(self.span.clone()) } @@ -1239,7 +1216,7 @@ impl Ipfs { /// If a recursive `insert_pin` operation is interrupted because of a crash or the crash /// prevents from synchronizing the data store to disk, this will leave the system in an inconsistent /// state. The remedy is to re-pin recursive pins. - pub fn insert_pin(&self, cid: impl Borrow) -> RepoInsertPin { + pub fn insert_pin(&self, cid: impl Borrow) -> RepoInsertPin { self.repo().pin(cid).span(self.span.clone()) } @@ -1249,7 +1226,7 @@ impl Ipfs { /// /// Unpinning an indirectly pinned Cid is not possible other than through its recursively /// pinned tree roots. - pub fn remove_pin(&self, cid: impl Borrow) -> RepoRemovePin { + pub fn remove_pin(&self, cid: impl Borrow) -> RepoRemovePin { self.repo().remove_pin(cid).span(self.span.clone()) } @@ -2131,7 +2108,7 @@ impl Ipfs { } /// Fetches the block, and, if set, recursively walk the graph loading all the blocks to the blockstore. - pub fn fetch(&self, cid: &Cid) -> RepoFetch { + pub fn fetch(&self, cid: &Cid) -> RepoFetch { self.repo.fetch(cid).span(self.span.clone()) } @@ -3085,6 +3062,7 @@ pub use node::Node; /// Node module provides an easy to use interface used in `tests/`. mod node { + use super::*; /// Node encapsulates everything to setup a testing instance so that multi-node tests become diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index ad304c205..bcbeb04c9 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -9,6 +9,7 @@ use either::Either; use serde::{Deserialize, Serialize}; use crate::error::Error; +use crate::repo::default_impl::DefaultStorage; use crate::{IntoAddPeerOpt, IpfsOptions}; use crate::repo::Repo; @@ -362,7 +363,7 @@ where pub(crate) fn new( keypair: &Keypair, options: &IpfsOptions, - repo: &Repo, + repo: &Repo, custom: Option, ) -> Result<(Self, Option), Error> { let bootstrap = options.bootstrap.clone(); diff --git a/src/p2p/bitswap.rs b/src/p2p/bitswap.rs index b3a8dfe0f..fe7451a3d 100644 --- a/src/p2p/bitswap.rs +++ b/src/p2p/bitswap.rs @@ -35,7 +35,10 @@ mod bitswap_pb { } } -use crate::{repo::Repo, Block}; +use crate::{ + repo::{default_impl::DefaultStorage, Repo}, + Block, +}; use self::{ message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType}, @@ -62,14 +65,14 @@ pub struct Behaviour { events: VecDeque::ToSwarm, THandlerInEvent>>, connections: HashMap>, blacklist_connections: HashMap>, - store: Repo, + store: Repo, want_session: StreamMap, have_session: StreamMap, waker: Option, } impl Behaviour { - pub fn new(store: &Repo) -> Self { + pub fn new(store: &Repo) -> Self { Self { events: Default::default(), connections: Default::default(), @@ -559,7 +562,7 @@ impl NetworkBehaviour for Behaviour { mod test { use std::time::Duration; - use crate::block::BlockCodec; + use crate::{block::BlockCodec, repo::default_impl::DefaultStorage}; use futures::StreamExt; use ipld_core::cid::Cid; use libp2p::{ @@ -954,7 +957,7 @@ mod test { Ok(()) } - async fn build_swarm() -> (PeerId, Multiaddr, Swarm, Repo) { + async fn build_swarm() -> (PeerId, Multiaddr, Swarm, Repo) { let repo = Repo::new_memory(); let mut swarm = SwarmBuilder::with_new_identity() diff --git a/src/p2p/bitswap/sessions.rs b/src/p2p/bitswap/sessions.rs index 358cf99e1..507878b88 100644 --- a/src/p2p/bitswap/sessions.rs +++ b/src/p2p/bitswap/sessions.rs @@ -14,7 +14,10 @@ use ipld_core::cid::Cid; use libp2p::PeerId; use std::fmt::Debug; -use crate::{repo::Repo, Block}; +use crate::{ + repo::{default_impl::DefaultStorage, Repo}, + Block, +}; const CAP_THRESHOLD: usize = 100; @@ -91,7 +94,7 @@ pub struct WantSession { discovery: WantDiscovery, received: bool, waker: Option, - repo: Repo, + repo: Repo, state: WantSessionState, timeout: Option, discovery_timeout: Duration, @@ -100,7 +103,7 @@ pub struct WantSession { } impl WantSession { - pub fn new(repo: &Repo, cid: Cid, timeout: Option) -> Self { + pub fn new(repo: &Repo, cid: Cid, timeout: Option) -> Self { Self { cid, wants: Default::default(), @@ -540,13 +543,13 @@ pub struct HaveSession { want: HashMap, send_dont_have: HashSet, have: Option, - repo: Repo, + repo: Repo, waker: Option, state: HaveSessionState, } impl HaveSession { - pub fn new(repo: &Repo, cid: Cid) -> Self { + pub fn new(repo: &Repo, cid: Cid) -> Self { let mut session = Self { cid, want: HashMap::new(), diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 03cd635ce..b18277c13 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -1,5 +1,6 @@ //! P2P handling for IPFS nodes. use crate::error::Error; +use crate::repo::default_impl::DefaultStorage; use crate::repo::Repo; use crate::{IpfsOptions, TTransportFn}; use std::convert::TryInto; @@ -226,7 +227,7 @@ impl Default for SwarmConfig { pub(crate) fn create_swarm( keypair: &Keypair, options: &IpfsOptions, - repo: &Repo, + repo: &Repo, span: Span, (custom, custom_transport): (Option, Option), ) -> Result, Error> diff --git a/src/refs.rs b/src/refs.rs index b11451367..90d8a2497 100644 --- a/src/refs.rs +++ b/src/refs.rs @@ -2,6 +2,7 @@ use crate::block::BlockCodec; use crate::repo::Repo; +use crate::repo::RepoTypes; use async_stream::stream; use futures::stream::Stream; use ipld_core::{cid::Cid, ipld::Ipld}; @@ -107,13 +108,14 @@ impl IpldRefs { self } - pub fn refs_of_resolved<'a, MaybeOwned, Iter>( + pub fn refs_of_resolved<'a, S, MaybeOwned, Iter>( self, repo: MaybeOwned, iplds: Iter, ) -> impl Stream> + Send + 'a where - MaybeOwned: Borrow + Send + 'a, + S: RepoTypes, + MaybeOwned: Borrow> + Send + 'a, Iter: IntoIterator + Send + 'a, { iplds_refs_inner(repo, iplds, self) @@ -137,14 +139,15 @@ impl IpldRefs { /// /// Depending on how this function is called, the lifetime will be tied to the lifetime of given /// `&Ipfs` or `'static` when given ownership of `Ipfs`. -pub fn iplds_refs<'a, MaybeOwned, Iter>( +pub fn iplds_refs<'a, S, MaybeOwned, Iter>( repo: MaybeOwned, iplds: Iter, max_depth: Option, unique: bool, ) -> impl Stream> + Send + 'a where - MaybeOwned: Borrow + Send + 'a, + S: RepoTypes, + MaybeOwned: Borrow> + Send + 'a, Iter: IntoIterator + Send + 'a, { use futures::stream::TryStreamExt; @@ -165,13 +168,14 @@ where }) } -fn iplds_refs_inner<'a, MaybeOwned, Iter>( +fn iplds_refs_inner<'a, S, MaybeOwned, Iter>( repo: MaybeOwned, iplds: Iter, opts: IpldRefs, ) -> impl Stream> + Send + 'a where - MaybeOwned: Borrow + Send + 'a, + S: RepoTypes, + MaybeOwned: Borrow> + Send + 'a, Iter: IntoIterator, { let mut work = VecDeque::new(); diff --git a/src/repo/blockstore/arc.rs b/src/repo/blockstore/arc.rs new file mode 100644 index 000000000..98dea3f17 --- /dev/null +++ b/src/repo/blockstore/arc.rs @@ -0,0 +1,39 @@ +use std::sync::Arc; + +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +use crate::repo::{BlockPut, BlockStore}; + +use crate::error::Error; +use crate::Block; + +impl BlockStore for Arc { + async fn init(&self) -> Result<(), Error> { + (**self).init().await + } + async fn contains(&self, cid: &Cid) -> Result { + (**self).contains(cid).await + } + async fn get(&self, cid: &Cid) -> Result, Error> { + (**self).get(cid).await + } + async fn size(&self, cid: &[Cid]) -> Result, Error> { + (**self).size(cid).await + } + async fn total_size(&self) -> Result { + (**self).total_size().await + } + async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { + (**self).put(block).await + } + async fn remove(&self, cid: &Cid) -> Result<(), Error> { + (**self).remove(cid).await + } + async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { + (**self).remove_many(blocks).await + } + async fn list(&self) -> BoxStream<'static, Cid> { + (**self).list().await + } +} diff --git a/src/repo/blockstore/either.rs b/src/repo/blockstore/either.rs new file mode 100644 index 000000000..accbaba2f --- /dev/null +++ b/src/repo/blockstore/either.rs @@ -0,0 +1,73 @@ +use crate::error::Error; +use crate::{ + repo::{BlockPut, BlockStore}, + Block, +}; +use either::Either; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +impl BlockStore for Either { + async fn init(&self) -> Result<(), Error> { + match self { + Either::Left(blockstore) => blockstore.init().await, + Either::Right(blockstore) => blockstore.init().await, + } + } + + async fn contains(&self, cid: &Cid) -> Result { + match self { + Either::Left(blockstore) => blockstore.contains(cid).await, + Either::Right(blockstore) => blockstore.contains(cid).await, + } + } + + async fn get(&self, cid: &Cid) -> Result, Error> { + match self { + Either::Left(blockstore) => blockstore.get(cid).await, + Either::Right(blockstore) => blockstore.get(cid).await, + } + } + + async fn size(&self, cid: &[Cid]) -> Result, Error> { + match self { + Either::Left(blockstore) => blockstore.size(cid).await, + Either::Right(blockstore) => blockstore.size(cid).await, + } + } + + async fn total_size(&self) -> Result { + match self { + Either::Left(blockstore) => blockstore.total_size().await, + Either::Right(blockstore) => blockstore.total_size().await, + } + } + + async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { + match self { + Either::Left(blockstore) => blockstore.put(block).await, + Either::Right(blockstore) => blockstore.put(block).await, + } + } + + async fn remove(&self, cid: &Cid) -> Result<(), Error> { + match self { + Either::Left(blockstore) => blockstore.remove(cid).await, + Either::Right(blockstore) => blockstore.remove(cid).await, + } + } + + async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { + match self { + Either::Left(blockstore) => blockstore.remove_many(blocks).await, + Either::Right(blockstore) => blockstore.remove_many(blocks).await, + } + } + + async fn list(&self) -> BoxStream<'static, Cid> { + match self { + Either::Left(blockstore) => blockstore.list().await, + Either::Right(blockstore) => blockstore.list().await, + } + } +} diff --git a/src/repo/blockstore/flatfs.rs b/src/repo/blockstore/flatfs.rs index aaccbc536..c9ebec02e 100644 --- a/src/repo/blockstore/flatfs.rs +++ b/src/repo/blockstore/flatfs.rs @@ -2,7 +2,6 @@ use crate::error::Error; use crate::repo::paths::{block_path, filestem_to_block_cid}; use crate::repo::{BlockPut, BlockStore}; use crate::Block; -use async_trait::async_trait; use futures::stream::{self, BoxStream}; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use ipld_core::cid::Cid; @@ -16,7 +15,7 @@ use tokio_stream::wrappers::ReadDirStream; /// File system backed block store. /// /// For information on path mangling, please see `block_path` and `filestem_to_block_cid`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FsBlockStore { inner: Arc>, } @@ -34,7 +33,6 @@ impl FsBlockStore { } } -#[async_trait] impl BlockStore for FsBlockStore { async fn init(&self) -> Result<(), Error> { let inner = &*self.inner.read().await; diff --git a/src/repo/blockstore/idb.rs b/src/repo/blockstore/idb.rs index 139680495..bd68249b3 100644 --- a/src/repo/blockstore/idb.rs +++ b/src/repo/blockstore/idb.rs @@ -4,7 +4,6 @@ use crate::{ repo::{BlockPut, BlockStore}, Block, Error, }; -use async_trait::async_trait; use futures::{channel::oneshot, stream::BoxStream, SinkExt, StreamExt}; use idb::{Database, DatabaseEvent, Factory, ObjectStoreParams, TransactionMode}; use ipld_core::cid::Cid; @@ -41,7 +40,6 @@ impl IdbBlockStore { } } -#[async_trait] impl BlockStore for IdbBlockStore { async fn init(&self) -> Result<(), Error> { let factory = self.factory.clone(); diff --git a/src/repo/blockstore/memory.rs b/src/repo/blockstore/memory.rs index 5d8a47f6e..a3a3c26cc 100644 --- a/src/repo/blockstore/memory.rs +++ b/src/repo/blockstore/memory.rs @@ -2,7 +2,6 @@ use crate::error::Error; use crate::repo::{BlockPut, BlockStore}; use crate::Block; -use async_trait::async_trait; use futures::stream::{self, BoxStream}; use futures::StreamExt; use ipld_core::cid::Cid; @@ -16,6 +15,7 @@ use std::sync::Arc; /// Describes an in-memory block store. /// /// Blocks are stored as a `HashMap` of the `Cid` and `Block`. +#[derive(Clone)] pub struct MemBlockStore { inner: Arc>, } @@ -42,7 +42,6 @@ impl MemBlockStore { } } -#[async_trait] impl BlockStore for MemBlockStore { async fn init(&self) -> Result<(), Error> { Ok(()) diff --git a/src/repo/blockstore/mod.rs b/src/repo/blockstore/mod.rs index fdaed1fc0..e4842b0c9 100644 --- a/src/repo/blockstore/mod.rs +++ b/src/repo/blockstore/mod.rs @@ -3,3 +3,6 @@ pub mod flatfs; #[cfg(target_arch = "wasm32")] pub mod idb; pub mod memory; + +pub mod arc; +pub mod either; diff --git a/src/repo/datastore/arc.rs b/src/repo/datastore/arc.rs new file mode 100644 index 000000000..832c13ee2 --- /dev/null +++ b/src/repo/datastore/arc.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; + +use crate::error::Error; +use crate::repo::{DataStore, PinStore, References}; +use crate::{PinKind, PinMode}; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +impl DataStore for Arc { + async fn init(&self) -> Result<(), Error> { + (**self).init().await + } + + async fn contains(&self, key: &[u8]) -> Result { + (**self).contains(key).await + } + + async fn get(&self, key: &[u8]) -> Result>, Error> { + (**self).get(key).await + } + + async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + (**self).put(key, value).await + } + + async fn remove(&self, key: &[u8]) -> Result<(), Error> { + (**self).remove(key).await + } + + async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { + (**self).iter().await + } +} + +impl PinStore for Arc

{ + async fn is_pinned(&self, block: &Cid) -> Result { + (**self).is_pinned(block).await + } + + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { + (**self).insert_direct_pin(target).await + } + + async fn insert_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + (**self).insert_recursive_pin(target, referenced).await + } + + async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { + (**self).remove_direct_pin(target).await + } + + async fn remove_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + (**self).remove_recursive_pin(target, referenced).await + } + + async fn list( + &self, + mode: Option, + ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + (**self).list(mode).await + } + + async fn query( + &self, + ids: Vec, + requirement: Option, + ) -> Result)>, Error> { + (**self).query(ids, requirement).await + } +} diff --git a/src/repo/datastore/either.rs b/src/repo/datastore/either.rs new file mode 100644 index 000000000..863659dc5 --- /dev/null +++ b/src/repo/datastore/either.rs @@ -0,0 +1,121 @@ +use either::Either; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +use crate::error::Error; +use crate::repo::{DataStore, PinStore, References}; +use crate::{PinKind, PinMode}; + +impl DataStore for Either { + async fn init(&self) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.init().await, + Either::Right(datastore) => datastore.init().await, + } + } + + async fn contains(&self, key: &[u8]) -> Result { + match self { + Either::Left(datastore) => datastore.contains(key).await, + Either::Right(datastore) => datastore.contains(key).await, + } + } + + async fn get(&self, key: &[u8]) -> Result>, Error> { + match self { + Either::Left(datastore) => datastore.get(key).await, + Either::Right(datastore) => datastore.get(key).await, + } + } + + async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.put(key, value).await, + Either::Right(datastore) => datastore.put(key, value).await, + } + } + + async fn remove(&self, key: &[u8]) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.remove(key).await, + Either::Right(datastore) => datastore.remove(key).await, + } + } + + async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { + match self { + Either::Left(datastore) => datastore.iter().await, + Either::Right(datastore) => datastore.iter().await, + } + } +} + +impl PinStore for Either { + async fn is_pinned(&self, block: &Cid) -> Result { + match self { + Either::Left(datastore) => datastore.is_pinned(block).await, + Either::Right(datastore) => datastore.is_pinned(block).await, + } + } + + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.insert_direct_pin(target).await, + Either::Right(datastore) => datastore.insert_direct_pin(target).await, + } + } + + async fn insert_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.insert_recursive_pin(target, referenced).await, + Either::Right(datastore) => { + datastore.insert_recursive_pin(target, referenced).await + } + } + } + + async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.remove_direct_pin(target).await, + Either::Right(datastore) => datastore.remove_direct_pin(target).await, + } + } + + async fn remove_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + match self { + Either::Left(datastore) => datastore.remove_recursive_pin(target, referenced).await, + Either::Right(datastore) => { + datastore.remove_recursive_pin(target, referenced).await + } + } + } + + async fn list( + &self, + mode: Option, + ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + match self { + Either::Left(datastore) => datastore.list(mode).await, + Either::Right(datastore) => datastore.list(mode).await, + } + } + + async fn query( + &self, + ids: Vec, + requirement: Option, + ) -> Result)>, Error> { + match self { + Either::Left(datastore) => datastore.query(ids, requirement).await, + Either::Right(datastore) => datastore.query(ids, requirement).await, + } + } +} diff --git a/src/repo/datastore/flatfs.rs b/src/repo/datastore/flatfs.rs index 135505124..77803ed6b 100644 --- a/src/repo/datastore/flatfs.rs +++ b/src/repo/datastore/flatfs.rs @@ -2,7 +2,6 @@ use crate::error::Error; use crate::repo::paths::{filestem_to_pin_cid, pin_path}; use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore, References}; -use async_trait::async_trait; use core::convert::TryFrom; use futures::stream::{BoxStream, TryStreamExt}; use futures::StreamExt; @@ -20,7 +19,7 @@ use tokio_util::either::Either; /// their indirect descendants. Pin files are separated by their file extensions. /// /// When modifying, single lock is used. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FsDataStore { /// The base directory under which we have a sharded directory structure, and the individual /// blocks are stored under the shard. See unixfs/examples/cat.rs for read example. @@ -176,7 +175,7 @@ fn build_kv, P: AsRef>( /// The column operations are all unimplemented pending at least downscoping of the /// DataStore trait itself. -#[async_trait] + impl DataStore for FsDataStore { async fn init(&self) -> Result<(), Error> { // Although `pins` directory is created when inserting a data, is it not created when there are any attempts at listing the pins (thus causing to fail) @@ -213,7 +212,7 @@ impl DataStore for FsDataStore { // PinStore is a trait from ipfs::repo implemented on FsDataStore defined at ipfs::repo::fs or // parent module. -#[async_trait] + impl PinStore for FsDataStore { async fn is_pinned(&self, cid: &Cid) -> Result { let path = pin_path(self.path.join("pins"), cid); diff --git a/src/repo/datastore/idb.rs b/src/repo/datastore/idb.rs index dab549c64..552d04091 100644 --- a/src/repo/datastore/idb.rs +++ b/src/repo/datastore/idb.rs @@ -1,7 +1,5 @@ use std::{collections::BTreeSet, rc::Rc, str::FromStr, sync::OnceLock}; -use async_trait::async_trait; - use crate::{ repo::{DataStore, PinModeRequirement, PinStore, References}, Error, PinKind, PinMode, @@ -44,7 +42,6 @@ impl IdbDataStore { } } -#[async_trait] impl DataStore for IdbDataStore { async fn init(&self) -> Result<(), Error> { let factory = self.factory.clone(); @@ -232,7 +229,6 @@ impl DataStore for IdbDataStore { // in the transactional parts of the [`Infallible`] is used to signal there is no additional // custom error, not that the transaction was infallible in itself. -#[async_trait] impl PinStore for IdbDataStore { async fn is_pinned(&self, cid: &Cid) -> Result { let cid = cid.to_owned(); diff --git a/src/repo/datastore/memory.rs b/src/repo/datastore/memory.rs index b2fda2170..b683b74ff 100644 --- a/src/repo/datastore/memory.rs +++ b/src/repo/datastore/memory.rs @@ -1,6 +1,5 @@ use crate::error::Error; use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore}; -use async_trait::async_trait; use futures::StreamExt; use ipld_core::cid::{self, Cid}; use std::path::PathBuf; @@ -14,9 +13,9 @@ use std::collections::HashMap; use std::sync::Arc; /// Describes an in-memory `DataStore`. -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct MemDataStore { - inner: Mutex, Vec>>, + inner: Arc, Vec>>>, // this could also be PinDocument however doing any serialization allows to see the required // error types easier pin: Arc, Vec>>>, @@ -105,7 +104,6 @@ impl MemDataStore { } } -#[async_trait] impl PinStore for MemDataStore { async fn is_pinned(&self, block: &Cid) -> Result { let key = block.to_bytes(); @@ -312,7 +310,6 @@ impl PinStore for MemDataStore { } } -#[async_trait] impl DataStore for MemDataStore { async fn init(&self) -> Result<(), Error> { Ok(()) diff --git a/src/repo/datastore/mod.rs b/src/repo/datastore/mod.rs index 84dc68ecf..443dc6d98 100644 --- a/src/repo/datastore/mod.rs +++ b/src/repo/datastore/mod.rs @@ -4,3 +4,6 @@ pub mod memory; #[cfg(target_arch = "wasm32")] pub mod idb; + +pub mod arc; +pub mod either; diff --git a/src/repo/default_impl.rs b/src/repo/default_impl.rs new file mode 100644 index 000000000..0597a7475 --- /dev/null +++ b/src/repo/default_impl.rs @@ -0,0 +1,240 @@ +#[cfg(not(target_arch = "wasm32"))] +use crate::repo::blockstore::flatfs::FsBlockStore; +#[cfg(target_arch = "wasm32")] +use crate::repo::blockstore::idb::IdbBlockStore; +use crate::repo::blockstore::memory::MemBlockStore; +#[cfg(not(target_arch = "wasm32"))] +use crate::repo::datastore::flatfs::FsDataStore; +#[cfg(target_arch = "wasm32")] +use crate::repo::datastore::idb::IdbDataStore; +use crate::repo::datastore::memory::MemDataStore; +use crate::repo::{lock, RepoTypes}; +use crate::Block; + +#[cfg(target_arch = "wasm32")] +use std::sync::Arc; + +use either::Either; +use futures::stream::BoxStream; +use ipld_core::cid::Cid; + +use crate::error::Error; + +use super::{ + BlockPut, BlockStore, DataStore, Lock, LockError, PinKind, PinMode, PinStore, References, +}; + +#[derive(Debug)] +#[cfg(not(target_arch = "wasm32"))] +pub struct DefaultStorage { + blockstore: Either, + datastore: Either, + lockfile: Either, +} + +#[derive(Debug)] +#[cfg(target_arch = "wasm32")] +pub struct DefaultStorage { + blockstore: Either>, + datastore: Either>, + lockfile: Either, +} + +impl Default for DefaultStorage { + fn default() -> Self { + Self { + blockstore: Either::Left(MemBlockStore::new(Default::default())), + datastore: Either::Left(MemDataStore::new(Default::default())), + lockfile: Either::Left(lock::MemLock), + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl DefaultStorage { + /// Set path to trigger persistent storage + #[allow(dead_code)] + pub(crate) fn set_path(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.blockstore = Either::Right(FsBlockStore::new(path.clone())); + self.datastore = Either::Right(FsDataStore::new(path.clone())); + self.lockfile = Either::Right(lock::FsLock::new(path.clone())); + } + + pub(crate) fn set_blockstore_path(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.blockstore = Either::Right(FsBlockStore::new(path.clone())); + } + + pub(crate) fn set_datastore_path(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.datastore = Either::Right(FsDataStore::new(path.clone())); + } + + pub(crate) fn set_lockfile(&mut self, path: impl AsRef) { + let path = path.as_ref().to_path_buf(); + self.lockfile = Either::Right(lock::FsLock::new(path.clone())); + } + + #[allow(dead_code)] + pub(crate) fn remove_paths(&mut self) { + self.blockstore = Either::Left(MemBlockStore::new(Default::default())); + self.datastore = Either::Left(MemDataStore::new(Default::default())); + self.lockfile = Either::Left(lock::MemLock); + } +} + +#[cfg(target_arch = "wasm32")] +impl DefaultStorage { + /// Set path to trigger persistent storage + pub(crate) fn set_namespace(&mut self, namespace: impl Into>) { + let namespace = namespace.into(); + self.blockstore = Either::Right(Arc::new(IdbBlockStore::new(namespace.clone()))); + self.datastore = Either::Right(Arc::new(IdbDataStore::new(namespace.clone()))); + self.lockfile = Either::Right(lock::MemLock); + } + + #[allow(dead_code)] + pub(crate) fn remove_namespace(&mut self) { + self.blockstore = Either::Left(MemBlockStore::new(Default::default())); + self.datastore = Either::Left(MemDataStore::new(Default::default())); + self.lockfile = Either::Left(lock::MemLock); + } +} + +impl Clone for DefaultStorage { + fn clone(&self) -> Self { + Self { + blockstore: self.blockstore.clone(), + datastore: self.datastore.clone(), + lockfile: self.lockfile.clone(), + } + } +} + +impl RepoTypes for DefaultStorage { + type TBlockStore = DefaultStorage; + type TDataStore = DefaultStorage; + type TLock = DefaultStorage; +} + +impl Unpin for DefaultStorage {} + +impl BlockStore for DefaultStorage { + async fn init(&self) -> Result<(), Error> { + self.blockstore.init().await + } + + async fn contains(&self, cid: &Cid) -> Result { + self.blockstore.contains(cid).await + } + + async fn get(&self, cid: &Cid) -> Result, Error> { + self.blockstore.get(cid).await + } + + async fn size(&self, cid: &[Cid]) -> Result, Error> { + self.blockstore.size(cid).await + } + + async fn total_size(&self) -> Result { + self.blockstore.total_size().await + } + + async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> { + self.blockstore.put(block).await + } + + async fn remove(&self, cid: &Cid) -> Result<(), Error> { + self.blockstore.remove(cid).await + } + + async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> { + self.blockstore.remove_many(blocks).await + } + + async fn list(&self) -> BoxStream<'static, Cid> { + self.blockstore.list().await + } +} + +impl DataStore for DefaultStorage { + async fn init(&self) -> Result<(), Error> { + self.datastore.init().await + } + + async fn contains(&self, key: &[u8]) -> Result { + self.datastore.contains(key).await + } + + async fn get(&self, key: &[u8]) -> Result>, Error> { + self.datastore.get(key).await + } + + async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + self.datastore.put(key, value).await + } + + async fn remove(&self, key: &[u8]) -> Result<(), Error> { + self.datastore.remove(key).await + } + + async fn iter(&self) -> BoxStream<'static, (Vec, Vec)> { + self.datastore.iter().await + } +} + +impl PinStore for DefaultStorage { + async fn is_pinned(&self, block: &Cid) -> Result { + self.datastore.is_pinned(block).await + } + + async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> { + self.datastore.insert_direct_pin(target).await + } + + async fn insert_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + self.datastore + .insert_recursive_pin(target, referenced) + .await + } + + async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> { + self.datastore.remove_direct_pin(target).await + } + + async fn remove_recursive_pin( + &self, + target: &Cid, + referenced: References<'_>, + ) -> Result<(), Error> { + self.datastore + .remove_recursive_pin(target, referenced) + .await + } + + async fn list( + &self, + mode: Option, + ) -> BoxStream<'static, Result<(Cid, PinMode), Error>> { + self.datastore.list(mode).await + } + + async fn query( + &self, + ids: Vec, + requirement: Option, + ) -> Result)>, Error> { + self.datastore.query(ids, requirement).await + } +} + +impl Lock for DefaultStorage { + fn try_exclusive(&self) -> Result<(), LockError> { + self.lockfile.try_exclusive() + } +} diff --git a/src/repo/lock.rs b/src/repo/lock.rs index 2d238a34c..70d8de6c1 100644 --- a/src/repo/lock.rs +++ b/src/repo/lock.rs @@ -2,11 +2,22 @@ //! //! Consists of [`FsDataStore`] and [`FsBlockStore`]. +#[cfg(not(target_arch = "wasm32"))] +use std::sync::Arc; + +use either::Either; + use super::{Lock, LockError}; #[cfg(not(target_arch = "wasm32"))] -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct FsLock { + inner: Arc, +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug)] +struct InnerFsLock { file: parking_lot::Mutex>, path: std::path::PathBuf, state: parking_lot::Mutex, @@ -23,9 +34,11 @@ enum State { impl FsLock { pub fn new(path: std::path::PathBuf) -> Self { Self { - file: parking_lot::Mutex::new(None), - path, - state: parking_lot::Mutex::new(State::Unlocked), + inner: Arc::new(InnerFsLock { + file: parking_lot::Mutex::new(None), + path, + state: parking_lot::Mutex::new(State::Unlocked), + }), } } } @@ -41,18 +54,18 @@ impl Lock for FsLock { .write(true) .create(true) .truncate(true) - .open(&self.path)?; + .open(&self.inner.path)?; file.try_lock_exclusive()?; - *self.state.lock() = State::Exclusive; - *self.file.lock() = Some(file); + *self.inner.state.lock() = State::Exclusive; + *self.inner.file.lock() = Some(file); Ok(()) } } -#[derive(Debug, Default)] +#[derive(Clone, Copy, Debug, Default)] pub struct MemLock; impl Lock for MemLock { @@ -61,6 +74,15 @@ impl Lock for MemLock { } } +impl Lock for Either { + fn try_exclusive(&self) -> Result<(), LockError> { + match self { + Either::Left(lock) => lock.try_exclusive(), + Either::Right(lock) => lock.try_exclusive(), + } + } +} + #[cfg(test)] mod tests { use super::{FsLock, Lock}; diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 82a74be87..ee1ba613e 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -1,8 +1,8 @@ //! Storage implementation(s) backing the [`crate::Ipfs`]. use crate::error::Error; -use crate::{Block, StorageType}; -use async_trait::async_trait; +use crate::Block; use core::fmt::Debug; +use default_impl::DefaultStorage; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::future::{BoxFuture, Either}; use futures::sink::SinkExt; @@ -30,6 +30,8 @@ use tracing::{Instrument, Span}; #[cfg(test)] mod common_tests; +pub(crate) mod default_impl; + pub mod blockstore; pub mod datastore; pub mod lock; @@ -64,51 +66,50 @@ pub enum BlockRmError { NotFound(Cid), } +pub trait StoreOpt { + fn into_opt(self) -> Self; +} + /// This API is being discussed and evolved, which will likely lead to breakage. -#[async_trait] pub trait BlockStore: Debug + Send + Sync { - async fn init(&self) -> Result<(), Error>; + fn init(&self) -> impl Future> + Send; - #[deprecated] - async fn open(&self) -> Result<(), Error> { - Ok(()) - } /// Returns whether a block is present in the blockstore. - async fn contains(&self, cid: &Cid) -> Result; + fn contains(&self, cid: &Cid) -> impl Future> + Send; /// Returns a block from the blockstore. - async fn get(&self, cid: &Cid) -> Result, Error>; + fn get(&self, cid: &Cid) -> impl Future, Error>> + Send; /// Get the size of a single block - async fn size(&self, cid: &[Cid]) -> Result, Error>; + fn size(&self, cid: &[Cid]) -> impl Future, Error>> + Send; /// Get a total size of the block store - async fn total_size(&self) -> Result; + fn total_size(&self) -> impl Future> + Send; /// Inserts a block in the blockstore. - async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error>; + fn put(&self, block: &Block) -> impl Future> + Send; /// Removes a block from the blockstore. - async fn remove(&self, cid: &Cid) -> Result<(), Error>; + fn remove(&self, cid: &Cid) -> impl Future> + Send; /// Remove multiple blocks from the blockstore - async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid>; + fn remove_many( + &self, + blocks: BoxStream<'static, Cid>, + ) -> impl Future> + Send; /// Returns a list of the blocks (Cids), in the blockstore. - async fn list(&self) -> BoxStream<'static, Cid>; + fn list(&self) -> impl Future> + Send; } -#[async_trait] /// Generic layer of abstraction for a key-value data store. pub trait DataStore: PinStore + Debug + Send + Sync { - async fn init(&self) -> Result<(), Error>; - #[deprecated] - async fn open(&self) -> Result<(), Error> { - Ok(()) - } + fn init(&self) -> impl Future> + Send; /// Checks if a key is present in the datastore. - async fn contains(&self, key: &[u8]) -> Result; + fn contains(&self, key: &[u8]) -> impl Future> + Send; /// Returns the value associated with a key from the datastore. - async fn get(&self, key: &[u8]) -> Result>, Error>; + fn get(&self, key: &[u8]) -> impl Future>, Error>> + Send; /// Puts the value under the key in the datastore. - async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>; + fn put(&self, key: &[u8], value: &[u8]) -> impl Future> + Send; /// Removes a key-value pair from the datastore. - async fn remove(&self, key: &[u8]) -> Result<(), Error>; + fn remove(&self, key: &[u8]) -> impl Future> + Send; /// Iterate over the k/v of the datastore - async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec, Vec)>; + fn iter( + &self, + ) -> impl Future, Vec)>> + Send; } #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -194,43 +195,37 @@ pub trait Lock: Debug + Send + Sync { type References<'a> = BoxStream<'a, Result>; -#[async_trait] pub trait PinStore: Debug + Send + Sync { - async fn is_pinned(&self, block: &Cid) -> Result; + fn is_pinned(&self, block: &Cid) -> impl Future> + Send; - async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error>; + fn insert_direct_pin(&self, target: &Cid) -> impl Future> + Send; - async fn insert_recursive_pin( + fn insert_recursive_pin( &self, target: &Cid, referenced: References<'_>, - ) -> Result<(), Error>; + ) -> impl Future> + Send; - async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error>; + fn remove_direct_pin(&self, target: &Cid) -> impl Future> + Send; - async fn remove_recursive_pin( + fn remove_recursive_pin( &self, target: &Cid, referenced: References<'_>, - ) -> Result<(), Error>; + ) -> impl Future> + Send; - async fn list( + fn list( &self, mode: Option, - ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>; + ) -> impl Future>> + Send; - // here we should have resolved ids - // go-ipfs: doesnt start fetching the paths - // js-ipfs: starts fetching paths - // FIXME: there should probably be an additional Result<$inner, Error> here; the per pin error - // is serde OR cid::Error. /// Returns error if any of the ids isn't pinned in the required type, otherwise returns /// the pin details if all of the cids are pinned in one way or the another. - async fn query( + fn query( &self, ids: Vec, requirement: Option, - ) -> Result)>, Error>; + ) -> impl Future)>, Error>> + Send; } /// `PinMode` is the description of pin type for quering purposes. @@ -322,24 +317,36 @@ impl> PinKind { type SubscriptionsMap = HashMap>>>; +/// Represents the configuration of the Ipfs node, its backing blockstore and datastore. +pub trait StorageTypes: RepoTypes {} +impl StorageTypes for T {} + +pub trait RepoTypes: Clone + Send + Sync + 'static { + /// Describes a blockstore. + type TBlockStore: BlockStore; + /// Describes a datastore. + type TDataStore: DataStore; + type TLock: Lock; +} + /// Describes a repo. /// Consolidates a blockstore, a datastore and a subscription registry. #[allow(clippy::type_complexity)] #[derive(Debug, Clone)] -pub struct Repo { - pub(crate) inner: Arc, +pub struct Repo { + pub(crate) inner: Arc>, } #[derive(Debug)] -pub(crate) struct RepoInner { +pub(crate) struct RepoInner { online: AtomicBool, initialized: AtomicBool, max_storage_size: AtomicUsize, - block_store: Box, - data_store: Box, + block_store: S::TBlockStore, + data_store: S::TDataStore, events: RwLock>>, pub(crate) subscriptions: Mutex, - lockfile: Box, + lockfile: S::TLock, pub(crate) gclock: tokio::sync::RwLock<()>, } @@ -361,50 +368,31 @@ pub enum RepoEvent { RemovedBlock(Cid), } -impl Repo { - pub fn new(repo_type: &mut StorageType) -> Self { - match repo_type { - StorageType::Memory => Repo::new_memory(), - #[cfg(not(target_arch = "wasm32"))] - StorageType::Disk(path) => Repo::new_fs(path), - #[cfg(target_arch = "wasm32")] - StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()), - StorageType::Custom { - blockstore, - datastore, - lock, - } => Repo::new_raw( - blockstore.take().expect("Requires blockstore"), - datastore.take().expect("Requires datastore"), - lock.take() - .expect("Requires lockfile for data and block store"), - ), - } - } - - pub fn new_raw( - block_store: Box, - data_store: Box, - lockfile: Box, - ) -> Self { - let inner = RepoInner { - initialized: AtomicBool::default(), - online: AtomicBool::default(), - block_store, - data_store, - events: Default::default(), - subscriptions: Default::default(), - lockfile, - max_storage_size: Default::default(), - gclock: Default::default(), - }; - Repo { - inner: Arc::new(inner), - } - } +impl Repo { + // pub fn new(repo_type: &mut StorageType) -> Self { + // match repo_type { + // StorageType::Memory => Repo::new_memory(), + // #[cfg(not(target_arch = "wasm32"))] + // StorageType::Disk(path) => Repo::new_fs(path), + // #[cfg(target_arch = "wasm32")] + // StorageType::IndexedDb { namespace } => Repo::new_idb(namespace.take()), + // StorageType::Custom { + // blockstore, + // datastore, + // lock, + // } => Repo::new_raw( + // blockstore.take().expect("Requires blockstore"), + // datastore.take().expect("Requires datastore"), + // lock.take() + // .expect("Requires lockfile for data and block store"), + // ), + // } + // } #[cfg(not(target_arch = "wasm32"))] pub fn new_fs(path: impl AsRef) -> Self { + let mut default = DefaultStorage::default(); + let path = path.as_ref().to_path_buf(); let mut blockstore_path = path.clone(); let mut datastore_path = path.clone(); @@ -413,25 +401,46 @@ impl Repo { datastore_path.push("datastore"); lockfile_path.push("repo_lock"); - let block_store = Box::new(blockstore::flatfs::FsBlockStore::new(blockstore_path)); - let data_store = Box::new(datastore::flatfs::FsDataStore::new(datastore_path)); - let lockfile = Box::new(lock::FsLock::new(lockfile_path)); - Self::new_raw(block_store, data_store, lockfile) + default.set_blockstore_path(blockstore_path); + default.set_datastore_path(datastore_path); + default.set_lockfile(lockfile_path); + + Self::new_raw(default.clone(), default.clone(), default) } pub fn new_memory() -> Self { - let block_store = Box::new(blockstore::memory::MemBlockStore::new(Default::default())); - let data_store = Box::new(datastore::memory::MemDataStore::new(Default::default())); - let lockfile = Box::new(lock::MemLock); - Self::new_raw(block_store, data_store, lockfile) + let default = DefaultStorage::default(); + Self::new_raw(default.clone(), default.clone(), default) } #[cfg(target_arch = "wasm32")] pub fn new_idb(namespace: Option) -> Self { - let block_store = Box::new(blockstore::idb::IdbBlockStore::new(namespace.clone())); - let data_store = Box::new(datastore::idb::IdbDataStore::new(namespace)); - let lockfile = Box::new(lock::MemLock); - Self::new_raw(block_store, data_store, lockfile) + let mut default = DefaultStorage::default(); + default.set_namespace(namespace); + Self::new_raw(default.clone(), default.clone(), default) + } +} + +impl Repo { + pub fn new_raw( + block_store: S::TBlockStore, + data_store: S::TDataStore, + lockfile: S::TLock, + ) -> Self { + let inner = RepoInner { + initialized: AtomicBool::default(), + online: AtomicBool::default(), + block_store, + data_store, + events: Default::default(), + subscriptions: Default::default(), + lockfile, + max_storage_size: Default::default(), + gclock: Default::default(), + }; + Repo { + inner: Arc::new(inner), + } } pub fn set_max_storage_size(&self, size: usize) { @@ -587,22 +596,22 @@ impl Repo { } /// Puts a block into the block store. - pub fn put_block(&self, block: &Block) -> RepoPutBlock { + pub fn put_block(&self, block: &Block) -> RepoPutBlock { RepoPutBlock::new(self, block).broadcast_on_new_block(true) } /// Retrives a block from the block store, or starts fetching it from the network and awaits /// until it has been fetched. #[inline] - pub fn get_block>(&self, cid: C) -> RepoGetBlock { - RepoGetBlock::new(self.clone(), cid) + pub fn get_block>(&self, cid: C) -> RepoGetBlock { + RepoGetBlock::new(Repo::clone(self), cid) } /// Retrives a set of blocks from the block store, or starts fetching them from the network and awaits /// until it has been fetched. #[inline] - pub fn get_blocks(&self, cids: impl IntoIterator>) -> RepoGetBlocks { - RepoGetBlocks::new(self.clone()).blocks(cids) + pub fn get_blocks(&self, cids: impl IntoIterator>) -> RepoGetBlocks { + RepoGetBlocks::new(Repo::clone(self)).blocks(cids) } /// Get the size of listed blocks @@ -720,8 +729,8 @@ impl Repo { /// /// Recursively pinned Cids cannot be re-pinned non-recursively but non-recursively pinned Cids /// can be "upgraded to" being recursively pinned. - pub fn pin>(&self, cid: C) -> RepoInsertPin { - RepoInsertPin::new(self.clone(), cid) + pub fn pin>(&self, cid: C) -> RepoInsertPin { + RepoInsertPin::new(Repo::clone(self), cid) } /// Unpins a given Cid recursively or only directly. @@ -730,12 +739,12 @@ impl Repo { /// /// Unpinning an indirectly pinned Cid is not possible other than through its recursively /// pinned tree roots. - pub fn remove_pin>(&self, cid: C) -> RepoRemovePin { - RepoRemovePin::new(self.clone(), cid) + pub fn remove_pin>(&self, cid: C) -> RepoRemovePin { + RepoRemovePin::new(Repo::clone(self), cid) } - pub fn fetch>(&self, cid: C) -> RepoFetch { - RepoFetch::new(self.clone(), cid) + pub fn fetch>(&self, cid: C) -> RepoFetch { + RepoFetch::new(Repo::clone(self), cid) } /// Pins a given Cid recursively or directly (non-recursively). @@ -845,7 +854,7 @@ pub struct GCGuard<'a> { _g: RwLockReadGuard<'a, ()>, } -impl Repo { +impl Repo { /// Hold a guard to prevent GC from running until this guard has dropped /// Note: Until this guard drops, the GC task, if enabled, would not perform any cleanup. /// If the GC task is running, this guard will await until GC finishes @@ -854,17 +863,17 @@ impl Repo { GCGuard { _g } } - pub fn data_store(&self) -> &dyn DataStore { - &*self.inner.data_store + pub fn data_store(&self) -> &S::TDataStore { + &self.inner.data_store } } -pub struct RepoGetBlock { - instance: RepoGetBlocks, +pub struct RepoGetBlock { + instance: RepoGetBlocks, } -impl RepoGetBlock { - pub fn new(repo: Repo, cid: impl Borrow) -> Self { +impl RepoGetBlock { + pub fn new(repo: Repo, cid: impl Borrow) -> Self { let instance = RepoGetBlocks::new(repo).block(cid); Self { instance } } @@ -902,7 +911,7 @@ impl RepoGetBlock { } } -impl Future for RepoGetBlock { +impl Future for RepoGetBlock { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut self; @@ -913,8 +922,8 @@ impl Future for RepoGetBlock { } } -pub struct RepoGetBlocks { - repo: Option, +pub struct RepoGetBlocks { + repo: Option>, cids: IndexSet, providers: IndexSet, local: bool, @@ -923,8 +932,8 @@ pub struct RepoGetBlocks { stream: Option>>, } -impl RepoGetBlocks { - pub fn new(repo: Repo) -> Self { +impl RepoGetBlocks { + pub fn new(repo: Repo) -> Self { Self { repo: Some(repo), cids: IndexSet::new(), @@ -977,7 +986,7 @@ impl RepoGetBlocks { } } -impl Stream for RepoGetBlocks { +impl Stream for RepoGetBlocks { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -1101,7 +1110,7 @@ impl Stream for RepoGetBlocks { } } -impl IntoFuture for RepoGetBlocks { +impl IntoFuture for RepoGetBlocks { type Output = Result, Error>; type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { @@ -1113,18 +1122,18 @@ impl IntoFuture for RepoGetBlocks { } } -pub struct RepoPutBlock { - repo: Repo, +pub struct RepoPutBlock { + repo: Repo, block: Option, span: Option, broadcast_on_new_block: bool, } -impl RepoPutBlock { - fn new(repo: &Repo, block: &Block) -> Self { +impl RepoPutBlock { + fn new(repo: &Repo, block: &Block) -> Self { let block = Some(block.clone()); Self { - repo: repo.clone(), + repo: Repo::clone(repo), block, span: None, broadcast_on_new_block: true, @@ -1142,7 +1151,7 @@ impl RepoPutBlock { } } -impl IntoFuture for RepoPutBlock { +impl IntoFuture for RepoPutBlock { type IntoFuture = BoxFuture<'static, Self::Output>; type Output = Result; fn into_future(mut self) -> Self::IntoFuture { @@ -1175,8 +1184,8 @@ impl IntoFuture for RepoPutBlock { } } -pub struct RepoFetch { - repo: Repo, +pub struct RepoFetch { + repo: Repo, cid: Cid, span: Option, providers: Vec, @@ -1185,8 +1194,8 @@ pub struct RepoFetch { refs: crate::refs::IpldRefs, } -impl RepoFetch { - pub fn new>(repo: Repo, cid: C) -> Self { +impl RepoFetch { + pub fn new>(repo: Repo, cid: C) -> Self { let cid = cid.borrow(); Self { repo, @@ -1245,7 +1254,7 @@ impl RepoFetch { } } -impl IntoFuture for RepoFetch { +impl IntoFuture for RepoFetch { type Output = Result<(), Error>; type IntoFuture = BoxFuture<'static, Self::Output>; @@ -1290,8 +1299,8 @@ impl IntoFuture for RepoFetch { } } -pub struct RepoInsertPin { - repo: Repo, +pub struct RepoInsertPin { + repo: Repo, cid: Cid, span: Option, providers: Vec, @@ -1301,8 +1310,8 @@ pub struct RepoInsertPin { refs: crate::refs::IpldRefs, } -impl RepoInsertPin { - pub fn new>(repo: Repo, cid: C) -> Self { +impl RepoInsertPin { + pub fn new>(repo: Repo, cid: C) -> Self { let cid = cid.borrow(); Self { repo, @@ -1378,7 +1387,7 @@ impl RepoInsertPin { } } -impl IntoFuture for RepoInsertPin { +impl IntoFuture for RepoInsertPin { type Output = Result<(), Error>; type IntoFuture = BoxFuture<'static, Self::Output>; @@ -1425,16 +1434,16 @@ impl IntoFuture for RepoInsertPin { } } -pub struct RepoRemovePin { - repo: Repo, +pub struct RepoRemovePin { + repo: Repo, cid: Cid, span: Option, recursive: bool, refs: crate::refs::IpldRefs, } -impl RepoRemovePin { - pub fn new>(repo: Repo, cid: C) -> Self { +impl RepoRemovePin { + pub fn new>(repo: Repo, cid: C) -> Self { let cid = cid.borrow(); Self { repo, @@ -1458,7 +1467,7 @@ impl RepoRemovePin { } } -impl IntoFuture for RepoRemovePin { +impl IntoFuture for RepoRemovePin { type Output = Result<(), Error>; type IntoFuture = BoxFuture<'static, Self::Output>; diff --git a/src/task.rs b/src/task.rs index f5091396c..d0e2a0230 100644 --- a/src/task.rs +++ b/src/task.rs @@ -10,7 +10,7 @@ use futures::{ }; use pollable_map::stream::optional::OptionalStream; -use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent}; +use crate::{p2p::MultiaddrExt, repo::default_impl::DefaultStorage, Channel, InnerPubsubEvent}; use crate::{ConnectionEvents, PeerConnectionEvents, TSwarmEvent}; use crate::{config::BOOTSTRAP_NODES, IpfsEvent, TSwarmEventFn}; @@ -65,7 +65,7 @@ pub struct IpfsTask> { pub listening_addresses: HashMap>, pub provider_stream: HashMap>, pub record_stream: HashMap>, - pub repo: Repo, + pub repo: Repo, pub kad_subscriptions: HashMap>, pub dht_peer_lookup: HashMap>>, pub bootstraps: HashSet, @@ -92,7 +92,7 @@ pub struct IpfsTask> { } impl> IpfsTask { - pub fn new(swarm: TSwarm, repo: &Repo, event_capacity: usize) -> Self { + pub fn new(swarm: TSwarm, repo: &Repo, event_capacity: usize) -> Self { IpfsTask { repo_events: OptionalStream::default(), from_facade: OptionalStream::default(), @@ -106,7 +106,7 @@ impl> IpfsTask { bitswap_cancellable: Default::default(), repo: repo.clone(), bootstraps: Default::default(), - swarm_event: Default::default(), + swarm_event: None, timer: Default::default(), relay_listener: Default::default(), local_external_addr: false, diff --git a/src/unixfs/add.rs b/src/unixfs/add.rs index 7ebdf0dee..c0f2ada21 100644 --- a/src/unixfs/add.rs +++ b/src/unixfs/add.rs @@ -1,6 +1,9 @@ use std::task::{Context, Poll}; -use crate::{repo::Repo, Block}; +use crate::{ + repo::{default_impl::DefaultStorage, Repo}, + Block, +}; use bytes::Bytes; use either::Either; #[allow(unused_imports)] @@ -47,7 +50,7 @@ impl From<&Path> for AddOpt { #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsAdd { - core: Option>, + core: Option>>, opt: Option, span: Span, chunk: Chunker, @@ -62,11 +65,11 @@ impl UnixfsAdd { Self::with_either(Either::Left(ipfs.clone()), opt) } - pub fn with_repo(repo: &Repo, opt: impl Into) -> Self { + pub fn with_repo(repo: &Repo, opt: impl Into) -> Self { Self::with_either(Either::Right(repo.clone()), opt) } - fn with_either(core: Either, opt: impl Into) -> Self { + fn with_either(core: Either>, opt: impl Into) -> Self { let opt = opt.into(); Self { core: Some(core), diff --git a/src/unixfs/cat.rs b/src/unixfs/cat.rs index ea8d39b98..59369c963 100644 --- a/src/unixfs/cat.rs +++ b/src/unixfs/cat.rs @@ -1,3 +1,4 @@ +use crate::repo::default_impl::DefaultStorage; use crate::{dag::IpldDag, repo::Repo, Block, Ipfs}; use async_stream::try_stream; use bytes::Bytes; @@ -23,7 +24,7 @@ use super::TraversalFailed; /// Returns a stream of bytes on the file pointed with the Cid. #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsCat { - core: Option>, + core: Option>>, span: Span, length: Option, starting_point: Option, @@ -39,11 +40,17 @@ impl UnixfsCat { Self::with_either(Either::Left(ipfs.clone()), starting_point) } - pub fn with_repo(repo: &Repo, starting_point: impl Into) -> Self { + pub fn with_repo( + repo: &Repo, + starting_point: impl Into, + ) -> Self { Self::with_either(Either::Right(repo.clone()), starting_point) } - fn with_either(core: Either, starting_point: impl Into) -> Self { + fn with_either( + core: Either>, + starting_point: impl Into, + ) -> Self { let starting_point = starting_point.into(); Self { core: Some(core), diff --git a/src/unixfs/get.rs b/src/unixfs/get.rs index 5c1dab642..1db23c742 100644 --- a/src/unixfs/get.rs +++ b/src/unixfs/get.rs @@ -15,6 +15,7 @@ use std::{ use tokio::io::AsyncWriteExt; use tracing::{Instrument, Span}; +use crate::repo::default_impl::DefaultStorage; use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath}; #[allow(unused_imports)] @@ -22,7 +23,7 @@ use super::{TraversalFailed, UnixfsStatus}; #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsGet { - core: Option>, + core: Option>>, dest: PathBuf, span: Span, path: Option, @@ -37,12 +38,16 @@ impl UnixfsGet { Self::with_either(Either::Left(ipfs.clone()), path, dest) } - pub fn with_repo(repo: &Repo, path: impl Into, dest: impl AsRef) -> Self { + pub fn with_repo( + repo: &Repo, + path: impl Into, + dest: impl AsRef, + ) -> Self { Self::with_either(Either::Right(repo.clone()), path, dest) } fn with_either( - core: Either, + core: Either>, path: impl Into, dest: impl AsRef, ) -> Self { diff --git a/src/unixfs/ls.rs b/src/unixfs/ls.rs index 7a4cfca89..434e0a316 100644 --- a/src/unixfs/ls.rs +++ b/src/unixfs/ls.rs @@ -12,7 +12,11 @@ use std::task::Context; use std::{task::Poll, time::Duration}; use tracing::{Instrument, Span}; -use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath}; +use crate::{ + dag::IpldDag, + repo::{default_impl::DefaultStorage, Repo}, + Ipfs, IpfsPath, +}; #[derive(Debug)] pub enum Entry { @@ -24,7 +28,7 @@ pub enum Entry { #[must_use = "does nothing unless you `.await` or poll the stream"] pub struct UnixfsLs { - core: Option>, + core: Option>>, span: Span, path: Option, providers: Vec, @@ -38,11 +42,11 @@ impl UnixfsLs { Self::with_either(Either::Left(ipfs.clone()), path) } - pub fn with_repo(repo: &Repo, path: impl Into) -> Self { + pub fn with_repo(repo: &Repo, path: impl Into) -> Self { Self::with_either(Either::Right(repo.clone()), path) } - fn with_either(core: Either, path: impl Into) -> Self { + fn with_either(core: Either>, path: impl Into) -> Self { let path = path.into(); Self { core: Some(core),