Skip to content

Commit 801bde0

Browse files
committed
feat: implement a generation counter for the CryptoStore lock
1 parent 49b1e87 commit 801bde0

File tree

5 files changed

+164
-44
lines changed

5 files changed

+164
-44
lines changed

bindings/matrix-sdk-ffi/src/encryption_sync.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ impl EncryptionSync {
7373
error!("Error when stopping the encryption sync: {err}");
7474
}
7575
}
76-
77-
pub fn reload_caches(&self) {
78-
if let Err(err) = RUNTIME.block_on(self.sync.reload_caches()) {
79-
error!("Error when reloading caches: {err}");
80-
}
81-
}
8276
}
8377

8478
impl Client {

crates/matrix-sdk-crypto/src/machine.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ use crate::{
6767
requests::{IncomingResponse, OutgoingRequest, UploadSigningKeysRequest},
6868
session_manager::{GroupSessionManager, SessionManager},
6969
store::{
70-
Changes, DeviceChanges, DynCryptoStore, IdentityChanges, IntoCryptoStore, MemoryStore,
71-
Result as StoreResult, SecretImportError, Store,
70+
locks::LockStoreError, Changes, DeviceChanges, DynCryptoStore, IdentityChanges,
71+
IntoCryptoStore, MemoryStore, Result as StoreResult, SecretImportError, Store,
7272
},
7373
types::{
7474
events::{
@@ -129,6 +129,18 @@ pub struct OlmMachineInner {
129129
/// A state machine that handles creating room key backups.
130130
#[cfg(feature = "backups_v1")]
131131
backup_machine: BackupMachine,
132+
/// Latest "generation" of data known by the crypto store.
133+
///
134+
/// This is a counter that only increments, set in the database (and can
135+
/// wrap). It's incremented whenever some process acquires a lock for the
136+
/// first time. *This assumes the crypto store lock is being held, to
137+
/// avoid data races on writing to this value in the store*.
138+
///
139+
/// The current process will maintain this value in local memory and in the
140+
/// DB over time. Observing a value in the store that differs from the one
141+
/// held in memory indicates that somebody else has written into the
142+
/// database under our feet.
143+
pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
132144
}
133145

134146
#[cfg(not(tarpaulin_include))]
@@ -142,6 +154,8 @@ impl std::fmt::Debug for OlmMachine {
142154
}
143155

144156
impl OlmMachine {
157+
const CURRENT_GENERATION_STORE_KEY: &str = "generation-counter";
158+
145159
/// Create a new memory based OlmMachine.
146160
///
147161
/// The created machine will keep the encryption keys only in memory and
@@ -212,6 +226,7 @@ impl OlmMachine {
212226
identity_manager,
213227
#[cfg(feature = "backups_v1")]
214228
backup_machine,
229+
crypto_store_generation: Arc::new(Mutex::new(None)),
215230
});
216231

217232
Self { inner }
@@ -1728,6 +1743,106 @@ impl OlmMachine {
17281743
pub fn backup_machine(&self) -> &BackupMachine {
17291744
&self.inner.backup_machine
17301745
}
1746+
1747+
/// Syncs the database and in-memory generation counter.
1748+
///
1749+
/// This requires that the crypto store lock has been acquired already.
///
/// If the store already holds a generation value, that value is incremented
/// (wrapping on overflow) before being written back; otherwise the counter
/// starts at 0. The resulting value is also kept in memory.
///
/// # Errors
///
/// Fails with [`LockStoreError::InvalidGenerationFormat`] if the stored value
/// cannot be converted into the byte array expected by `u64::from_le_bytes`,
/// and propagates any error returned when reading or writing the custom
/// value in the store.
1750+
pub async fn initialize_crypto_store_generation(&self) -> StoreResult<()> {
1751+
// Avoid reentrant initialization by taking the lock for the entire function's
1752+
// scope.
1753+
let mut gen_guard = self.inner.crypto_store_generation.lock().await;
1754+
1755+
let prev_generation =
1756+
self.inner.store.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY).await?;
1757+
1758+
let gen = match prev_generation {
1759+
Some(val) => {
1760+
// There was a value in the store. We need to signal that we're a different
1761+
// process, so we don't just reuse the value but increment it.
1762+
u64::from_le_bytes(
1763+
val.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
1764+
)
1765+
.wrapping_add(1)
1766+
}
// Nothing is stored under the key yet: start counting from 0.
1767+
None => 0,
1768+
};
1769+
1770+
self.inner
1771+
.store
1772+
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
1773+
.await?;
1774+
1775+
*gen_guard = Some(gen);
1776+
1777+
Ok(())
1778+
}
1779+
1780+
/// If needs be, update the local and on-disk crypto store generation.
1781+
///
1782+
/// Returns `true` if the generation found in the store differs from the one
1783+
/// known in memory (or if none was known in memory yet), in which case a new
1784+
/// value has been stored in memory and in the database. Returns `false`
/// when both values match, in which case nothing is written.
1785+
///
1786+
/// ## Requirements
1787+
///
1788+
/// - This assumes that `initialize_crypto_store_generation` has been called
1789+
/// beforehand.
1790+
/// - This requires that the crypto store lock has been acquired.
///
/// ## Errors
///
/// Fails with [`LockStoreError::MissingGeneration`] if the store holds no
/// generation value, with [`LockStoreError::InvalidGenerationFormat`] if the
/// stored value cannot be converted into the byte array expected by
/// `u64::from_le_bytes`, and propagates any error returned when reading or
/// writing the custom value in the store.
1791+
pub async fn maintain_crypto_store_generation(&self) -> StoreResult<bool> {
1792+
let mut gen_guard = self.inner.crypto_store_generation.lock().await;
1793+
1794+
// The database value must be there:
1795+
// - either we could initialize beforehand, thus write into the database,
1796+
// - or we couldn't, and then another process was holding onto the database's
1797+
// lock, and thus
1798+
// has written a generation counter in there.
1799+
let actual_gen = self
1800+
.inner
1801+
.store
1802+
.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
1803+
.await?
1804+
.ok_or(LockStoreError::MissingGeneration)?;
1805+
1806+
let actual_gen = u64::from_le_bytes(
1807+
actual_gen.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
1808+
);
1809+
// Despite its name, the outer `expected_gen` bound below holds the *next*
// generation value, i.e. the one that gets stored in memory and in the
// database.
1810+
let expected_gen = match gen_guard.as_ref() {
1811+
Some(expected_gen) => {
1812+
if actual_gen == *expected_gen {
1813+
return Ok(false);
1814+
}
1815+
// Increment the biggest, and store it everywhere.
1816+
actual_gen.max(*expected_gen).wrapping_add(1)
1817+
}
1818+
None => {
1819+
// Some other process held onto the lock when initializing, so we must reload.
1820+
// Increment database value, and store it everywhere.
1821+
actual_gen.wrapping_add(1)
1822+
}
1823+
};
1824+
1825+
tracing::debug!(
1826+
"Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
1827+
*gen_guard,
1828+
actual_gen,
1829+
expected_gen
1830+
);
1831+
1832+
// Update known value.
// NOTE(review): the in-memory value is updated before the database write
// below; if that write fails, the two values differ when this returns.
1833+
*gen_guard = Some(expected_gen);
1834+
1835+
// Update value in database.
1836+
self.inner
1837+
.store
1838+
.set_custom_value(
1839+
Self::CURRENT_GENERATION_STORE_KEY,
1840+
expected_gen.to_le_bytes().to_vec(),
1841+
)
1842+
.await?;
1843+
1844+
Ok(true)
1845+
}
17311846
}
17321847

17331848
#[cfg(any(feature = "testing", test))]

crates/matrix-sdk-crypto/src/store/locks.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,13 @@ pub enum LockStoreError {
239239
/// Spent too long waiting for a database lock.
240240
#[error("a lock timed out")]
241241
LockTimeout,
242+
243+
/// The generation counter is missing, and should always be present.
244+
#[error("missing generation counter in the store")]
245+
MissingGeneration,
246+
247+
/// Unexpected format for the generation counter. Is someone tampering with the
248+
/// database?
249+
#[error("invalid format of the generation counter")]
250+
InvalidGenerationFormat,
242251
}

crates/matrix-sdk-ui/src/encryption_sync/mod.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -194,19 +194,6 @@ impl EncryptionSync {
194194

195195
Ok(())
196196
}
197-
198-
/// Request a reload of the internal caches used by this sync.
199-
///
200-
/// This must be called every time the process running this loop was
201-
/// suspended and got back into the foreground, and another process may have
202-
/// written to the same underlying store (e.g. notification process vs
203-
/// main process).
204-
pub async fn reload_caches(&self) -> Result<(), Error> {
205-
// Regenerate the crypto store caches first.
206-
self.client.encryption().reload_caches().await.map_err(Error::ClientError)?;
207-
208-
Ok(())
209-
}
210197
}
211198

212199
/// Errors for the [`EncryptionSync`].

crates/matrix-sdk/src/encryption/mod.rs

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::{
2020
collections::{BTreeMap, HashSet},
2121
io::{Cursor, Read, Write},
2222
iter,
23-
ops::Not as _,
2423
path::PathBuf,
2524
};
2625

@@ -862,6 +861,16 @@ impl Encryption {
862861
let lock =
863862
olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
864863

864+
// Gently try to initialize the crypto store generation counter.
865+
//
866+
// If we don't get the lock immediately, then it is already acquired by another
867+
// process, and we'll get to reload next time we acquire the lock.
868+
if lock.try_lock_once().await? {
869+
let res = olm_machine.initialize_crypto_store_generation().await;
870+
lock.unlock().await?;
871+
res?;
872+
}
873+
865874
self.client
866875
.inner
867876
.cross_process_crypto_store_lock
@@ -871,26 +880,34 @@ impl Encryption {
871880
Ok(())
872881
}
873882

883+
/// Maybe reload the `OlmMachine` after acquiring the lock for the first
884+
/// time.
///
/// Asks the current `OlmMachine`, if there is one, to run
/// `maintain_crypto_store_generation`; when that reports a changed
/// generation, the `OlmMachine` is recreated through
/// `regenerate_olm`. Does nothing when there is no `OlmMachine`.
///
/// Errors from either of those two calls are returned to the caller.
885+
async fn on_lock_newly_acquired(&self) -> Result<(), Error> {
886+
let olm_machine_guard = self.client.olm_machine().await;
887+
if let Some(olm_machine) = olm_machine_guard.as_ref() {
888+
// If the crypto store generation has changed,
889+
if olm_machine.maintain_crypto_store_generation().await? {
890+
// (get rid of the reference to the current crypto store first)
891+
drop(olm_machine_guard);
892+
// Recreate the OlmMachine.
893+
self.client.base_client().regenerate_olm().await?;
894+
}
895+
}
896+
Ok(())
897+
}
898+
874899
/// If a lock was created with [`Self::enable_cross_process_store_lock`],
875900
/// spin-waits until the lock is available.
876901
///
877902
/// May reload the `OlmMachine` after obtaining the lock, if the crypto store
878903
/// generation shows that someone else has written to the store in between.
879904
pub async fn spin_lock_store(&self, max_backoff: Option<u32>) -> Result<(), Error> {
880905
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
881-
if lock.try_lock_once().await?.not() {
882-
// We didn't get the lock on the first attempt, so that means that another
883-
// process is using it. Wait for it to release it.
884-
lock.spin_lock(max_backoff).await?;
885-
886-
// As we didn't get the lock on the first attempt, force-reload all the crypto
887-
// state caches at once, by recreating the OlmMachine from scratch.
888-
if let Err(err) = self.client.base_client().regenerate_olm().await {
889-
// First, give back the cross-process lock.
890-
lock.unlock().await?;
891-
// Then return the error to the caller.
892-
return Err(err.into());
893-
};
906+
lock.spin_lock(max_backoff).await?;
907+
908+
if let Err(err) = self.on_lock_newly_acquired().await {
909+
self.unlock_store().await?;
910+
return Err(err);
894911
}
895912
}
896913
Ok(())
@@ -902,7 +919,13 @@ impl Encryption {
902919
/// Returns whether the lock was obtained or not.
903920
pub async fn try_lock_store_once(&self) -> Result<bool, Error> {
904921
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
905-
return Ok(lock.try_lock_once().await?);
922+
if lock.try_lock_once().await? {
923+
if let Err(err) = self.on_lock_newly_acquired().await {
924+
self.unlock_store().await?;
925+
return Err(err);
926+
}
927+
}
928+
return Ok(true);
906929
}
907930
Ok(false)
908931
}
@@ -917,14 +940,6 @@ impl Encryption {
917940
}
918941
Ok(())
919942
}
920-
921-
/// Manually request that the internal crypto caches be reloaded.
922-
pub async fn reload_caches(&self) -> Result<(), Error> {
923-
// At this time, rleoading the `OlmMachine` ought to be sufficient.
924-
self.client.base_client().regenerate_olm().await?;
925-
926-
Ok(())
927-
}
928943
}
929944

930945
#[cfg(all(test, not(target_arch = "wasm32")))]

0 commit comments

Comments
 (0)