Skip to content

Commit dd9584c

Browse files
committed
feat: implement a generation counter for the CryptoStore lock
1 parent 757716c commit dd9584c

File tree

5 files changed

+165
-44
lines changed

5 files changed

+165
-44
lines changed

bindings/matrix-sdk-ffi/src/encryption_sync.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ impl EncryptionSync {
7373
error!("Error when stopping the encryption sync: {err}");
7474
}
7575
}
76-
77-
pub fn reload_caches(&self) {
78-
if let Err(err) = RUNTIME.block_on(self.sync.reload_caches()) {
79-
error!("Error when reloading caches: {err}");
80-
}
81-
}
8276
}
8377

8478
impl Client {

crates/matrix-sdk-crypto/src/machine.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ use crate::{
6767
requests::{IncomingResponse, OutgoingRequest, UploadSigningKeysRequest},
6868
session_manager::{GroupSessionManager, SessionManager},
6969
store::{
70-
Changes, DeviceChanges, DynCryptoStore, IdentityChanges, IntoCryptoStore, MemoryStore,
71-
Result as StoreResult, SecretImportError, Store,
70+
locks::LockStoreError, Changes, DeviceChanges, DynCryptoStore, IdentityChanges,
71+
IntoCryptoStore, MemoryStore, Result as StoreResult, SecretImportError, Store,
7272
},
7373
types::{
7474
events::{
@@ -129,6 +129,18 @@ pub struct OlmMachineInner {
129129
/// A state machine that handles creating room key backups.
130130
#[cfg(feature = "backups_v1")]
131131
backup_machine: BackupMachine,
132+
/// Latest "generation" of data known by the crypto store.
133+
///
134+
/// This is a counter that only increments, set in the database (and can
135+
/// wrap). It's incremented whenever some process acquires a lock for the
136+
/// first time. *This assumes the crypto store lock is being held, to
137+
/// avoid data races on writing to this value in the store*.
138+
///
139+
/// The current process will maintain this value in local memory and in the
140+
/// DB over time. Observing a different value than the one read in
141+
/// memory, when reading from the store, indicates that somebody else has
142+
/// written into the database under our feet.
143+
pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
132144
}
133145

134146
#[cfg(not(tarpaulin_include))]
@@ -142,6 +154,8 @@ impl std::fmt::Debug for OlmMachine {
142154
}
143155

144156
impl OlmMachine {
157+
const CURRENT_GENERATION_STORE_KEY: &str = "generation-counter";
158+
145159
/// Create a new memory based OlmMachine.
146160
///
147161
/// The created machine will keep the encryption keys only in memory and
@@ -212,6 +226,7 @@ impl OlmMachine {
212226
identity_manager,
213227
#[cfg(feature = "backups_v1")]
214228
backup_machine,
229+
crypto_store_generation: Arc::new(Mutex::new(None)),
215230
});
216231

217232
Self { inner }
@@ -1728,6 +1743,106 @@ impl OlmMachine {
17281743
pub fn backup_machine(&self) -> &BackupMachine {
17291744
&self.inner.backup_machine
17301745
}
1746+
1747+
/// Syncs the database and in-memory generation counter.
1748+
///
1749+
/// This requires that the crypto store lock has been acquired already.
1750+
pub async fn initialize_crypto_store_generation(&self) -> StoreResult<()> {
1751+
// Avoid reentrant initialization by taking the lock for the entire function's
1752+
// scope.
1753+
let mut gen_guard = self.inner.crypto_store_generation.lock().await;
1754+
1755+
let prev_generation =
1756+
self.inner.store.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY).await?;
1757+
1758+
let gen = match prev_generation {
1759+
Some(val) => {
1760+
// There was a value in the store. We need to signal that we're a different
1761+
// process, so we don't just reuse the value but increment it.
1762+
u64::from_le_bytes(
1763+
val.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
1764+
)
1765+
.wrapping_add(1)
1766+
}
1767+
None => 0,
1768+
};
1769+
1770+
self.inner
1771+
.store
1772+
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
1773+
.await?;
1774+
1775+
*gen_guard = Some(gen);
1776+
1777+
Ok(())
1778+
}
1779+
1780+
/// If needs be, update the local and on-disk crypto store generation.
1781+
///
1782+
/// Returns true if another user has modified the internal generation
1783+
/// counter, and as such we've incremented and updated it in the
1784+
/// database.
1785+
///
1786+
/// ## Requirements
1787+
///
1788+
/// - This assumes that `initialize_crypto_store_generation` has been called
1789+
/// beforehand.
1790+
/// - This requires that the crypto store lock has been acquired.
1791+
pub async fn maintain_crypto_store_generation(&self) -> StoreResult<bool> {
1792+
let mut gen_guard = self.inner.crypto_store_generation.lock().await;
1793+
1794+
// The database value must be there:
1795+
// - either we could initialize beforehand, thus write into the database,
1796+
// - or we couldn't, and then another process was holding onto the database's
1797+
// lock, thus
1798+
// has written a generation counter in there.
1799+
let actual_gen = self
1800+
.inner
1801+
.store
1802+
.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
1803+
.await?
1804+
.ok_or(LockStoreError::MissingGeneration)?;
1805+
1806+
let actual_gen = u64::from_le_bytes(
1807+
actual_gen.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
1808+
);
1809+
1810+
let expected_gen = match gen_guard.as_ref() {
1811+
Some(expected_gen) => {
1812+
if actual_gen == *expected_gen {
1813+
return Ok(false);
1814+
}
1815+
// Increment the biggest, and store it everywhere.
1816+
actual_gen.max(*expected_gen).wrapping_add(1)
1817+
}
1818+
None => {
1819+
// Some other process held onto the lock when initializing, so we must reload.
1820+
// Increment database value, and store it everywhere.
1821+
actual_gen.wrapping_add(1)
1822+
}
1823+
};
1824+
1825+
tracing::debug!(
1826+
"Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
1827+
*gen_guard,
1828+
actual_gen,
1829+
expected_gen
1830+
);
1831+
1832+
// Update known value.
1833+
*gen_guard = Some(expected_gen);
1834+
1835+
// Update value in database.
1836+
self.inner
1837+
.store
1838+
.set_custom_value(
1839+
Self::CURRENT_GENERATION_STORE_KEY,
1840+
expected_gen.to_le_bytes().to_vec(),
1841+
)
1842+
.await?;
1843+
1844+
Ok(true)
1845+
}
17311846
}
17321847

17331848
#[cfg(any(feature = "testing", test))]

crates/matrix-sdk-crypto/src/store/locks.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,15 @@ pub enum LockStoreError {
314314
/// Spent too long waiting for a database lock.
315315
#[error("a lock timed out")]
316316
LockTimeout,
317+
318+
/// The generation counter is missing, and should always be present.
319+
#[error("missing generation counter in the store")]
320+
MissingGeneration,
321+
322+
/// Unexpected format for the generation counter. Is someone tampering with the
323+
/// database?
324+
#[error("invalid format of the generation counter")]
325+
InvalidGenerationFormat,
317326
}
318327

319328
#[cfg(test)]

crates/matrix-sdk-ui/src/encryption_sync/mod.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -213,19 +213,6 @@ impl EncryptionSync {
213213

214214
Ok(())
215215
}
216-
217-
/// Request a reload of the internal caches used by this sync.
218-
///
219-
/// This must be called every time the process running this loop was
220-
/// suspended and got back into the foreground, and another process may have
221-
/// written to the same underlying store (e.g. notification process vs
222-
/// main process).
223-
pub async fn reload_caches(&self) -> Result<(), Error> {
224-
// Regenerate the crypto store caches first.
225-
self.client.encryption().reload_caches().await.map_err(Error::ClientError)?;
226-
227-
Ok(())
228-
}
229216
}
230217

231218
/// Errors for the [`EncryptionSync`].

crates/matrix-sdk/src/encryption/mod.rs

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,17 @@ impl Encryption {
864864
let lock =
865865
olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
866866

867+
// Gently try to initialize the crypto store generation counter.
868+
//
869+
// If we don't get the lock immediately, then it is already acquired by another
870+
// process, and we'll get to reload next time we acquire the lock.
871+
{
872+
let guard = lock.try_lock_once().await?;
873+
if guard.is_some() {
874+
olm_machine.initialize_crypto_store_generation().await?;
875+
}
876+
}
877+
867878
self.client
868879
.inner
869880
.cross_process_crypto_store_lock
@@ -873,6 +884,22 @@ impl Encryption {
873884
Ok(())
874885
}
875886

887+
/// Maybe reload the `OlmMachine` after acquiring the lock for the first
888+
/// time.
889+
async fn on_lock_newly_acquired(&self) -> Result<(), Error> {
890+
let olm_machine_guard = self.client.olm_machine().await;
891+
if let Some(olm_machine) = olm_machine_guard.as_ref() {
892+
// If the crypto store generation has changed,
893+
if olm_machine.maintain_crypto_store_generation().await? {
894+
// (get rid of the reference to the current crypto store first)
895+
drop(olm_machine_guard);
896+
// Recreate the OlmMachine.
897+
self.client.base_client().regenerate_olm().await?;
898+
}
899+
}
900+
Ok(())
901+
}
902+
876903
/// If a lock was created with [`Self::enable_cross_process_store_lock`],
877904
/// spin-waits until the lock is available.
878905
///
@@ -883,20 +910,11 @@ impl Encryption {
883910
max_backoff: Option<u32>,
884911
) -> Result<Option<CryptoStoreLockGuard>, Error> {
885912
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
886-
match lock.try_lock_once().await? {
887-
Some(guard) => Ok(Some(guard)),
888-
None => {
889-
// We didn't get the lock on the first attempt, so that means that another
890-
// process is using it. Wait for it to release it.
891-
let guard = lock.spin_lock(max_backoff).await?;
892-
893-
// As we didn't get the lock on the first attempt, force-reload all the crypto
894-
// state caches at once, by recreating the OlmMachine from scratch.
895-
self.client.base_client().regenerate_olm().await?;
896-
897-
Ok(Some(guard))
898-
}
899-
}
913+
let guard = lock.spin_lock(max_backoff).await?;
914+
915+
self.on_lock_newly_acquired().await?;
916+
917+
Ok(Some(guard))
900918
} else {
901919
Ok(None)
902920
}
@@ -908,19 +926,17 @@ impl Encryption {
908926
/// Returns a guard to the lock, if it was obtained.
909927
pub async fn try_lock_store_once(&self) -> Result<Option<CryptoStoreLockGuard>, Error> {
910928
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
911-
Ok(lock.try_lock_once().await?)
929+
let maybe_guard = lock.try_lock_once().await?;
930+
931+
if maybe_guard.is_some() {
932+
self.on_lock_newly_acquired().await?;
933+
}
934+
935+
Ok(maybe_guard)
912936
} else {
913937
Ok(None)
914938
}
915939
}
916-
917-
/// Manually request that the internal crypto caches be reloaded.
918-
pub async fn reload_caches(&self) -> Result<(), Error> {
919-
// At this time, rleoading the `OlmMachine` ought to be sufficient.
920-
self.client.base_client().regenerate_olm().await?;
921-
922-
Ok(())
923-
}
924940
}
925941

926942
#[cfg(all(test, not(target_arch = "wasm32")))]

0 commit comments

Comments
 (0)