-
Notifications
You must be signed in to change notification settings - Fork 417
Async Persister
trait and async OutputSweeper
persistence
#3905
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
👋 Thanks for assigning @tnull as a reviewer! |
1b95d30
to
21dc34c
Compare
3fb7d6b
to
1847e8d
Compare
1f59bbe
to
723a5a6
Compare
bc9c29a
to
90ab1ba
Compare
fn persist_state<'a>( | ||
&self, sweeper_state: &SweeperState, | ||
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> { | ||
let encoded = &sweeper_state.encode(); | ||
|
||
self.kv_store.write( | ||
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, | ||
OUTPUT_SWEEPER_PERSISTENCE_KEY, | ||
encoded, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The encoded
variable is captured by reference in the returned future, but it's a local variable that will be dropped when the function returns. This creates a potential use-after-free issue. Consider moving ownership of encoded
into the future instead:
fn persist_state<'a>(
&self, sweeper_state: &SweeperState,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> {
let encoded = sweeper_state.encode();
self.kv_store.write(
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_KEY,
&encoded,
)
}
This ensures the data remains valid for the lifetime of the future.
fn persist_state<'a>( | |
&self, sweeper_state: &SweeperState, | |
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> { | |
let encoded = &sweeper_state.encode(); | |
self.kv_store.write( | |
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, | |
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, | |
OUTPUT_SWEEPER_PERSISTENCE_KEY, | |
encoded, | |
) | |
fn persist_state<'a>( | |
&self, sweeper_state: &SweeperState, | |
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> { | |
let encoded = sweeper_state.encode(); | |
self.kv_store.write( | |
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, | |
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, | |
OUTPUT_SWEEPER_PERSISTENCE_KEY, | |
&encoded, | |
) | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this real?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so as the compiler would likely optimize that away, given that encoded
will be an owned value (Vec
returned by encode()
). Still, the change that it suggests looks cleaner.
In general it will be super confusing that we encode
at the time of creating the future, but would only actually persist once we dropped the lock. Starting from now we'll need to be super cautious about the side-effects of interleaving persist calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is that an async kv store store encodes the data and stores the write action in a queue at the moment the future is created. Things should still happen in the original order.
Can you show a specific scenario where we have to be super cautious even if we have that queue?
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3905 +/- ##
==========================================
+ Coverage 88.83% 89.33% +0.49%
==========================================
Files 166 166
Lines 119259 126008 +6749
Branches 119259 126008 +6749
==========================================
+ Hits 105940 112565 +6625
- Misses 10992 11089 +97
- Partials 2327 2354 +27 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
8602b77
to
077fb77
Compare
@@ -121,6 +121,58 @@ pub trait KVStoreSync { | |||
) -> Result<Vec<String>, io::Error>; | |||
} | |||
|
|||
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. | |||
pub struct KVStoreSyncWrapper<K: Deref>(pub K) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to make this public otherwise lightning-background-processor doesn't compile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To expose this only to the bg processor, we'd need to either move this to a separate file and symlink that, or add a feature that is only enabled in other ldk crates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed yesterday, we'll also need access to this type in LDK Node as it will be part of the OutputSweeper
type signature. Not sure if it then would preferable to have the user implement this wrapper. If we leave it here, it might make sense to split out all these wrappers into a separate util.rs
or helpers.rs
file, so they don't clutter up the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the new outputsweeper wrapper type, KVStoreSyncWrapper
isn't needed anymore in ldk-node. I updated lightningdevkit/ldk-node@main...joostjager:ldk-node:upgrade-to-async-kvstore and removed the imports that were remaining.
} | ||
|
||
/// Returns a reference to the underlying [`OutputSweeper`]. | ||
pub fn sweeper_async( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to make this public otherwise lightning-background-processor doesn't compile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you implement deref to solve this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then it is still publicly accessible?
Also provide a wrapper to allow a sync kvstore to be used.
077fb77
to
32e17b3
Compare
Persister
traitPersister
trait and async OutputSweeper
persistence
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a first pass. Changes look mostly good, although I'm not the biggest fan of the process_events_full_async
variant as well as the SyncOutputSweeperSyncKVStoreSync
in the last commit. I hope we can avoid them?
For the former, it would be useful to wait for #3688 (given it's close), and then the user could use the builder pattern to either configure an async or sync KVStore
.
@@ -319,7 +332,7 @@ macro_rules! define_run_body { | |||
$peer_manager: ident, $gossip_sync: ident, | |||
$process_sweeper: expr, | |||
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, | |||
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, | |||
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we make that name a bit more expressive? Maybe async_persist
for now?
/// # struct Store {} | ||
/// # impl lightning::util::persist::KVStoreSync for Store { | ||
/// # struct StoreSync {} | ||
/// # impl lightning::util::persist::KVStoreSync for StoreSync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should this renaming happen in the first commit renaming KVStore
to KVStoreSync
?
@@ -769,7 +789,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; | |||
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")] | |||
/// # } | |||
///``` | |||
pub async fn process_events_async< | |||
pub async fn process_events_full_async< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dislike to solve this by just doubling the API surface here. IMO, we should finally move forward with #3688 first, and then utilize the builder pattern to allow the user to plugin a sync or async KVStore
variant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should do #3688 in the form that it currently has. My opinion is that it is too much code for what it gets us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, we may need to conclude on that debate soon then.
Note that a) we had already decided to go this way on #3612 b) have a contributor that already did the work and c) this PR is a pretty good example why we'd want the builder pattern over just adding more constructors or optional fields on them.
But probably best to discuss elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also temporary, because we want to move over to full async eventually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
impl<'a, PS, CM, L, S> PersisterSyncWrapper<'a, PS, CM, L, S> { | ||
/// Constructs a new [`PersisterSyncWrapper`] from the given sync persister. | ||
pub fn new(inner: PS) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this constructor/pub/docs, etc., if we don't intend to to make the wrapper pub
as a whole.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd think comments are not only useful for documentation, but also for developers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this comment says exactly what the method definition says, provides no additional information or context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, you meant for this one specifically. I thought in general for private types/methods. Agreed
fn persist_manager( | ||
&self, channel_manager: &CM, | ||
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { | ||
self.write( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance to dedup/DRY up these default implementations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can. Using the shared constants is at least some form of DRY. Open to suggestions.
@@ -121,6 +121,58 @@ pub trait KVStoreSync { | |||
) -> Result<Vec<String>, io::Error>; | |||
} | |||
|
|||
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. | |||
pub struct KVStoreSyncWrapper<K: Deref>(pub K) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed yesterday, we'll also need access to this type in LDK Node as it will be part of the OutputSweeper
type signature. Not sure if it then would preferable to have the user implement this wrapper. If we leave it here, it might make sense to split out all these wrappers into a separate util.rs
or helpers.rs
file, so they don't clutter up the codebase.
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
})?; | ||
state_lock.dirty = false; | ||
state_lock.dirty = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I think we should really unset the dirty flag once we're sure we persisted, not before. I.e., rather than unsetting and re-setting it, just set it to false in the success case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do it after persisting, another thread may also persist because it thinks the data is still dirty? What I've implemented is the outcome of a chat with @TheBlueMatt about it. I agree it isn't pretty, so open to other suggestions.
@@ -922,6 +930,173 @@ where | |||
} | |||
} | |||
|
|||
/// A wrapper around [`OutputSweeper`] to be used with a sync kv store. | |||
pub struct OutputSweeperSyncKVStore< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need yet another wrapper type for this? Do we then also need an OutputSweeperSyncKVStoreSync
to complete the matrix?
I think this is really messy. If this is only added to avoid exposing KVStoreSyncWrapper
, I'd much rather go this way or implement the wrapper in LDK Node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OutputSweeperSyncKVStoreSync
is what OutputSweeperSync
is, so that is already present.
Don't like this either, but we wanted to guarantee that users can't misuse the wrappers, and this is the result of that. If we have consensus on another approach, I am fine with that too.
} | ||
|
||
/// Returns a reference to the underlying [`OutputSweeper`]. | ||
pub fn sweeper_async( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you implement deref to solve this?
fn persist_state<'a>( | ||
&self, sweeper_state: &SweeperState, | ||
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> { | ||
let encoded = &sweeper_state.encode(); | ||
|
||
self.kv_store.write( | ||
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, | ||
OUTPUT_SWEEPER_PERSISTENCE_KEY, | ||
encoded, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so as the compiler would likely optimize that away, given that encoded
will be an owned value (Vec
returned by encode()
). Still, the change that it suggests looks cleaner.
In general it will be super confusing that we encode
at the time of creating the future, but would only actually persist once we dropped the lock. Starting from now we'll need to be super cautious about the side-effects of interleaving persist calls.
👋 The first review has been submitted! Do you think this PR is ready for a second reviewer? If so, click here to assign a second reviewer. |
Stripped down version of #3778. It allows background persistence to be async, but channel monitor persistence remains sync. This means that for the time being, users wanting async background persistence would be required to implement both the sync and the async
KVStore
trait. This model is available throughprocess_events_full_async
.process_events_async
still takes a synchronous kv store to remain backwards compatible.Usage in ldk-node: lightningdevkit/ldk-node@main...joostjager:ldk-node:upgrade-to-async-kvstore