Skip to content

Commit db1c889

Browse files
authored
refactor: use CommunicationTask to handle sending message to IpfsTask. (#404)
1 parent df813b4 commit db1c889

File tree

4 files changed

+49
-61
lines changed

4 files changed

+49
-61
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- feat: Add reconnect option to address book. [PR 356](https://github.com/dariusc93/rust-ipfs/pull/356)
77
- chore: use async-rt in place of rt utils. [PR 362](https://github.com/dariusc93/rust-ipfs/pull/362)
88
- feat: Implement configuration for connecting to IPFS Private Network. [PR 398](https://github.com/dariusc93/rust-ipfs/pull/398)
9+
- refactor: use CommunicationTask to handle sending message to IpfsTask. [PR 404](https://github.com/dariusc93/rust-ipfs/pull/404)
910

1011
# 0.14.1
1112
- fix: remove expect when session failed to get next block.

packages/libp2p-relay-manager/src/handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ impl ConnectionHandler for Handler {
5151
false
5252
}
5353

54-
fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {
55-
}
54+
fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {}
5655

5756
#[allow(clippy::wildcard_in_or_patterns)]
5857
fn on_connection_event(

src/lib.rs

Lines changed: 35 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,10 @@ use dag::{DagGet, DagPut};
4545
use either::Either;
4646
use futures::{
4747
channel::{
48-
mpsc::{channel, Sender, UnboundedReceiver},
48+
mpsc::UnboundedReceiver,
4949
oneshot::{self, channel as oneshot_channel, Sender as OneshotSender},
5050
},
5151
future::BoxFuture,
52-
sink::SinkExt,
5352
stream::{BoxStream, Stream},
5453
FutureExt, StreamExt, TryStreamExt,
5554
};
@@ -85,7 +84,7 @@ pub use self::{
8584
path::IpfsPath,
8685
repo::{PinKind, PinMode},
8786
};
88-
use async_rt::AbortableJoinHandle;
87+
use async_rt::{AbortableJoinHandle, CommunicationTask};
8988
use ipld_core::cid::Cid;
9089
use ipld_core::ipld::Ipld;
9190
use std::borrow::Borrow;
@@ -338,9 +337,8 @@ pub struct Ipfs {
338337
key: Keypair,
339338
keystore: Keystore,
340339
identify_conf: IdentifyConfiguration,
341-
to_task: Sender<IpfsEvent>,
340+
to_task: CommunicationTask<IpfsEvent>,
342341
record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
343-
_guard: AbortableJoinHandle<()>,
344342
_gc_guard: AbortableJoinHandle<()>,
345343
}
346344

@@ -986,35 +984,18 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
986984
}
987985
}
988986

989-
let mut _guard = AbortableJoinHandle::empty();
990-
let mut _gc_guard = AbortableJoinHandle::empty();
991-
992-
let (to_task, receiver) = channel::<IpfsEvent>(1);
993987
let id_conf = options.identify_configuration.clone();
994988

995989
let keystore = options.keystore.clone();
996990

997-
let mut ipfs = Ipfs {
998-
span: facade_span,
999-
repo,
1000-
identify_conf: id_conf,
1001-
key: keys.clone(),
1002-
keystore,
1003-
to_task,
1004-
record_key_validator,
1005-
_guard,
1006-
_gc_guard,
1007-
};
1008-
1009991
//Note: If `All` or `Pinned` are used, we would have to auto adjust the amount of
1010992
// provider records by adding the amount of blocks to the config.
1011993
//TODO: Add persistent layer for kad store
1012994
let blocks = match options.provider {
1013995
RepoProvider::None => vec![],
1014-
RepoProvider::All => ipfs.repo.list_blocks().await.collect::<Vec<_>>().await,
996+
RepoProvider::All => repo.list_blocks().await.collect::<Vec<_>>().await,
1015997
RepoProvider::Pinned => {
1016-
ipfs.repo
1017-
.list_pins(None)
998+
repo.list_pins(None)
1018999
.await
10191000
.filter_map(|result| futures::future::ready(result.map(|(cid, _)| cid).ok()))
10201001
.collect()
@@ -1047,7 +1028,7 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
10471028
let swarm = create_swarm(
10481029
&keys,
10491030
&options,
1050-
&ipfs.repo,
1031+
&repo,
10511032
exec_span,
10521033
(custom_behaviour, custom_transport),
10531034
)?;
@@ -1058,7 +1039,7 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
10581039

10591040
let gc_handle = gc_config.map(|config| {
10601041
async_rt::task::spawn_abortable({
1061-
let repo = ipfs.repo.clone();
1042+
let repo = repo.clone();
10621043
async move {
10631044
let GCConfig { duration, trigger } = config;
10641045
let use_config_timer = duration != Duration::ZERO;
@@ -1127,13 +1108,7 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
11271108
})
11281109
}).unwrap_or(AbortableJoinHandle::empty());
11291110

1130-
let mut fut = task::IpfsTask::new(
1131-
swarm,
1132-
repo_events.fuse(),
1133-
receiver.fuse(),
1134-
&ipfs.repo,
1135-
options.connection_event_cap,
1136-
);
1111+
let mut fut = task::IpfsTask::new(swarm, &repo, options.connection_event_cap);
11371112
fut.swarm_event = swarm_event;
11381113
fut.local_external_addr = local_external_addr;
11391114

@@ -1163,26 +1138,33 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible> + Send> UninitializedIpfs<C> {
11631138
}
11641139
}
11651140

1166-
let main_handle = async_rt::task::spawn_abortable({
1167-
async move {
1141+
let main_handle = async_rt::task::spawn_coroutine_with_context(
1142+
(repo_events, swarm_span, fut),
1143+
|(r_events, swarm_span, mut fut), recv: futures::channel::mpsc::Receiver<IpfsEvent>| async move {
1144+
fut.from_facade.replace(recv.fuse());
1145+
fut.repo_events.replace(r_events.fuse());
11681146
//Note: For now this is not configurable as its meant for internal testing purposes but may change in the future
11691147
let as_fut = false;
1170-
11711148
let fut = if as_fut {
11721149
fut.boxed()
11731150
} else {
11741151
fut.run().boxed()
11751152
};
1153+
fut.instrument(swarm_span).await
1154+
},
1155+
);
11761156

1177-
fut.await
1178-
}
1179-
.instrument(swarm_span)
1180-
});
1157+
let ipfs = Ipfs {
1158+
span: facade_span,
1159+
repo,
1160+
identify_conf: id_conf,
1161+
key: keys.clone(),
1162+
keystore,
1163+
to_task: main_handle,
1164+
record_key_validator,
1165+
_gc_guard: gc_handle,
1166+
};
11811167

1182-
unsafe {
1183-
ipfs._guard.replace(main_handle);
1184-
ipfs._gc_guard.replace(gc_handle);
1185-
}
11861168
Ok(ipfs)
11871169
}
11881170
}
@@ -2633,17 +2615,23 @@ impl Ipfs {
26332615
}
26342616

26352617
/// Exit daemon.
2636-
pub async fn exit_daemon(mut self) {
2618+
pub async fn exit_daemon(self) {
26372619
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
26382620
// the background task or stream. After that this could be handled by dropping.
26392621
self.repo.shutdown();
26402622

26412623
// ignoring the error because it'd mean that the background task had already been dropped
26422624
let _ = self.to_task.try_send(IpfsEvent::Exit);
26432625

2644-
// terminte task that handles GC and spawn task
2626+
// TODO: Determine if we want to kill the task directly or let it gracefully close after completing all events
2627+
// self.to_task.abort();;
2628+
2629+
// terminte task that handles GC
26452630
self._gc_guard.abort();
2646-
self._guard.abort();
2631+
2632+
// yield to the runtime to allow runtime to process pending tasks
2633+
// TODO: Possibly remove along with async signature
2634+
// tokio::task::yield_now().await;
26472635
}
26482636
}
26492637

src/task.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use futures::{
88
stream::Fuse,
99
FutureExt, StreamExt,
1010
};
11+
use pollable_map::stream::optional::OptionalStream;
1112

1213
use crate::{p2p::MultiaddrExt, Channel, InnerPubsubEvent};
1314
use crate::{ConnectionEvents, PeerConnectionEvents, TSwarmEvent};
@@ -58,8 +59,8 @@ use tokio::sync::Notify;
5859
#[allow(dead_code)]
5960
pub struct IpfsTask<C: NetworkBehaviour<ToSwarm = Infallible>> {
6061
pub swarm: TSwarm<C>,
61-
pub repo_events: Fuse<Receiver<RepoEvent>>,
62-
pub from_facade: Fuse<Receiver<IpfsEvent>>,
62+
pub repo_events: OptionalStream<Fuse<Receiver<RepoEvent>>>,
63+
pub from_facade: OptionalStream<Fuse<Receiver<IpfsEvent>>>,
6364
pub bitswap_cancellable: HashMap<Cid, Vec<Arc<Notify>>>,
6465
pub listening_addresses: HashMap<ListenerId, Vec<Multiaddr>>,
6566
pub provider_stream: HashMap<QueryId, UnboundedSender<PeerId>>,
@@ -91,16 +92,10 @@ pub struct IpfsTask<C: NetworkBehaviour<ToSwarm = Infallible>> {
9192
}
9293

9394
impl<C: NetworkBehaviour<ToSwarm = Infallible>> IpfsTask<C> {
94-
pub fn new(
95-
swarm: TSwarm<C>,
96-
repo_events: Fuse<Receiver<RepoEvent>>,
97-
from_facade: Fuse<Receiver<IpfsEvent>>,
98-
repo: &Repo,
99-
event_capacity: usize,
100-
) -> Self {
95+
pub fn new(swarm: TSwarm<C>, repo: &Repo, event_capacity: usize) -> Self {
10196
IpfsTask {
102-
repo_events,
103-
from_facade,
97+
repo_events: OptionalStream::default(),
98+
from_facade: OptionalStream::default(),
10499
swarm,
105100
event_capacity,
106101
provider_stream: HashMap::new(),
@@ -154,7 +149,12 @@ impl<C: NetworkBehaviour<ToSwarm = Infallible>> futures::Future for IpfsTask<C>
154149
}
155150
loop {
156151
match self.from_facade.poll_next_unpin(cx) {
157-
Poll::Ready(Some(event)) => self.handle_event(event),
152+
Poll::Ready(Some(event)) => {
153+
if matches!(event, IpfsEvent::Exit) {
154+
return Poll::Ready(());
155+
}
156+
self.handle_event(event)
157+
}
158158
Poll::Ready(None) => return Poll::Ready(()),
159159
Poll::Pending => break,
160160
}

0 commit comments

Comments
 (0)