Skip to content

Commit 913c39a

Browse files
committed
feat: implement a generation counter for the CryptoStore lock
1 parent 49b1e87 commit 913c39a

File tree

5 files changed

+177
-53
lines changed

5 files changed

+177
-53
lines changed

bindings/matrix-sdk-ffi/src/encryption_sync.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ impl EncryptionSync {
7373
error!("Error when stopping the encryption sync: {err}");
7474
}
7575
}
76-
77-
pub fn reload_caches(&self) {
78-
if let Err(err) = RUNTIME.block_on(self.sync.reload_caches()) {
79-
error!("Error when reloading caches: {err}");
80-
}
81-
}
8276
}
8377

8478
impl Client {

crates/matrix-sdk-crypto/src/machine.rs

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ use crate::{
6767
requests::{IncomingResponse, OutgoingRequest, UploadSigningKeysRequest},
6868
session_manager::{GroupSessionManager, SessionManager},
6969
store::{
70-
Changes, DeviceChanges, DynCryptoStore, IdentityChanges, IntoCryptoStore, MemoryStore,
71-
Result as StoreResult, SecretImportError, Store,
70+
locks::LockStoreError, Changes, DeviceChanges, DynCryptoStore, IdentityChanges,
71+
IntoCryptoStore, MemoryStore, Result as StoreResult, SecretImportError, Store,
7272
},
7373
types::{
7474
events::{
@@ -129,6 +129,18 @@ pub struct OlmMachineInner {
129129
/// A state machine that handles creating room key backups.
130130
#[cfg(feature = "backups_v1")]
131131
backup_machine: BackupMachine,
132+
/// Latest "generation" of data known by the crypto store.
133+
///
134+
/// This is a counter that only increments, set in the database (and can
135+
/// wrap). It's incremented whenever some process acquires a lock for the
136+
/// first time. *This assumes the crypto store lock is being held, to
137+
/// avoid data races on writing to this value in the store*.
138+
///
139+
/// The current process will maintain this value in local memory and in the
140+
/// DB over time. Observing a different value than the one read in
141+
/// memory, when reading from the store, indicates that somebody else has
142+
/// written into the database under our feet.
143+
pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
132144
}
133145

134146
#[cfg(not(tarpaulin_include))]
@@ -142,6 +154,8 @@ impl std::fmt::Debug for OlmMachine {
142154
}
143155

144156
impl OlmMachine {
157+
const CURRENT_GENERATION_STORE_KEY: &str = "generation-counter";
158+
145159
/// Create a new memory based OlmMachine.
146160
///
147161
/// The created machine will keep the encryption keys only in memory and
@@ -212,6 +226,7 @@ impl OlmMachine {
212226
identity_manager,
213227
#[cfg(feature = "backups_v1")]
214228
backup_machine,
229+
crypto_store_generation: Arc::new(Mutex::new(None)),
215230
});
216231

217232
Self { inner }
@@ -1728,6 +1743,107 @@ impl OlmMachine {
17281743
pub fn backup_machine(&self) -> &BackupMachine {
17291744
&self.inner.backup_machine
17301745
}
1746+
1747+
/// Syncs the database and in-memory generation counter.
1748+
///
1749+
/// This requires that the crypto store lock has been acquired already.
1750+
pub async fn initialize_crypto_store_generation(&self) -> StoreResult<()> {
1751+
// Avoid reentrant initialization by taking the lock for the entire's function
1752+
// scope.
1753+
let mut gen_guard = self.inner.crypto_store_generation.lock().await;
1754+
1755+
let prev_generation =
1756+
self.inner.store.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY).await?;
1757+
1758+
let gen = match prev_generation {
1759+
None => {
1760+
// At the beginning, all there was was an 8-bytes long zero.
1761+
let zero_u64: u64 = 0;
1762+
self.inner
1763+
.store
1764+
.set_custom_value(
1765+
Self::CURRENT_GENERATION_STORE_KEY,
1766+
zero_u64.to_le_bytes().to_vec(),
1767+
)
1768+
.await?;
1769+
zero_u64
1770+
}
1771+
1772+
Some(val) => u64::from_le_bytes(
1773+
val.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
1774+
),
1775+
};
1776+
1777+
*gen_guard = Some(gen);
1778+
Ok(())
1779+
}
1780+
1781+
/// If needs be, update the local and on-disk crypto store generation.
1782+
///
1783+
/// Returns true whether another user has modified the internal generation
1784+
/// counter, and as such we've incremented and updated it in the
1785+
/// database.
1786+
///
1787+
/// ## Requirements
1788+
///
1789+
/// - This assumes that `initialize_crypto_store_generation` has been called
1790+
/// beforehands.
1791+
/// - This requires that the crypto store lock has been acquired.
1792+
pub async fn maintain_crypto_store_generation(&self) -> StoreResult<bool> {
1793+
let mut gen_guard = self.inner.crypto_store_generation.lock().await;
1794+
1795+
// The database value must be there:
1796+
// - either we could initialize beforehands, thus write into the database,
1797+
// - or we couldn't, and then another process was holding onto the database's
1798+
// lock, thus
1799+
// has written a generation counter in there.
1800+
let actual_gen = self
1801+
.inner
1802+
.store
1803+
.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
1804+
.await?
1805+
.ok_or(LockStoreError::MissingGeneration)?;
1806+
1807+
let actual_gen = u64::from_le_bytes(
1808+
actual_gen.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
1809+
);
1810+
1811+
let expected_gen = match gen_guard.as_ref() {
1812+
Some(expected_gen) => {
1813+
if actual_gen == *expected_gen {
1814+
return Ok(false);
1815+
}
1816+
// Increment the biggest, and store it everywhere.
1817+
actual_gen.max(*expected_gen).wrapping_add(1)
1818+
}
1819+
None => {
1820+
// Some other process hold onto the lock when initializing, so we must reload.
1821+
// Increment database value, and store it everywhere.
1822+
actual_gen.wrapping_add(1)
1823+
}
1824+
};
1825+
1826+
tracing::debug!(
1827+
"Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
1828+
*gen_guard,
1829+
actual_gen,
1830+
expected_gen
1831+
);
1832+
1833+
// Update known value.
1834+
*gen_guard = Some(expected_gen);
1835+
1836+
// Update value in database.
1837+
self.inner
1838+
.store
1839+
.set_custom_value(
1840+
Self::CURRENT_GENERATION_STORE_KEY,
1841+
expected_gen.to_le_bytes().to_vec(),
1842+
)
1843+
.await?;
1844+
1845+
Ok(true)
1846+
}
17311847
}
17321848

17331849
#[cfg(any(feature = "testing", test))]

crates/matrix-sdk-crypto/src/store/locks.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,13 @@ pub enum LockStoreError {
239239
/// Spent too long waiting for a database lock.
240240
#[error("a lock timed out")]
241241
LockTimeout,
242+
243+
/// The generation counter is missing, and should always be present.
244+
#[error("missing generation counter in the store")]
245+
MissingGeneration,
246+
247+
/// Unexpected format for the generation counter. Is someone tampering with the
248+
/// database?
249+
#[error("invalid format of the generation counter")]
250+
InvalidGenerationFormat,
242251
}

crates/matrix-sdk-ui/src/encryption_sync/mod.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -194,19 +194,6 @@ impl EncryptionSync {
194194

195195
Ok(())
196196
}
197-
198-
/// Request a reload of the internal caches used by this sync.
199-
///
200-
/// This must be called every time the process running this loop was
201-
/// suspended and got back into the foreground, and another process may have
202-
/// written to the same underlying store (e.g. notification process vs
203-
/// main process).
204-
pub async fn reload_caches(&self) -> Result<(), Error> {
205-
// Regenerate the crypto store caches first.
206-
self.client.encryption().reload_caches().await.map_err(Error::ClientError)?;
207-
208-
Ok(())
209-
}
210197
}
211198

212199
/// Errors for the [`EncryptionSync`].

crates/matrix-sdk/src/encryption/mod.rs

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::{
2020
collections::{BTreeMap, HashSet},
2121
io::{Cursor, Read, Write},
2222
iter,
23-
ops::Not as _,
2423
path::PathBuf,
2524
};
2625

@@ -194,18 +193,20 @@ impl Client {
194193
thumbnail_source,
195194
thumbnail_info
196195
});
197-
let content = assign!(ImageMessageEventContent::encrypted(body.to_owned(), file), {
198-
info: Some(Box::new(info))
199-
});
196+
let content =
197+
assign!(ImageMessageEventContent::encrypted(body.to_owned(), file), {
198+
info: Some(Box::new(info))
199+
});
200200
MessageType::Image(content)
201201
}
202202
mime::AUDIO => {
203203
let info = assign!(info.map(AudioInfo::from).unwrap_or_default(), {
204204
mimetype: Some(content_type.as_ref().to_owned()),
205205
});
206-
let content = assign!(AudioMessageEventContent::encrypted(body.to_owned(), file), {
207-
info: Some(Box::new(info))
208-
});
206+
let content =
207+
assign!(AudioMessageEventContent::encrypted(body.to_owned(), file), {
208+
info: Some(Box::new(info))
209+
});
209210
MessageType::Audio(content)
210211
}
211212
mime::VIDEO => {
@@ -214,9 +215,10 @@ impl Client {
214215
thumbnail_source,
215216
thumbnail_info
216217
});
217-
let content = assign!(VideoMessageEventContent::encrypted(body.to_owned(), file), {
218-
info: Some(Box::new(info))
219-
});
218+
let content =
219+
assign!(VideoMessageEventContent::encrypted(body.to_owned(), file), {
220+
info: Some(Box::new(info))
221+
});
220222
MessageType::Video(content)
221223
}
222224
_ => {
@@ -862,6 +864,16 @@ impl Encryption {
862864
let lock =
863865
olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
864866

867+
// Gently try to initialize the crypto store generation counter.
868+
//
869+
// If we don't get the lock immediately, then it is already acquired by another
870+
// process, and we'll get to reload next time we acquire the lock.
871+
if lock.try_lock_once().await? {
872+
let res = olm_machine.initialize_crypto_store_generation().await;
873+
lock.unlock().await?;
874+
res?;
875+
}
876+
865877
self.client
866878
.inner
867879
.cross_process_crypto_store_lock
@@ -871,26 +883,34 @@ impl Encryption {
871883
Ok(())
872884
}
873885

886+
/// Maybe reload the `OlmMachine` after acquiring the lock for the first
887+
/// time.
888+
async fn on_lock_newly_acquired(&self) -> Result<(), Error> {
889+
let olm_machine_guard = self.client.olm_machine().await;
890+
if let Some(olm_machine) = olm_machine_guard.as_ref() {
891+
// If the crypto store generation has changed,
892+
if olm_machine.maintain_crypto_store_generation().await? {
893+
// (get rid of the reference to the current crypto store first)
894+
drop(olm_machine_guard);
895+
// Recreate the OlmMachine.
896+
self.client.base_client().regenerate_olm().await?;
897+
}
898+
}
899+
Ok(())
900+
}
901+
874902
/// If a lock was created with [`Self::enable_cross_process_store_lock`],
875903
/// spin-waits until the lock is available.
876904
///
877905
/// May reload the `OlmMachine`, after obtaining the lock but not on the
878906
/// first time.
879907
pub async fn spin_lock_store(&self, max_backoff: Option<u32>) -> Result<(), Error> {
880908
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
881-
if lock.try_lock_once().await?.not() {
882-
// We didn't get the lock on the first attempt, so that means that another
883-
// process is using it. Wait for it to release it.
884-
lock.spin_lock(max_backoff).await?;
885-
886-
// As we didn't get the lock on the first attempt, force-reload all the crypto
887-
// state caches at once, by recreating the OlmMachine from scratch.
888-
if let Err(err) = self.client.base_client().regenerate_olm().await {
889-
// First, give back the cross-process lock.
890-
lock.unlock().await?;
891-
// Then return the error to the caller.
892-
return Err(err.into());
893-
};
909+
lock.spin_lock(max_backoff).await?;
910+
911+
if let Err(err) = self.on_lock_newly_acquired().await {
912+
self.unlock_store().await?;
913+
return Err(err);
894914
}
895915
}
896916
Ok(())
@@ -902,7 +922,13 @@ impl Encryption {
902922
/// Returns whether the lock was obtained or not.
903923
pub async fn try_lock_store_once(&self) -> Result<bool, Error> {
904924
if let Some(lock) = self.client.inner.cross_process_crypto_store_lock.get() {
905-
return Ok(lock.try_lock_once().await?);
925+
if lock.try_lock_once().await? {
926+
if let Err(err) = self.on_lock_newly_acquired().await {
927+
self.unlock_store().await?;
928+
return Err(err);
929+
}
930+
}
931+
return Ok(true);
906932
}
907933
Ok(false)
908934
}
@@ -917,14 +943,6 @@ impl Encryption {
917943
}
918944
Ok(())
919945
}
920-
921-
/// Manually request that the internal crypto caches be reloaded.
922-
pub async fn reload_caches(&self) -> Result<(), Error> {
923-
// At this time, rleoading the `OlmMachine` ought to be sufficient.
924-
self.client.base_client().regenerate_olm().await?;
925-
926-
Ok(())
927-
}
928946
}
929947

930948
#[cfg(all(test, not(target_arch = "wasm32")))]

0 commit comments

Comments
 (0)