Skip to content

feat: implement time lease based locks for the CryptoStore #2140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 29, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 53 additions & 46 deletions crates/matrix-sdk-crypto/src/store/locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,57 +177,64 @@ impl CryptoStoreLock {

let mut renew_task = self.renew_task.lock().await;

// Only spawn the task if it's missing or done.
let spawn = renew_task.as_ref().map_or(true, |join_handle| join_handle.is_finished());

if spawn {
*renew_task = Some(matrix_sdk_common::executor::spawn(async move {
loop {
{
// First, check if there are still users of this lock.
//
// This is not racy, because:
// - the `locking_attempt` mutex makes sure we don't have unexpected
// interactions with the non-atomic sequence above in `try_lock_once`
// (check > 0, then add 1).
// - other entities holding onto the `num_holders` atomic will only
// decrease it over time.
let _guard = this.locking_attempt.lock().await;

// If there are no more users, we can quit.
if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
tracing::info!("exiting the lease extension loop");

// Cancel the lease with another 0ms lease.
// If we don't get the lock, that's (weird but) fine.
let _ = this
.store
.try_take_leased_lock(0, &this.lock_key, &this.lock_holder)
.await;

// Exit the loop.
break;
}
}
// Cancel the previous task, if any. That's safe to do, because:
// - either the task was done,
// - or it was still running, but taking a lock in the db has to be an atomic
// operation
// running in a transaction.

drop(renew_task.take());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be careful, it doesn't abort the task!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, TIL. I've added an abort() that's guarded by #[cfg(not(target_arch = "wasm32"))]; this code wouldn't work on wasm32 anyways because it's using tokio::sleep.


// Restart a new one.
*renew_task = Some(matrix_sdk_common::executor::spawn(async move {
loop {
{
// First, check if there are still users of this lock.
//
// This is not racy, because:
// - the `locking_attempt` mutex makes sure we don't have unexpected
// interactions with the non-atomic sequence above in `try_lock_once`
// (check > 0, then add 1).
// - other entities holding onto the `num_holders` atomic will only
// decrease it over time.

let _guard = this.locking_attempt.lock().await;

// If there are no more users, we can quit.
if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
tracing::info!("exiting the lease extension loop");

// Cancel the lease with another 0ms lease.
// If we don't get the lock, that's (weird but) fine.
let _ = this
.store
.try_take_leased_lock(0, &this.lock_key, &this.lock_holder)
.await;

sleep(Duration::from_millis(Self::EXTEND_LEASE_EVERY_MS)).await;

if let Err(err) = this
.store
.try_take_leased_lock(
Self::LEASE_DURATION_MS,
&this.lock_key,
&this.lock_holder,
)
.await
{
tracing::error!("error when extending lock lease: {err:#}");
// Exit the loop.
break;
}
}
}));
}

sleep(Duration::from_millis(Self::EXTEND_LEASE_EVERY_MS)).await;

if let Err(err) = this
.store
.try_take_leased_lock(
Self::LEASE_DURATION_MS,
&this.lock_key,
&this.lock_holder,
)
.await
{
tracing::error!("error when extending lock lease: {err:#}");
// Exit the loop.
break;
}
}
}));

self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);

let guard = CryptoStoreLockGuard { num_holders: self.num_holders.clone() };
Ok(Some(guard))
Expand Down