Skip to content

Add a value-based lock in the CryptoStores #2049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/matrix-sdk-crypto/src/store/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use ruma::{IdParseError, OwnedDeviceId, OwnedUserId};
use serde_json::Error as SerdeError;
use thiserror::Error;

use super::locks::LockStoreError;
use crate::olm::SessionCreationError;

/// A `CryptoStore` specific result type.
Expand Down Expand Up @@ -78,6 +79,10 @@ pub enum CryptoStoreError {
/// A problem with the underlying database backend
#[error(transparent)]
Backend(Box<dyn std::error::Error + Send + Sync>),

/// An error due to locking.
#[error(transparent)]
Lock(#[from] LockStoreError),
}

impl CryptoStoreError {
Expand Down
84 changes: 84 additions & 0 deletions crates/matrix-sdk-crypto/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,90 @@ macro_rules! cryptostore_integration_tests {
let loaded_2 = store.get_custom_value("B").await.unwrap();
assert_eq!(None, loaded_2);
}

#[async_test]
async fn test_custom_value_insert_if_missing_remove() {
    let (_account, store) = get_loaded_store("custom_value_insert_if_missing").await;
    let payload = b"Hello".to_vec();

    // Nothing is stored under "A" yet, so there is nothing to remove.
    assert!(!store.remove_custom_value("A").await.unwrap());

    // The very first conditional insert succeeds and the value can be read back.
    assert!(store.insert_custom_value_if_missing("A", payload.clone()).await.unwrap());
    assert_eq!(store.get_custom_value("A").await.unwrap(), Some(payload.clone()));

    // Any further conditional insert is refused, no matter how often we retry.
    for _ in 0..2 {
        assert!(!store.insert_custom_value_if_missing("A", payload.clone()).await.unwrap());
    }

    // Removing the existing value reports success, and the value is gone afterwards.
    assert!(store.remove_custom_value("A").await.unwrap());
    assert_eq!(store.get_custom_value("A").await.unwrap(), None);

    // A second removal has nothing left to delete.
    assert!(!store.remove_custom_value("A").await.unwrap());
}

#[async_test]
async fn test_custom_value_multiple_stores() {
    // Two handles opened under the same name, standing in for two users of one store.
    let (_account, first) = get_loaded_store("custom_value_multiple_stores").await;
    let (_account, second) = get_loaded_store("custom_value_multiple_stores").await;

    let hello = b"Hello".to_vec();
    let goodbye = b"Goodbye".to_vec();

    // The first handle wins the conditional insert; the second one is refused.
    assert!(first.insert_custom_value_if_missing("A", hello.clone()).await.unwrap());
    assert!(!second.insert_custom_value_if_missing("A", goodbye.clone()).await.unwrap());

    // Both handles observe the value written by the winner.
    assert_eq!(first.get_custom_value("A").await.unwrap(), Some(hello.clone()));
    assert_eq!(second.get_custom_value("A").await.unwrap(), Some(hello.clone()));

    // Clean up through the first handle; both handles then see no value.
    assert!(first.remove_custom_value("A").await.unwrap());
    assert_eq!(first.get_custom_value("A").await.unwrap(), None);
    assert_eq!(second.get_custom_value("A").await.unwrap(), None);

    // Roles reversed: the second handle inserts, the first one is refused.
    assert!(second.insert_custom_value_if_missing("A", goodbye.clone()).await.unwrap());
    assert!(!first.insert_custom_value_if_missing("A", hello.clone()).await.unwrap());

    // Again, both handles agree on what is stored.
    assert_eq!(first.get_custom_value("A").await.unwrap(), Some(goodbye.clone()));
    assert_eq!(second.get_custom_value("A").await.unwrap(), Some(goodbye.clone()));
}
}
};
}
178 changes: 178 additions & 0 deletions crates/matrix-sdk-crypto/src/store/locks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Collection of small helpers that implement store-based locks.
//!
//! These locks are implemented as one value in the key-value crypto store, which
//! exists if and only if the lock has been taken. For this to work correctly,
//! we rely on multiple assumptions:
//!
//! - the store must allow concurrent reads and writes from multiple processes.
//!   For instance, for sqlite, this means that it is running in
//!   [WAL](https://www.sqlite.org/wal.html) mode.
//! - the two operations used in the store implementation,
//!   `insert_custom_value_if_missing` and `remove_custom_value`, must be
//!   atomic / implemented in a transaction.

use std::{sync::Arc, time::Duration};

use tokio::time::sleep;

use super::DynCryptoStore;
use crate::CryptoStoreError;

/// A lock whose state lives as a single entry in the `CryptoStore`.
///
/// The lock counts as taken for as long as an entry exists under `lock_key`.
#[derive(Debug, Clone)]
pub struct CryptoStoreLock {
    /// Handle to the store in which the lock entry is written and removed.
    store: Arc<DynCryptoStore>,

    /// Key under which the lock entry is stored in the key/value mapping.
    lock_key: String,

    /// Value written under `lock_key`; it identifies who holds the lock.
    lock_holder: String,

    /// Time to wait before the next locking attempt, in milliseconds.
    ///
    /// `u32::MAX` is used as a marker meaning that the next failed attempt
    /// gives up with a timeout instead of waiting again.
    backoff: u32,

    /// Upper bound for the backoff between two attempts, in milliseconds.
    max_backoff: u32,
}

impl CryptoStoreLock {
    /// Initial backoff, in milliseconds. This is the time we wait the first
    /// time, if taking the lock initially failed.
    const INITIAL_BACKOFF_MS: u32 = 10;

    /// Maximal backoff, in milliseconds. This is the maximum amount of time
    /// we'll wait for the lock, *between two attempts*.
    const MAX_BACKOFF_MS: u32 = 1000;

    /// Create a new store-based lock implemented as a value in the
    /// crypto-store.
    ///
    /// # Parameters
    ///
    /// - `store`: the crypto store in which the lock entry is written.
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `lock_holder`: identify the lock's holder with this given value.
    /// - `max_backoff`: bound (in milliseconds) on the time waited between two
    ///   attempts. The wait starts at [`Self::INITIAL_BACKOFF_MS`] and doubles
    ///   after every failed attempt; once the doubled value reaches or exceeds
    ///   `max_backoff`, one last wait is performed and the next failed attempt
    ///   returns a timeout error. If not provided, [`Self::MAX_BACKOFF_MS`] is
    ///   used.
    pub fn new(
        store: Arc<DynCryptoStore>,
        lock_key: String,
        lock_holder: String,
        max_backoff: Option<u32>,
    ) -> Self {
        let max_backoff = max_backoff.unwrap_or(Self::MAX_BACKOFF_MS);
        Self { store, lock_key, lock_holder, max_backoff, backoff: Self::INITIAL_BACKOFF_MS }
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// Every call starts a fresh backoff sequence, independently of how a
    /// previous call ended.
    ///
    /// # Errors
    ///
    /// - [`LockStoreError::LockTimeout`] (wrapped in [`CryptoStoreError`]) if
    ///   the lock could not be taken before the backoff sequence ran out.
    /// - Any error returned by the underlying store.
    pub async fn lock(&mut self) -> Result<(), CryptoStoreError> {
        // Always start from the initial backoff. A previous call may have left a
        // stale value behind: the timeout sentinel after a `LockTimeout`, or an
        // intermediate value after a store error or if its future was dropped
        // while sleeping. Without this reset, every later call on a still-held
        // lock would fail on its very first attempt without waiting at all.
        self.backoff = Self::INITIAL_BACKOFF_MS;

        loop {
            let inserted = self
                .store
                .insert_custom_value_if_missing(
                    &self.lock_key,
                    self.lock_holder.as_bytes().to_vec(),
                )
                .await?;

            if inserted {
                return Ok(());
            }

            // Double-check that we were not interrupted last time we tried to take the
            // lock, and forgot to release it; in that case, we *still* hold it.
            let previous = self.store.get_custom_value(&self.lock_key).await?;
            if previous.as_deref() == Some(self.lock_holder.as_bytes()) {
                tracing::warn!(
                    "Crypto-store lock {} was already taken by {}; let's pretend we just acquired it.",
                    self.lock_key,
                    self.lock_holder
                );
                return Ok(());
            }

            // Exponential backoff: wait for the current backoff, and double it for the
            // next round.
            let wait = self.backoff;

            // If the sentinel value was set in the previous round, the doubled backoff
            // reached `max_backoff`: give up instead of waiting again.
            if wait == u32::MAX {
                return Err(LockStoreError::LockTimeout.into());
            }

            self.backoff = self.backoff.saturating_mul(2);
            if self.backoff >= self.max_backoff {
                // Set the sentinel value that indicates that we should time out, were we
                // to wait more.
                self.backoff = u32::MAX;
            }

            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Release the lock taken previously with [`Self::lock`].
    ///
    /// # Errors
    ///
    /// - [`LockStoreError::MissingLockValue`] if there is no lock entry in the
    ///   store, i.e. the lock wasn't taken.
    /// - [`LockStoreError::IncorrectLockValue`] if the lock entry doesn't
    ///   contain this lock's holder value.
    /// - Any error returned by the underlying store.
    pub async fn unlock(&mut self) -> Result<(), CryptoStoreError> {
        let read = self
            .store
            .get_custom_value(&self.lock_key)
            .await?
            .ok_or_else(|| CryptoStoreError::from(LockStoreError::MissingLockValue))?;

        // Refuse to remove an entry that identifies another holder.
        if read != self.lock_holder.as_bytes() {
            return Err(LockStoreError::IncorrectLockValue.into());
        }

        // The check above and the removal below are two separate store operations;
        // the entry may already be gone by the time we try to remove it.
        let removed = self.store.remove_custom_value(&self.lock_key).await?;
        if removed {
            Ok(())
        } else {
            Err(LockStoreError::MissingLockValue.into())
        }
    }
}

/// Failures specific to the store-based locking API of the crypto store.
#[derive(Debug, thiserror::Error)]
pub enum LockStoreError {
    /// The lock entry was found, but its content did not match the value
    /// expected by the party trying to remove it.
    #[error("a lock value was to be removed, but it didn't contain the expected lock value")]
    IncorrectLockValue,

    /// The lock entry that was supposed to be removed does not exist in the
    /// database.
    #[error("a lock value was to be removed, but it was missing in the database")]
    MissingLockValue,

    /// Waiting for the lock took too long and the attempt was abandoned.
    #[error("a lock timed out")]
    LockTimeout,
}
10 changes: 10 additions & 0 deletions crates/matrix-sdk-crypto/src/store/memorystore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,16 @@ impl CryptoStore for MemoryStore {
warn!("Method not implemented");
Ok(())
}

// Placeholder implementation: nothing is stored. Both arguments are ignored, a
// warning is logged, and the call always reports that no value was inserted.
// NOTE(review): `CryptoStoreLock::lock` treats `false` as "the entry already
// exists", so a store-based lock presumably cannot be acquired through this
// store — confirm before relying on it.
async fn insert_custom_value_if_missing(&self, _key: &str, _new: Vec<u8>) -> Result<bool> {
warn!("Method insert_custom_value_if_missing not implemented");
Ok(false)
}

// Placeholder implementation: nothing is removed. The key is ignored, a warning
// is logged, and the call always reports that no value was removed.
async fn remove_custom_value(&self, _key: &str) -> Result<bool> {
warn!("Method remove_custom_value not implemented");
Ok(false)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/matrix-sdk-crypto/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use crate::{

pub mod caches;
mod error;
pub mod locks;
mod memorystore;
mod traits;

Expand Down
Loading