Commit 76ed351

Add a value-based lock in the CryptoStores (#2049)
This implements a value-based lock in the crypto stores. The intent is to let multiple processes write into the store concurrently, while still cooperating on who does the writes. In particular, we need this for #1928, since we may have up to two different processes trying to write into the crypto store at the same time.

## New methods in the `CryptoStore` trait

The idea is to introduce two new methods touching **custom values** in the crypto store:

- one to atomically insert a value, only if it was missing (so, not following the `upsert` semantics used in `set_custom_value`)
- one to atomically remove a custom value

Those two operations match the semantics we want:

- take the lock only if it isn't taken already = insert an entry only if it was missing
- release the lock = remove the entry

By looking at the number of rows affected by the query, we can infer whether the insert/remove happened or not, that is, whether we managed to take the lock or not.

## High-level APIs

I've also added a high-level API, `CryptoStoreLock`, that helps manage such a lock, and adds some niceties on top of that:

- exponential backoff to retry attempts at acquiring the lock, when it was already taken
- attempt to gracefully recover when the lock has been taken by an app that's been killed by the environment
- full configuration of the key / value / backoff parameters

While it'd be nice to have something like a `CryptoStoreLockGuard`, it's hard to implement without being racy, because of the `async` calls that would have to happen in the `Drop` implementation (and async drop isn't stable yet).

## Test program

There's also a test program in which I shamelessly show my rudimentary unix skills; I've put it in the `labs/` directory but this could as well be a large integration test. A parent program initially fills a custom crypto store, then creates a `pipe()` for 1-way communication with a child created with `fork()`; then the parent sends commands to the child.

These commands consist of reading and writing into the crypto store, using a lock. While the child attempts to perform these operations, the parent tries hard to get the lock at the same time. This helped find a few issues and gives some confidence that cross-process locking works as intended.
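
The two primitives described in the commit message can be sketched with an in-memory stand-in for the store. This is only an illustration under assumptions: `MemoryStore` and `count_winners` are hypothetical names that do not exist in the SDK, the real trait methods are `async` and return a `Result`, and the mutex here plays the role of the atomicity that a database transaction would provide.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory stand-in for the custom values of a crypto store.
#[derive(Clone, Default)]
pub struct MemoryStore {
    values: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl MemoryStore {
    /// Insert `value` under `key` only if the key is absent.
    /// Returns `true` if the insertion happened, i.e. the lock was taken.
    pub fn insert_custom_value_if_missing(&self, key: &str, value: Vec<u8>) -> bool {
        let mut values = self.values.lock().unwrap();
        if values.contains_key(key) {
            false
        } else {
            values.insert(key.to_owned(), value);
            true
        }
    }

    /// Remove the entry stored under `key`.
    /// Returns `true` if an entry was removed, i.e. the lock was released.
    pub fn remove_custom_value(&self, key: &str) -> bool {
        self.values.lock().unwrap().remove(key).is_some()
    }
}

/// Race `contenders` threads for the same key and count how many of them
/// managed to insert their value.
pub fn count_winners(store: &MemoryStore, key: &str, contenders: usize) -> usize {
    let handles: Vec<_> = (0..contenders)
        .map(|i| {
            let store = store.clone();
            let key = key.to_owned();
            thread::spawn(move || {
                store.insert_custom_value_if_missing(&key, format!("holder-{i}").into_bytes())
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).filter(|won| *won).count()
}
```

However many contenders race for the same key, only one insertion succeeds, and the key becomes available again only after a successful removal.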
1 parent 1660c71 commit 76ed351

11 files changed: +694 -1 lines changed

Cargo.lock

Lines changed: 46 additions & 1 deletion

crates/matrix-sdk-crypto/src/store/error.rs

Lines changed: 5 additions & 0 deletions
@@ -18,6 +18,7 @@ use ruma::{IdParseError, OwnedDeviceId, OwnedUserId};
 use serde_json::Error as SerdeError;
 use thiserror::Error;
 
+use super::locks::LockStoreError;
 use crate::olm::SessionCreationError;
 
 /// A `CryptoStore` specific result type.
@@ -78,6 +79,10 @@ pub enum CryptoStoreError {
     /// A problem with the underlying database backend
     #[error(transparent)]
     Backend(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error due to locking.
+    #[error(transparent)]
+    Lock(#[from] LockStoreError),
 }
 
 impl CryptoStoreError {
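
The `#[from]` attribute on the new `Lock` variant asks thiserror to generate a `From<LockStoreError>` implementation, which is what allows the locking code to use `?` and `.into()`, while `#[error(transparent)]` forwards the message of the inner error. A std-only sketch of the equivalent hand-written code follows; both enums are reduced to a single variant for illustration, and `try_lock` / `take_lock` are hypothetical helpers rather than SDK functions.

```rust
use std::fmt;

/// Reduced copy of the lock error, with a single variant for illustration.
#[derive(Debug, PartialEq)]
pub enum LockStoreError {
    LockTimeout,
}

impl fmt::Display for LockStoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "a lock timed out")
    }
}

impl std::error::Error for LockStoreError {}

/// Reduced copy of the store error, keeping only the new variant.
#[derive(Debug, PartialEq)]
pub enum CryptoStoreError {
    Lock(LockStoreError),
}

// Roughly what `#[error(transparent)]` provides: the message is forwarded
// to the wrapped error.
impl fmt::Display for CryptoStoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CryptoStoreError::Lock(inner) => inner.fmt(f),
        }
    }
}

// Roughly what `#[from]` provides: a conversion usable by `?` and `.into()`.
impl From<LockStoreError> for CryptoStoreError {
    fn from(err: LockStoreError) -> Self {
        CryptoStoreError::Lock(err)
    }
}

/// Hypothetical low-level operation that fails with the lock-specific error.
fn try_lock(available: bool) -> Result<(), LockStoreError> {
    if available {
        Ok(())
    } else {
        Err(LockStoreError::LockTimeout)
    }
}

/// Hypothetical caller returning the store-wide error; `?` converts the
/// error through the `From` implementation above.
pub fn take_lock(available: bool) -> Result<(), CryptoStoreError> {
    try_lock(available)?;
    Ok(())
}
```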

crates/matrix-sdk-crypto/src/store/integration_tests.rs

Lines changed: 84 additions & 0 deletions
@@ -768,6 +768,90 @@ macro_rules! cryptostore_integration_tests {
                 let loaded_2 = store.get_custom_value("B").await.unwrap();
                 assert_eq!(None, loaded_2);
             }
+
+            #[async_test]
+            async fn test_custom_value_insert_if_missing_remove() {
+                let (_account, store) = get_loaded_store("custom_value_insert_if_missing").await;
+
+                let val = "Hello".as_bytes().to_vec();
+
+                // Removing while the value wasn't present doesn't remove anything.
+                let removed = store.remove_custom_value("A").await.unwrap();
+                assert!(!removed);
+
+                // Inserting while the value wasn't present does something.
+                let inserted = store.insert_custom_value_if_missing("A", val.clone()).await.unwrap();
+                assert!(inserted);
+
+                let loaded = store.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, Some(val.clone()));
+
+                // Inserting while the value was present does nothing.
+                let inserted = store.insert_custom_value_if_missing("A", val.clone()).await.unwrap();
+                assert!(!inserted);
+
+                // …even if we try hard.
+                let inserted = store.insert_custom_value_if_missing("A", val.clone()).await.unwrap();
+                assert!(!inserted);
+
+                // Removing while the value was present does something.
+                let removed = store.remove_custom_value("A").await.unwrap();
+                assert!(removed);
+
+                let loaded = store.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, None);
+
+                // …only the first time.
+                let removed = store.remove_custom_value("A").await.unwrap();
+                assert!(!removed);
+            }
+
+            #[async_test]
+            async fn test_custom_value_multiple_stores() {
+                // Hey, have you heard about my second, mimic store?
+                let val1 = "Hello".as_bytes().to_vec();
+                let (_account, store1) = get_loaded_store("custom_value_multiple_stores").await;
+                let (_account, store2) = get_loaded_store("custom_value_multiple_stores").await;
+
+                // Store1 inserts...
+                let inserted = store1.insert_custom_value_if_missing("A", val1.clone()).await.unwrap();
+                assert!(inserted);
+
+                // Store2 can't!
+                let val2 = "Goodbye".as_bytes().to_vec();
+                let inserted = store2.insert_custom_value_if_missing("A", val2.clone()).await.unwrap();
+                assert!(!inserted);
+
+                // But when reading, both stores must agree.
+                let loaded = store1.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, Some(val1.clone()));
+
+                let loaded = store2.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, Some(val1.clone()));
+
+                // Clean up.
+                let removed = store1.remove_custom_value("A").await.unwrap();
+                assert!(removed);
+
+                let loaded = store1.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, None);
+
+                let loaded = store2.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, None);
+
+                // Now store2 can write, store1 can't, they agree on reading etc.
+                let inserted = store2.insert_custom_value_if_missing("A", val2.clone()).await.unwrap();
+                assert!(inserted);
+
+                let inserted = store1.insert_custom_value_if_missing("A", val1.clone()).await.unwrap();
+                assert!(!inserted);
+
+                let loaded = store1.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, Some(val2.clone()));
+
+                let loaded = store2.get_custom_value("A").await.unwrap();
+                assert_eq!(loaded, Some(val2.clone()));
+            }
         }
     };
 }
Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Collection of small helpers that implement store-based locks.
+//!
+//! Those locks are implemented as one value in the key-value crypto store, that
+//! exists if and only if the lock has been taken. For this to work correctly,
+//! we rely on multiple assumptions:
+//!
+//! - the store must allow concurrent reads and writes from multiple processes.
+//! For instance, for
+//! sqlite, this means that it is running in [WAL](https://www.sqlite.org/wal.html) mode.
+//! - the two operations used in the store implementation,
+//! `insert_custom_value_if_missing` and
+//! `remove_custom_value`, must be atomic / implemented in a transaction.
+
+use std::{sync::Arc, time::Duration};
+
+use tokio::time::sleep;
+
+use super::DynCryptoStore;
+use crate::CryptoStoreError;
+
+/// Small state machine to handle wait times.
+#[derive(Clone, Debug)]
+enum WaitingTime {
+    /// Some time to wait, in milliseconds.
+    Some(u32),
+    /// Stop waiting when seeing this value.
+    Stop,
+}
+
+/// A store-based lock for the `CryptoStore`.
+#[derive(Clone, Debug)]
+pub struct CryptoStoreLock {
+    /// The store we're using to lock.
+    store: Arc<DynCryptoStore>,
+
+    /// The key used in the key/value mapping for the lock entry.
+    lock_key: String,
+
+    /// A specific value to identify the lock's holder.
+    lock_holder: String,
+
+    /// Backoff time, in milliseconds.
+    backoff: WaitingTime,
+
+    /// Maximum backoff time, between two attempts.
+    max_backoff: u32,
+}
+
+impl CryptoStoreLock {
+    /// Initial backoff, in milliseconds. This is the time we wait the first
+    /// time, if taking the lock initially failed.
+    const INITIAL_BACKOFF_MS: u32 = 10;
+
+    /// Maximal backoff, in milliseconds. This is the maximum amount of time
+    /// we'll wait for the lock, *between two attempts*.
+    const MAX_BACKOFF_MS: u32 = 1000;
+
+    /// Create a new store-based lock implemented as a value in the
+    /// crypto-store.
+    ///
+    /// # Parameters
+    ///
+    /// - `lock_key`: key in the key-value store to store the lock's state.
+    /// - `lock_holder`: identify the lock's holder with this given value.
+    /// - `max_backoff`: maximum time (in milliseconds) that should be waited
+    /// for, between two
+    /// attempts. When that time is reached a second time, the lock will stop
+    /// attempting to get the lock and will return a timeout error upon
+    /// locking. If not provided, will wait for `Self::MAX_BACKOFF_MS`.
+    pub fn new(
+        store: Arc<DynCryptoStore>,
+        lock_key: String,
+        lock_holder: String,
+        max_backoff: Option<u32>,
+    ) -> Self {
+        let max_backoff = max_backoff.unwrap_or(Self::MAX_BACKOFF_MS);
+        Self {
+            store,
+            lock_key,
+            lock_holder,
+            max_backoff,
+            backoff: WaitingTime::Some(Self::INITIAL_BACKOFF_MS),
+        }
+    }
+
+    /// Attempt to take the lock, with exponential backoff if the lock has
+    /// already been taken before.
+    pub async fn lock(&mut self) -> Result<(), CryptoStoreError> {
+        loop {
+            let inserted = self
+                .store
+                .insert_custom_value_if_missing(
+                    &self.lock_key,
+                    self.lock_holder.as_bytes().to_vec(),
+                )
+                .await?;
+
+            if inserted {
+                // Reset backoff before returning, for the next attempt to lock.
+                self.backoff = WaitingTime::Some(Self::INITIAL_BACKOFF_MS);
+                return Ok(());
+            }
+
+            // Double-check that we were not interrupted last time we tried to take the
+            // lock, and forgot to release it; in that case, we *still* hold it.
+            let previous = self.store.get_custom_value(&self.lock_key).await?;
+            if previous.as_deref() == Some(self.lock_holder.as_bytes()) {
+                // At this point, the only possible value for backoff is the initial one, but
+                // better be safe than sorry.
+                tracing::warn!(
+                    "Crypto-store lock {} was already taken by {}; let's pretend we just acquired it.",
+                    self.lock_key,
+                    self.lock_holder
+                );
+                self.backoff = WaitingTime::Some(Self::INITIAL_BACKOFF_MS);
+                return Ok(());
+            }
+
+            let wait = match self.backoff {
+                WaitingTime::Some(val) => val,
+                WaitingTime::Stop => {
+                    // We've reached the maximum backoff, abandon.
+                    return Err(LockStoreError::LockTimeout.into());
+                }
+            };
+
+            // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
+            // max_backoff.
+            let next_value = wait.saturating_mul(2);
+            self.backoff = if next_value >= self.max_backoff {
+                WaitingTime::Stop
+            } else {
+                WaitingTime::Some(next_value)
+            };
+
+            sleep(Duration::from_millis(wait.into())).await;
+        }
+    }
+
+    /// Release the lock taken previously with [`Self::lock()`].
+    ///
+    /// Will return an error if the lock wasn't taken.
+    pub async fn unlock(&mut self) -> Result<(), CryptoStoreError> {
+        let read = self
+            .store
+            .get_custom_value(&self.lock_key)
+            .await?
+            .ok_or(CryptoStoreError::from(LockStoreError::MissingLockValue))?;
+
+        if read != self.lock_holder.as_bytes() {
+            return Err(LockStoreError::IncorrectLockValue.into());
+        }
+
+        let removed = self.store.remove_custom_value(&self.lock_key).await?;
+        if removed {
+            Ok(())
+        } else {
+            Err(LockStoreError::MissingLockValue.into())
+        }
+    }
+}
+
+/// Error related to the locking API of the crypto store.
+#[derive(Debug, thiserror::Error)]
+pub enum LockStoreError {
+    /// A lock value was to be removed, but it didn't contain the expected lock
+    /// value.
+    #[error("a lock value was to be removed, but it didn't contain the expected lock value")]
+    IncorrectLockValue,
+
+    /// A lock value was to be removed, but it was missing in the database.
+    #[error("a lock value was to be removed, but it was missing in the database")]
+    MissingLockValue,
+
+    /// Spent too long waiting for a database lock.
+    #[error("a lock timed out")]
+    LockTimeout,
+}
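
To make the backoff behaviour of `lock()` concrete, the sketch below replays only its wait-time state machine, with no store and no sleeping, under the assumption that the lock is held by someone else for the whole time. `backoff_schedule` is a hypothetical helper written for this illustration, and it assumes a non-zero initial backoff.

```rust
/// Mirror of the private backoff state used by the lock.
#[derive(Clone, Copy, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}

/// Returns the successive waits (in milliseconds) between failed attempts,
/// assuming the lock is never released by its current holder.
/// Hypothetical helper; `initial_ms` must be non-zero.
pub fn backoff_schedule(initial_ms: u32, max_backoff_ms: u32) -> Vec<u32> {
    assert!(initial_ms > 0, "a zero initial backoff would never grow");

    let mut backoff = WaitingTime::Some(initial_ms);
    let mut waits = Vec::new();

    loop {
        let wait = match backoff {
            WaitingTime::Some(val) => val,
            // This is where `lock()` returns `LockTimeout`.
            WaitingTime::Stop => return waits,
        };

        // Same update rule as in `lock()`: double the wait, and switch to
        // `Stop` once the doubled value reaches the maximum.
        let next_value = wait.saturating_mul(2);
        backoff = if next_value >= max_backoff_ms {
            WaitingTime::Stop
        } else {
            WaitingTime::Some(next_value)
        };

        // `lock()` would sleep for `wait` milliseconds here.
        waits.push(wait);
    }
}
```

With the defaults from the diff (10 ms initial, 1000 ms maximum), the waits are 10, 20, 40, 80, 160, 320 and 640 ms, so a contended `lock()` gives up on its eighth attempt after about 1.27 s of sleeping. The longest single wait is 640 ms rather than 1000 ms, because the state switches to `Stop` as soon as the doubled value reaches the maximum.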
