Skip to content

Commit 077fb77

Browse files
committed
Use async kv store with OutputSweeper
1 parent 6cb779c commit 077fb77

File tree

3 files changed

+376
-113
lines changed

3 files changed

+376
-113
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
4343
#[cfg(feature = "std")]
44-
use lightning::util::persist::KVStoreSync;
45-
use lightning::util::persist::PersisterSync;
4644
use lightning::util::persist::{KVStore, Persister};
45+
use lightning::util::persist::{KVStoreSync, PersisterSync};
4746
use lightning::util::sweep::OutputSweeper;
4847
#[cfg(feature = "std")]
4948
use lightning::util::sweep::OutputSweeperSync;
49+
use lightning::util::sweep::OutputSweeperSyncKVStore;
5050
#[cfg(feature = "std")]
5151
use lightning::util::wakers::Sleeper;
5252
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -699,7 +699,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
699699
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
700700
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
701701
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
702-
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
702+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
703703
///
704704
/// # struct Node<
705705
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
@@ -843,7 +843,7 @@ where
843843
LM::Target: ALiquidityManager,
844844
O::Target: 'static + OutputSpender,
845845
D::Target: 'static + ChangeDestinationSource,
846-
K::Target: 'static + KVStoreSync,
846+
K::Target: 'static + KVStore,
847847
{
848848
let mut should_break = false;
849849
let async_event_handler = |event| {
@@ -1021,7 +1021,7 @@ pub async fn process_events_async<
10211021
D: 'static + Deref,
10221022
O: 'static + Deref,
10231023
K: 'static + Deref,
1024-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
1024+
OS: 'static + Deref<Target = OutputSweeperSyncKVStore<T, D, F, CF, K, L, O>>,
10251025
S: 'static + Deref<Target = SC> + Send + Sync,
10261026
SC: for<'b> WriteableScore<'b>,
10271027
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -1051,6 +1051,7 @@ where
10511051
K::Target: 'static + KVStoreSync,
10521052
{
10531053
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
1054+
let sweeper = sweeper.map(|s| s.sweeper_async());
10541055
process_events_full_async(
10551056
persister,
10561057
event_handler,
@@ -1303,6 +1304,7 @@ impl Drop for BackgroundProcessor {
13031304
#[cfg(all(feature = "std", test))]
13041305
mod tests {
13051306
use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1307+
use crate::PersisterSyncWrapper;
13061308
use bitcoin::constants::{genesis_block, ChainHash};
13071309
use bitcoin::hashes::Hash;
13081310
use bitcoin::locktime::absolute::LockTime;
@@ -2256,11 +2258,12 @@ mod tests {
22562258
open_channel!(nodes[0], nodes[1], 100000);
22572259

22582260
let data_dir = nodes[0].kv_store.get_data_dir();
2259-
let persister = Arc::new(
2261+
let persister_sync = Arc::new(
22602262
PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
22612263
);
2264+
let persister = PersisterSyncWrapper::new(persister_sync);
22622265

2263-
let bp_future = super::process_events_async(
2266+
let bp_future = super::process_events_full_async(
22642267
persister,
22652268
|_: _| async { Ok(()) },
22662269
Arc::clone(&nodes[0].chain_monitor),
@@ -2767,11 +2770,12 @@ mod tests {
27672770
let (_, nodes) =
27682771
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
27692772
let data_dir = nodes[0].kv_store.get_data_dir();
2770-
let persister =
2773+
let persister_sync =
27712774
Arc::new(PersisterSync::new(data_dir).with_graph_persistence_notifier(sender));
2775+
let persister = PersisterSyncWrapper::new(persister_sync);
27722776

27732777
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2774-
let bp_future = super::process_events_async(
2778+
let bp_future = super::process_events_full_async(
27752779
persister,
27762780
|_: _| async { Ok(()) },
27772781
Arc::clone(&nodes[0].chain_monitor),
@@ -2984,11 +2988,12 @@ mod tests {
29842988

29852989
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
29862990
let data_dir = nodes[0].kv_store.get_data_dir();
2987-
let persister = Arc::new(PersisterSync::new(data_dir));
2991+
let persister_sync = Arc::new(PersisterSync::new(data_dir));
2992+
let persister = PersisterSyncWrapper::new(persister_sync);
29882993

29892994
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
29902995

2991-
let bp_future = super::process_events_async(
2996+
let bp_future = super::process_events_full_async(
29922997
persister,
29932998
event_handler,
29942999
Arc::clone(&nodes[0].chain_monitor),

lightning/src/util/persist.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,58 @@ pub trait KVStoreSync {
121121
) -> Result<Vec<String>, io::Error>;
122122
}
123123

124+
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait.
125+
pub struct KVStoreSyncWrapper<K: Deref>(pub K)
126+
where
127+
K::Target: KVStoreSync;
128+
129+
impl<K: Deref> Deref for KVStoreSyncWrapper<K>
130+
where
131+
K::Target: KVStoreSync,
132+
{
133+
type Target = Self;
134+
fn deref(&self) -> &Self {
135+
self
136+
}
137+
}
138+
139+
impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
140+
where
141+
K::Target: KVStoreSync,
142+
{
143+
fn read(
144+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
145+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> {
146+
let res = self.0.read(primary_namespace, secondary_namespace, key);
147+
148+
Box::pin(async move { res })
149+
}
150+
151+
fn write(
152+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
153+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
154+
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
155+
156+
Box::pin(async move { res })
157+
}
158+
159+
fn remove(
160+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
161+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
162+
let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
163+
164+
Box::pin(async move { res })
165+
}
166+
167+
fn list(
168+
&self, primary_namespace: &str, secondary_namespace: &str,
169+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> {
170+
let res = self.0.list(primary_namespace, secondary_namespace);
171+
172+
Box::pin(async move { res })
173+
}
174+
}
175+
124176
/// A trait that provides a key-value store interface for persisting data.
125177
pub trait KVStore {
126178
/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and

0 commit comments

Comments
 (0)