Skip to content

Commit 32e17b3

Browse files
committed
Use async kv store with OutputSweeper
1 parent 2014cf5 commit 32e17b3

File tree

3 files changed

+376
-111
lines changed

3 files changed

+376
-111
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43+
use lightning::util::persist::KVStore;
4344
use lightning::util::persist::KVStoreSync;
4445
use lightning::util::persist::Persister;
4546
use lightning::util::persist::PersisterSync;
4647
use lightning::util::sweep::OutputSweeper;
4748
#[cfg(feature = "std")]
4849
use lightning::util::sweep::OutputSweeperSync;
50+
use lightning::util::sweep::OutputSweeperSyncKVStore;
4951
#[cfg(feature = "std")]
5052
use lightning::util::wakers::Sleeper;
5153
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -698,7 +700,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
698700
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
699701
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
700702
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
701-
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
703+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
702704
///
703705
/// # struct Node<
704706
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
@@ -842,7 +844,7 @@ where
842844
LM::Target: ALiquidityManager,
843845
O::Target: 'static + OutputSpender,
844846
D::Target: 'static + ChangeDestinationSource,
845-
K::Target: 'static + KVStoreSync,
847+
K::Target: 'static + KVStore,
846848
{
847849
let mut should_break = false;
848850
let async_event_handler = |event| {
@@ -1020,7 +1022,7 @@ pub async fn process_events_async<
10201022
D: 'static + Deref,
10211023
O: 'static + Deref,
10221024
K: 'static + Deref,
1023-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
1025+
OS: 'static + Deref<Target = OutputSweeperSyncKVStore<T, D, F, CF, K, L, O>>,
10241026
S: 'static + Deref<Target = SC> + Send + Sync,
10251027
SC: for<'b> WriteableScore<'b>,
10261028
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -1050,6 +1052,7 @@ where
10501052
K::Target: 'static + KVStoreSync,
10511053
{
10521054
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
1055+
let sweeper = sweeper.map(|s| s.sweeper_async());
10531056
process_events_full_async(
10541057
persister,
10551058
event_handler,
@@ -1302,6 +1305,7 @@ impl Drop for BackgroundProcessor {
13021305
#[cfg(all(feature = "std", test))]
13031306
mod tests {
13041307
use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1308+
use crate::PersisterSyncWrapper;
13051309
use bitcoin::constants::{genesis_block, ChainHash};
13061310
use bitcoin::hashes::Hash;
13071311
use bitcoin::locktime::absolute::LockTime;
@@ -2255,11 +2259,12 @@ mod tests {
22552259
open_channel!(nodes[0], nodes[1], 100000);
22562260

22572261
let data_dir = nodes[0].kv_store.get_data_dir();
2258-
let persister = Arc::new(
2262+
let persister_sync = Arc::new(
22592263
PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
22602264
);
2265+
let persister = PersisterSyncWrapper::new(persister_sync);
22612266

2262-
let bp_future = super::process_events_async(
2267+
let bp_future = super::process_events_full_async(
22632268
persister,
22642269
|_: _| async { Ok(()) },
22652270
Arc::clone(&nodes[0].chain_monitor),
@@ -2766,11 +2771,12 @@ mod tests {
27662771
let (_, nodes) =
27672772
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
27682773
let data_dir = nodes[0].kv_store.get_data_dir();
2769-
let persister =
2774+
let persister_sync =
27702775
Arc::new(PersisterSync::new(data_dir).with_graph_persistence_notifier(sender));
2776+
let persister = PersisterSyncWrapper::new(persister_sync);
27712777

27722778
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2773-
let bp_future = super::process_events_async(
2779+
let bp_future = super::process_events_full_async(
27742780
persister,
27752781
|_: _| async { Ok(()) },
27762782
Arc::clone(&nodes[0].chain_monitor),
@@ -2983,11 +2989,12 @@ mod tests {
29832989

29842990
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
29852991
let data_dir = nodes[0].kv_store.get_data_dir();
2986-
let persister = Arc::new(PersisterSync::new(data_dir));
2992+
let persister_sync = Arc::new(PersisterSync::new(data_dir));
2993+
let persister = PersisterSyncWrapper::new(persister_sync);
29872994

29882995
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
29892996

2990-
let bp_future = super::process_events_async(
2997+
let bp_future = super::process_events_full_async(
29912998
persister,
29922999
event_handler,
29933000
Arc::clone(&nodes[0].chain_monitor),

lightning/src/util/persist.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,58 @@ pub trait KVStoreSync {
121121
) -> Result<Vec<String>, io::Error>;
122122
}
123123

124+
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait.
125+
pub struct KVStoreSyncWrapper<K: Deref>(pub K)
126+
where
127+
K::Target: KVStoreSync;
128+
129+
impl<K: Deref> Deref for KVStoreSyncWrapper<K>
130+
where
131+
K::Target: KVStoreSync,
132+
{
133+
type Target = Self;
134+
fn deref(&self) -> &Self {
135+
self
136+
}
137+
}
138+
139+
impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
140+
where
141+
K::Target: KVStoreSync,
142+
{
143+
fn read(
144+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
145+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> {
146+
let res = self.0.read(primary_namespace, secondary_namespace, key);
147+
148+
Box::pin(async move { res })
149+
}
150+
151+
fn write(
152+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
153+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
154+
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
155+
156+
Box::pin(async move { res })
157+
}
158+
159+
fn remove(
160+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
161+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
162+
let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
163+
164+
Box::pin(async move { res })
165+
}
166+
167+
fn list(
168+
&self, primary_namespace: &str, secondary_namespace: &str,
169+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> {
170+
let res = self.0.list(primary_namespace, secondary_namespace);
171+
172+
Box::pin(async move { res })
173+
}
174+
}
175+
124176
/// A trait that provides a key-value store interface for persisting data.
125177
pub trait KVStore {
126178
/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and

0 commit comments

Comments
 (0)