Skip to content

feat: Use RepoType in Repo and impl DefaultStorage #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- chore: use async-rt in place of rt utils. [PR 362](https://github.com/dariusc93/rust-ipfs/pull/362)
- feat: Implement configuration for connecting to IPFS Private Network. [PR 398](https://github.com/dariusc93/rust-ipfs/pull/398)
- refactor: use CommunicationTask to handle sending message to IpfsTask. [PR 404](https://github.com/dariusc93/rust-ipfs/pull/404)
- feat: Use RepoType in Repo and impl DefaultStorage. [PR 414](https://github.com/dariusc93/rust-ipfs/pull/414)

# 0.14.1
- fix: remove expect when session failed to get next block.
Expand Down
7 changes: 4 additions & 3 deletions src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::block::BlockCodec;
use crate::error::Error;
use crate::path::{IpfsPath, PathRoot, SlashedPath};
use crate::repo::default_impl::DefaultStorage;
use crate::repo::Repo;
use crate::{Block, Ipfs};
use bytes::Bytes;
Expand Down Expand Up @@ -179,11 +180,11 @@ impl RawResolveLocalError {
#[derive(Clone, Debug)]
pub struct IpldDag {
ipfs: Option<Ipfs>,
repo: Repo,
repo: Repo<DefaultStorage>,
}

impl From<Repo> for IpldDag {
fn from(repo: Repo) -> Self {
impl From<Repo<DefaultStorage>> for IpldDag {
fn from(repo: Repo<DefaultStorage>) -> Self {
IpldDag { ipfs: None, repo }
}
}
Expand Down
1 change: 1 addition & 0 deletions src/ipns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::borrow::Borrow;

use crate::p2p::DnsResolver;
use crate::path::{IpfsPath, PathRoot};
use crate::repo::DataStore;
use crate::Ipfs;

mod dnslink;
Expand Down
136 changes: 57 additions & 79 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use p2p::{
RelayConfig, RequestResponseConfig, SwarmConfig, TransportConfig,
};
use repo::{
BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin,
default_impl::DefaultStorage, GCConfig, GCTrigger, RepoFetch, RepoInsertPin, RepoRemovePin,
};

use tracing::Span;
Expand All @@ -87,8 +87,8 @@ pub use self::{
use async_rt::{AbortableJoinHandle, CommunicationTask};
use ipld_core::cid::Cid;
use ipld_core::ipld::Ipld;
use std::borrow::Borrow;
use std::convert::Infallible;
use std::{borrow::Borrow, path::PathBuf};
use std::{
collections::{BTreeSet, HashMap, HashSet},
fmt,
Expand Down Expand Up @@ -126,51 +126,6 @@ use libp2p::{request_response::InboundRequestId, swarm::dial_opts::PeerCondition
pub use libp2p_connection_limits::ConnectionLimits;
use serde::Serialize;

#[allow(dead_code)]
#[deprecated(note = "Use `StoreageType` instead")]
type StoragePath = StorageType;

#[derive(Default, Debug)]
pub enum StorageType {
#[cfg(not(target_arch = "wasm32"))]
Disk(std::path::PathBuf),
#[default]
Memory,
#[cfg(target_arch = "wasm32")]
IndexedDb { namespace: Option<String> },
Custom {
blockstore: Option<Box<dyn BlockStore>>,
datastore: Option<Box<dyn DataStore>>,
lock: Option<Box<dyn Lock>>,
},
}

impl PartialEq for StorageType {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
#[cfg(not(target_arch = "wasm32"))]
(StorageType::Disk(left_path), StorageType::Disk(right_path)) => {
left_path.eq(right_path)
}
#[cfg(target_arch = "wasm32")]
(
StorageType::IndexedDb { namespace: left },
StorageType::IndexedDb { namespace: right },
) => left.eq(right),
(StorageType::Memory, StorageType::Memory) => true,
(StorageType::Custom { .. }, StorageType::Custom { .. }) => {
//Do we really care if they equal?
//TODO: Possibly implement PartialEq/Eq for the traits so we could make sure
// that they do or dont eq each other. For now this will always be true
true
}
_ => false,
}
}
}

impl Eq for StorageType {}

/// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`].
struct IpfsOptions {
/// The path of the ipfs repo (blockstore and datastore).
Expand All @@ -182,7 +137,11 @@ struct IpfsOptions {
///
/// It is **not** recommended to set this to IPFS_PATH without first at least backing up your
/// existing repository.
pub ipfs_path: StorageType,
pub ipfs_path: Option<PathBuf>,

/// Enables and supply a name of the namespace used for indexeddb
#[cfg(target_arch = "wasm32")]
pub namespace: Option<Option<String>>,

/// Nodes used as bootstrap peers.
pub bootstrap: Vec<Multiaddr>,
Expand Down Expand Up @@ -285,7 +244,9 @@ pub enum RepoProvider {
impl Default for IpfsOptions {
fn default() -> Self {
Self {
ipfs_path: StorageType::Memory,
ipfs_path: None,
#[cfg(target_arch = "wasm32")]
namespace: None,
bootstrap: Default::default(),
relay_server_config: Default::default(),
kad_configuration: Either::Left(Default::default()),
Expand Down Expand Up @@ -333,7 +294,7 @@ impl fmt::Debug for IpfsOptions {
#[allow(clippy::type_complexity)]
pub struct Ipfs {
span: Span,
repo: Repo,
repo: Repo<DefaultStorage>,
key: Keypair,
keystore: Keystore,
identify_conf: IdentifyConfiguration,
Expand Down Expand Up @@ -557,7 +518,7 @@ pub struct UninitializedIpfs<C: NetworkBehaviour<ToSwarm = Infallible> + Send> {
keys: Option<Keypair>,
options: IpfsOptions,
fdlimit: Option<FDLimit>,
repo_handle: Option<Repo>,
repo_handle: Repo<DefaultStorage>,
local_external_addr: bool,
swarm_event: Option<TSwarmEventFn<C>>,
// record_validators: HashMap<String, Arc<dyn Fn(&str, &Record) -> bool + Sync + Send>>,
Expand All @@ -583,7 +544,7 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
keys: None,
options: Default::default(),
fdlimit: None,
repo_handle: None,
repo_handle: Repo::new_memory(),
// record_validators: Default::default(),
record_key_validator: Default::default(),
local_external_addr: false,
Expand All @@ -604,10 +565,10 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
}

/// Set storage type for the repo.
pub fn set_storage_type(mut self, storage_type: StorageType) -> Self {
self.options.ipfs_path = storage_type;
self
}
// pub fn set_storage_type(mut self, storage_type: StorageType) -> Self {
// self.options.ipfs_path = storage_type;
// self
// }

/// Adds a listening address
pub fn add_listening_addr(mut self, addr: Multiaddr) -> Self {
Expand Down Expand Up @@ -792,7 +753,14 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
#[cfg(not(target_arch = "wasm32"))]
pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
let path = path.as_ref().to_path_buf();
self.options.ipfs_path = StorageType::Disk(path);
self.options.ipfs_path = Some(path);
self
}

/// Sets a namespace
#[cfg(target_arch = "wasm32")]
pub fn set_namespace(mut self, ns: Option<String>) -> Self {
self.options.namespace = Some(ns);
self
}

Expand Down Expand Up @@ -853,8 +821,8 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
}

/// Set block and data repo
pub fn set_repo(mut self, repo: &Repo) -> Self {
self.repo_handle = Some(repo.clone());
pub fn set_repo(mut self, repo: &Repo<DefaultStorage>) -> Self {
self.repo_handle = Repo::clone(repo);
self
}

Expand Down Expand Up @@ -940,23 +908,32 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
// instruments the IpfsFuture, the background task.
let swarm_span = tracing::trace_span!(parent: &root_span, "swarm");

let repo = match repo_handle {
Some(repo) => {
if repo.is_online() {
anyhow::bail!("Repo is already initialized");
}
repo
}
None => {
#[cfg(not(target_arch = "wasm32"))]
if let StorageType::Disk(path) = &options.ipfs_path {
let mut repo = repo_handle;

if repo.is_online() {
anyhow::bail!("Repo is already initialized");
}

#[cfg(not(target_arch = "wasm32"))]
{
repo = match &options.ipfs_path {
Some(path) => {
if !path.is_dir() {
tokio::fs::create_dir_all(path).await?;
}
Repo::<DefaultStorage>::new_fs(path)
}
Repo::new(&mut options.ipfs_path)
}
};
None => repo,
};
}

#[cfg(target_arch = "wasm32")]
{
repo = match options.namespace.take() {
Some(ns) => Repo::<DefaultStorage>::new_idb(ns),
None => repo,
};
}

repo.init().instrument(init_span.clone()).await?;

Expand Down Expand Up @@ -1039,7 +1016,7 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {

let gc_handle = gc_config.map(|config| {
async_rt::task::spawn_abortable({
let repo = repo.clone();
let repo = Repo::clone(&repo);
async move {
let GCConfig { duration, trigger } = config;
let use_config_timer = duration != Duration::ZERO;
Expand Down Expand Up @@ -1176,7 +1153,7 @@ impl Ipfs {
}

/// Return an [`Repo`] to access the internal repo of the node
pub fn repo(&self) -> &Repo {
pub fn repo(&self) -> &Repo<DefaultStorage> {
&self.repo
}

Expand All @@ -1191,13 +1168,13 @@ impl Ipfs {
}

/// Puts a block into the ipfs repo.
pub fn put_block(&self, block: &Block) -> RepoPutBlock {
pub fn put_block(&self, block: &Block) -> RepoPutBlock<DefaultStorage> {
self.repo.put_block(block).span(self.span.clone())
}

/// Retrieves a block from the local blockstore, or starts fetching from the network or join an
/// already started fetch.
pub fn get_block(&self, cid: impl Borrow<Cid>) -> RepoGetBlock {
pub fn get_block(&self, cid: impl Borrow<Cid>) -> RepoGetBlock<DefaultStorage> {
self.repo.get_block(cid).span(self.span.clone())
}

Expand Down Expand Up @@ -1239,7 +1216,7 @@ impl Ipfs {
/// If a recursive `insert_pin` operation is interrupted because of a crash or the crash
/// prevents from synchronizing the data store to disk, this will leave the system in an inconsistent
/// state. The remedy is to re-pin recursive pins.
pub fn insert_pin(&self, cid: impl Borrow<Cid>) -> RepoInsertPin {
pub fn insert_pin(&self, cid: impl Borrow<Cid>) -> RepoInsertPin<DefaultStorage> {
self.repo().pin(cid).span(self.span.clone())
}

Expand All @@ -1249,7 +1226,7 @@ impl Ipfs {
///
/// Unpinning an indirectly pinned Cid is not possible other than through its recursively
/// pinned tree roots.
pub fn remove_pin(&self, cid: impl Borrow<Cid>) -> RepoRemovePin {
pub fn remove_pin(&self, cid: impl Borrow<Cid>) -> RepoRemovePin<DefaultStorage> {
self.repo().remove_pin(cid).span(self.span.clone())
}

Expand Down Expand Up @@ -2131,7 +2108,7 @@ impl Ipfs {
}

/// Fetches the block, and, if set, recursively walk the graph loading all the blocks to the blockstore.
pub fn fetch(&self, cid: &Cid) -> RepoFetch {
pub fn fetch(&self, cid: &Cid) -> RepoFetch<DefaultStorage> {
self.repo.fetch(cid).span(self.span.clone())
}

Expand Down Expand Up @@ -3085,6 +3062,7 @@ pub use node::Node;

/// Node module provides an easy to use interface used in `tests/`.
mod node {

use super::*;

/// Node encapsulates everything to setup a testing instance so that multi-node tests become
Expand Down
3 changes: 2 additions & 1 deletion src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use either::Either;
use serde::{Deserialize, Serialize};

use crate::error::Error;
use crate::repo::default_impl::DefaultStorage;
use crate::{IntoAddPeerOpt, IpfsOptions};

use crate::repo::Repo;
Expand Down Expand Up @@ -362,7 +363,7 @@ where
pub(crate) fn new(
keypair: &Keypair,
options: &IpfsOptions,
repo: &Repo,
repo: &Repo<DefaultStorage>,
custom: Option<C>,
) -> Result<(Self, Option<ClientTransport>), Error> {
let bootstrap = options.bootstrap.clone();
Expand Down
13 changes: 8 additions & 5 deletions src/p2p/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ mod bitswap_pb {
}
}

use crate::{repo::Repo, Block};
use crate::{
repo::{default_impl::DefaultStorage, Repo},
Block,
};

use self::{
message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType},
Expand All @@ -62,14 +65,14 @@ pub struct Behaviour {
events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
connections: HashMap<PeerId, HashSet<ConnectionId>>,
blacklist_connections: HashMap<PeerId, BTreeSet<ConnectionId>>,
store: Repo,
store: Repo<DefaultStorage>,
want_session: StreamMap<Cid, WantSession>,
have_session: StreamMap<Cid, HaveSession>,
waker: Option<Waker>,
}

impl Behaviour {
pub fn new(store: &Repo) -> Self {
pub fn new(store: &Repo<DefaultStorage>) -> Self {
Self {
events: Default::default(),
connections: Default::default(),
Expand Down Expand Up @@ -559,7 +562,7 @@ impl NetworkBehaviour for Behaviour {
mod test {
use std::time::Duration;

use crate::block::BlockCodec;
use crate::{block::BlockCodec, repo::default_impl::DefaultStorage};
use futures::StreamExt;
use ipld_core::cid::Cid;
use libp2p::{
Expand Down Expand Up @@ -954,7 +957,7 @@ mod test {
Ok(())
}

async fn build_swarm() -> (PeerId, Multiaddr, Swarm<Behaviour>, Repo) {
async fn build_swarm() -> (PeerId, Multiaddr, Swarm<Behaviour>, Repo<DefaultStorage>) {
let repo = Repo::new_memory();

let mut swarm = SwarmBuilder::with_new_identity()
Expand Down
Loading