diff --git a/Cargo.lock b/Cargo.lock index 53908908420..d22acd7e800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1075,7 +1075,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.8.0", "scopeguard", ] @@ -2963,6 +2963,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.8.0" @@ -3141,6 +3150,8 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", + "memoffset 0.7.1", + "pin-utils", "static_assertions", ] @@ -4681,6 +4692,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "speedy" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7642861d1a1569b105c57008df95fb12ba648f7f90358f46e99bdfbf26729b84" +dependencies = [ + "memoffset 0.8.0", + "speedy-derive", +] + +[[package]] +name = "speedy-derive" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d395866cb6778625150f77a430cc0af764ce0300f6a3d3413477785fa34b6c7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.16", +] + [[package]] name = "spin" version = "0.5.2" @@ -4696,6 +4728,19 @@ dependencies = [ "der", ] +[[package]] +name = "sqlite-lock" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures", + "matrix-sdk-crypto", + "matrix-sdk-sqlite", + "nix", + "speedy", + "tokio", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/crates/matrix-sdk-crypto/src/store/error.rs b/crates/matrix-sdk-crypto/src/store/error.rs index 81f721b19e7..0a9285ba149 100644 --- a/crates/matrix-sdk-crypto/src/store/error.rs +++ b/crates/matrix-sdk-crypto/src/store/error.rs @@ -18,6 +18,7 @@ use ruma::{IdParseError, OwnedDeviceId, OwnedUserId}; use serde_json::Error as SerdeError; use thiserror::Error; +use super::locks::LockStoreError; use crate::olm::SessionCreationError; /// A `CryptoStore` specific result type. @@ -78,6 +79,10 @@ pub enum CryptoStoreError { /// A problem with the underlying database backend #[error(transparent)] Backend(Box), + + /// An error due to locking. + #[error(transparent)] + Lock(#[from] LockStoreError), } impl CryptoStoreError { diff --git a/crates/matrix-sdk-crypto/src/store/integration_tests.rs b/crates/matrix-sdk-crypto/src/store/integration_tests.rs index 1f2da1a9397..4db7de65b38 100644 --- a/crates/matrix-sdk-crypto/src/store/integration_tests.rs +++ b/crates/matrix-sdk-crypto/src/store/integration_tests.rs @@ -768,6 +768,90 @@ macro_rules! cryptostore_integration_tests { let loaded_2 = store.get_custom_value("B").await.unwrap(); assert_eq!(None, loaded_2); } + + #[async_test] + async fn test_custom_value_insert_if_missing_remove() { + let (_account, store) = get_loaded_store("custom_value_insert_if_missing").await; + + let val = "Hello".as_bytes().to_vec(); + + // Removing while the value wasn't present doesn't remove anything. + let removed = store.remove_custom_value("A").await.unwrap(); + assert!(!removed); + + // Inserting while the value wasn't present does something. + let inserted = store.insert_custom_value_if_missing("A", val.clone()).await.unwrap(); + assert!(inserted); + + let loaded = store.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, Some(val.clone())); + + // Inserting while the value was present does nothing. + let inserted = store.insert_custom_value_if_missing("A", val.clone()).await.unwrap(); + assert!(!inserted); + + // …even if we try hard. + let inserted = store.insert_custom_value_if_missing("A", val.clone()).await.unwrap(); + assert!(!inserted); + + // Removing while the value was present does something. + let removed = store.remove_custom_value("A").await.unwrap(); + assert!(removed); + + let loaded = store.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, None); + + // …only the first time. + let removed = store.remove_custom_value("A").await.unwrap(); + assert!(!removed); + } + + #[async_test] + async fn test_custom_value_multiple_stores() { + // Hey, have you heard about my second, mimic store? + let val1 = "Hello".as_bytes().to_vec(); + let (_account, store1) = get_loaded_store("custom_value_multiple_stores").await; + let (_account, store2) = get_loaded_store("custom_value_multiple_stores").await; + + // Store1 inserts... + let inserted = store1.insert_custom_value_if_missing("A", val1.clone()).await.unwrap(); + assert!(inserted); + + // Store2 can't! + let val2 = "Goodbye".as_bytes().to_vec(); + let inserted = store2.insert_custom_value_if_missing("A", val2.clone()).await.unwrap(); + assert!(!inserted); + + // But when reading, both stores must agree. + let loaded = store1.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, Some(val1.clone())); + + let loaded = store2.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, Some(val1.clone())); + + // Clean up. + let removed = store1.remove_custom_value("A").await.unwrap(); + assert!(removed); + + let loaded = store1.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, None); + + let loaded = store2.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, None); + + // Now store2 can write, store1 can't, they agree on reading etc. + let inserted = store2.insert_custom_value_if_missing("A", val2.clone()).await.unwrap(); + assert!(inserted); + + let inserted = store1.insert_custom_value_if_missing("A", val1.clone()).await.unwrap(); + assert!(!inserted); + + let loaded = store1.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, Some(val2.clone())); + + let loaded = store2.get_custom_value("A").await.unwrap(); + assert_eq!(loaded, Some(val2.clone())); + } } }; } diff --git a/crates/matrix-sdk-crypto/src/store/locks.rs b/crates/matrix-sdk-crypto/src/store/locks.rs new file mode 100644 index 00000000000..19dac4c1023 --- /dev/null +++ b/crates/matrix-sdk-crypto/src/store/locks.rs @@ -0,0 +1,192 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Collection of small helpers that implement store-based locks. +//! +//! Those locks are implemented as one value in the key-value crypto store, that +//! exists if and only if the lock has been taken. For this to work correctly, +//! we rely on multiple assumptions: +//! +//! - the store must allow concurrent reads and writes from multiple processes. +//! For instance, for +//! sqlite, this means that it is running in [WAL](https://www.sqlite.org/wal.html) mode. +//! - the two operations used in the store implementation, +//! `insert_custom_value_if_missing` and +//! `remove_custom_value`, must be atomic / implemented in a transaction. + +use std::{sync::Arc, time::Duration}; + +use tokio::time::sleep; + +use super::DynCryptoStore; +use crate::CryptoStoreError; + +/// Small state machine to handle wait times. +#[derive(Clone, Debug)] +enum WaitingTime { + /// Some time to wait, in milliseconds. + Some(u32), + /// Stop waiting when seeing this value. + Stop, +} + +/// A store-based lock for the `CryptoStore`. +#[derive(Clone, Debug)] +pub struct CryptoStoreLock { + /// The store we're using to lock. + store: Arc, + + /// The key used in the key/value mapping for the lock entry. + lock_key: String, + + /// A specific value to identify the lock's holder. + lock_holder: String, + + /// Backoff time, in milliseconds. + backoff: WaitingTime, + + /// Maximum backoff time, between two attempts. + max_backoff: u32, +} + +impl CryptoStoreLock { + /// Initial backoff, in milliseconds. This is the time we wait the first + /// time, if taking the lock initially failed. + const INITIAL_BACKOFF_MS: u32 = 10; + + /// Maximal backoff, in milliseconds. This is the maximum amount of time + /// we'll wait for the lock, *between two attempts*. + const MAX_BACKOFF_MS: u32 = 1000; + + /// Create a new store-based lock implemented as a value in the + /// crypto-store. + /// + /// # Parameters + /// + /// - `lock_key`: key in the key-value store to store the lock's state. + /// - `lock_holder`: identify the lock's holder with this given value. + /// - `max_backoff`: maximum time (in milliseconds) that should be waited + /// for, between two + /// attempts. When that time is reached a second time, the lock will stop + /// attempting to get the lock and will return a timeout error upon + /// locking. If not provided, will wait for `Self::MAX_BACKOFF_MS`. + pub fn new( + store: Arc, + lock_key: String, + lock_holder: String, + max_backoff: Option, + ) -> Self { + let max_backoff = max_backoff.unwrap_or(Self::MAX_BACKOFF_MS); + Self { + store, + lock_key, + lock_holder, + max_backoff, + backoff: WaitingTime::Some(Self::INITIAL_BACKOFF_MS), + } + } + + /// Attempt to take the lock, with exponential backoff if the lock has + /// already been taken before. + pub async fn lock(&mut self) -> Result<(), CryptoStoreError> { + loop { + let inserted = self + .store + .insert_custom_value_if_missing( + &self.lock_key, + self.lock_holder.as_bytes().to_vec(), + ) + .await?; + + if inserted { + // Reset backoff before returning, for the next attempt to lock. + self.backoff = WaitingTime::Some(Self::INITIAL_BACKOFF_MS); + return Ok(()); + } + + // Double-check that we were not interrupted last time we tried to take the + // lock, and forgot to release it; in that case, we *still* hold it. + let previous = self.store.get_custom_value(&self.lock_key).await?; + if previous.as_deref() == Some(self.lock_holder.as_bytes()) { + // At this point, the only possible value for backoff is the initial one, but + // better be safe than sorry. + tracing::warn!( + "Crypto-store lock {} was already taken by {}; let's pretend we just acquired it.", + self.lock_key, + self.lock_holder + ); + self.backoff = WaitingTime::Some(Self::INITIAL_BACKOFF_MS); + return Ok(()); + } + + let wait = match self.backoff { + WaitingTime::Some(val) => val, + WaitingTime::Stop => { + // We've reached the maximum backoff, abandon. + return Err(LockStoreError::LockTimeout.into()); + } + }; + + // Exponential backoff! Multiply by 2 the time we've waited before, cap it to + // max_backoff. + let next_value = wait.saturating_mul(2); + self.backoff = if next_value >= self.max_backoff { + WaitingTime::Stop + } else { + WaitingTime::Some(next_value) + }; + + sleep(Duration::from_millis(wait.into())).await; + } + } + + /// Release the lock taken previously with [`Self::lock()`]. + /// + /// Will return an error if the lock wasn't taken. + pub async fn unlock(&mut self) -> Result<(), CryptoStoreError> { + let read = self + .store + .get_custom_value(&self.lock_key) + .await? + .ok_or(CryptoStoreError::from(LockStoreError::MissingLockValue))?; + + if read != self.lock_holder.as_bytes() { + return Err(LockStoreError::IncorrectLockValue.into()); + } + + let removed = self.store.remove_custom_value(&self.lock_key).await?; + if removed { + Ok(()) + } else { + Err(LockStoreError::MissingLockValue.into()) + } + } +} + +/// Error related to the locking API of the crypto store. +#[derive(Debug, thiserror::Error)] +pub enum LockStoreError { + /// A lock value was to be removed, but it didn't contain the expected lock + /// value. + #[error("a lock value was to be removed, but it didn't contain the expected lock value")] + IncorrectLockValue, + + /// A lock value was to be removed, but it was missing in the database. + #[error("a lock value was to be removed, but it was missing in the database")] + MissingLockValue, + + /// Spent too long waiting for a database lock. + #[error("a lock timed out")] + LockTimeout, +} diff --git a/crates/matrix-sdk-crypto/src/store/memorystore.rs b/crates/matrix-sdk-crypto/src/store/memorystore.rs index 092b847a0af..40138b7490b 100644 --- a/crates/matrix-sdk-crypto/src/store/memorystore.rs +++ b/crates/matrix-sdk-crypto/src/store/memorystore.rs @@ -311,6 +311,16 @@ impl CryptoStore for MemoryStore { warn!("Method not implemented"); Ok(()) } + + async fn insert_custom_value_if_missing(&self, _key: &str, _new: Vec) -> Result { + warn!("Method insert_custom_value_if_missing not implemented"); + Ok(false) + } + + async fn remove_custom_value(&self, _key: &str) -> Result { + warn!("Method remove_custom_value not implemented"); + Ok(false) + } } #[cfg(test)] diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index 0c451a671e8..5452953fd8f 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -80,6 +80,7 @@ use crate::{ pub mod caches; mod error; +pub mod locks; mod memorystore; mod traits; diff --git a/crates/matrix-sdk-crypto/src/store/traits.rs b/crates/matrix-sdk-crypto/src/store/traits.rs index 2e4e24b6bd8..5c7ad6c9cef 100644 --- a/crates/matrix-sdk-crypto/src/store/traits.rs +++ b/crates/matrix-sdk-crypto/src/store/traits.rs @@ -226,6 +226,23 @@ pub trait CryptoStore: AsyncTraitDeps { /// /// * `value` - The value to insert async fn set_custom_value(&self, key: &str, value: Vec) -> Result<(), Self::Error>; + + /// Insert a custom value only if it's missing from the database. + /// + /// In other words, doesn't do an upsert (insert or update). + /// + /// Guaranteed to be atomic. + async fn insert_custom_value_if_missing( + &self, + key: &str, + new: Vec, + ) -> Result; + + /// Removes a custom value from the store. + /// + /// Returns a boolean indicating whether the value was actually present in + /// the store. + async fn remove_custom_value(&self, key: &str) -> Result; } #[repr(transparent)] @@ -372,6 +389,18 @@ impl CryptoStore for EraseCryptoStoreError { async fn set_custom_value(&self, key: &str, value: Vec) -> Result<(), Self::Error> { self.0.set_custom_value(key, value).await.map_err(Into::into) } + + async fn insert_custom_value_if_missing( + &self, + key: &str, + new: Vec, + ) -> Result { + self.0.insert_custom_value_if_missing(key, new).await.map_err(Into::into) + } + + async fn remove_custom_value(&self, key: &str) -> Result { + self.0.remove_custom_value(key).await.map_err(Into::into) + } } /// A type-erased [`CryptoStore`]. diff --git a/crates/matrix-sdk-indexeddb/src/crypto_store.rs b/crates/matrix-sdk-indexeddb/src/crypto_store.rs index 98f4813f5ad..96c1e14e6d0 100644 --- a/crates/matrix-sdk-indexeddb/src/crypto_store.rs +++ b/crates/matrix-sdk-indexeddb/src/crypto_store.rs @@ -1056,6 +1056,40 @@ impl_crypto_store! { .put_key_val(&JsValue::from_str(key), &self.serialize_value(&value)?)?; Ok(()) } + + async fn insert_custom_value_if_missing( + &self, + key: &str, + value: Vec, + ) -> Result { + let key = JsValue::from_str(key); + let txn = self + .inner + .transaction_on_one_with_mode(keys::CORE, IdbTransactionMode::Readwrite)?; + let object_store = txn + .object_store(keys::CORE)?; + if object_store.get(&key)?.await?.is_none() { + object_store.put_key_val(&key, &self.serialize_value(&value)?)?; + Ok(true) + } else { + Ok(false) + } + } + + async fn remove_custom_value(&self, key: &str) -> Result { + let key = JsValue::from_str(key); + let txn = self + .inner + .transaction_on_one_with_mode(keys::CORE, IdbTransactionMode::Readwrite)?; + let object_store = txn + .object_store(keys::CORE)?; + if object_store.get(&key)?.await?.is_some() { + object_store.delete(&key)?; + Ok(true) + } else { + Ok(false) + } + } } impl Drop for IndexeddbCryptoStore { diff --git a/crates/matrix-sdk-sqlite/src/crypto_store.rs b/crates/matrix-sdk-sqlite/src/crypto_store.rs index 2e64245d2c0..214b68d95fb 100644 --- a/crates/matrix-sdk-sqlite/src/crypto_store.rs +++ b/crates/matrix-sdk-sqlite/src/crypto_store.rs @@ -1108,6 +1108,43 @@ impl CryptoStore for SqliteCryptoStore { self.acquire().await?.set_kv(key, serialized).await?; Ok(()) } + + async fn insert_custom_value_if_missing(&self, key: &str, value: Vec) -> Result { + let key = key.to_owned(); + let serialized = if let Some(cipher) = &self.store_cipher { + let encrypted = cipher.encrypt_value_data(value)?; + rmp_serde::to_vec_named(&encrypted)? + } else { + value + }; + + let num_touched = self + .acquire() + .await? + .with_transaction(move |txn| { + txn.execute( + "INSERT INTO kv VALUES(?1, ?2) ON CONFLICT (key) DO NOTHING", + (key, serialized), + ) + }) + .await?; + + assert!(num_touched <= 1); + + Ok(num_touched != 0) + } + + async fn remove_custom_value(&self, key: &str) -> Result { + let key = key.to_owned(); + + let num_touched = self + .acquire() + .await? + .with_transaction(move |txn| txn.execute("DELETE FROM kv WHERE key = ?", (key,))) + .await?; + + Ok(num_touched == 1) + } } #[cfg(test)] diff --git a/labs/sqlite-lock/Cargo.toml b/labs/sqlite-lock/Cargo.toml new file mode 100644 index 00000000000..4da7bb60707 --- /dev/null +++ b/labs/sqlite-lock/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "sqlite-lock" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +matrix-sdk-sqlite = { path = "../../crates/matrix-sdk-sqlite", features = ["crypto-store"] } +matrix-sdk-crypto = { path = "../../crates/matrix-sdk-crypto" } +nix = "0.26.2" +anyhow.workspace = true +speedy = "0.8.6" +futures = "0.3.28" +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } diff --git a/labs/sqlite-lock/src/main.rs b/labs/sqlite-lock/src/main.rs new file mode 100644 index 00000000000..1143e387160 --- /dev/null +++ b/labs/sqlite-lock/src/main.rs @@ -0,0 +1,241 @@ +use std::{env::temp_dir, path::Path, sync::Arc, time::Duration}; + +use anyhow::Result; +use matrix_sdk_crypto::store::{locks::CryptoStoreLock, DynCryptoStore, IntoCryptoStore as _}; +use matrix_sdk_sqlite::SqliteCryptoStore; +use nix::{ + libc::rand, + sys::wait::wait, + unistd::{close, fork, pipe, read, write}, +}; +use speedy::{Readable, Writable}; +use tokio::{runtime::Runtime, time::sleep}; + +#[derive(Clone, Debug, Writable, Readable)] +enum Command { + WriteValue(String), + ReadValue(String), + Quit, +} + +impl Command { + async fn assert(&self, store: &Arc) -> Result<()> { + match self { + Command::WriteValue(written) => { + // The child must have written the value. + let read = store.get_custom_value(KEY).await?; + assert_eq!(read, Some(written.as_bytes().to_vec())); + } + Command::ReadValue(_read) => { + // The child removes the value after validating it. + let read = store.get_custom_value(KEY).await?; + assert_eq!(read, None); + } + Command::Quit => {} + } + Ok(()) + } +} + +fn write_command(fd: i32, command: Command) -> Result<()> { + eprintln!("parent send: {command:?}"); + let serialized = command.write_to_vec()?; + let len = write(fd, &serialized)?; + assert_eq!(len, serialized.len()); + Ok(()) +} + +fn read_command(fd: i32) -> Result { + let mut buf = vec![0; 1024]; // 1024 bytes enough for anyone + read(fd, &mut buf)?; + let command = Command::read_from_buffer(&buf)?; + eprintln!("child received: {command:?}"); + Ok(command) +} + +const LOCK_KEY: &str = "lock_key"; +const KEY: &str = "custom_key"; +const GENERATION_KEY: &str = "generation"; + +struct CloseChildGuard { + write_pipe: i32, +} + +impl Drop for CloseChildGuard { + fn drop(&mut self) { + write_command(self.write_pipe, Command::Quit).unwrap(); + + let status = wait().unwrap(); + eprintln!("Child status: {status:?}"); + + let _ = close(self.write_pipe); + } +} + +async fn parent_main(path: &Path, write_pipe: i32) -> Result<()> { + let store = SqliteCryptoStore::open(path, None).await?.into_crypto_store(); + let lock_key = LOCK_KEY.to_string(); + + let _close_child_guard = CloseChildGuard { write_pipe }; + + let mut generation = 0; + + // Set initial generation to 0. + store.set_custom_value(GENERATION_KEY, vec![generation]).await?; + + let mut lock = CryptoStoreLock::new(store.clone(), lock_key.clone(), "parent".to_owned(), None); + + loop { + // Write a command. + let val = unsafe { rand() } % 2; + + let id = unsafe { rand() }; + let str = format!("hello {id}"); + + let cmd = match val { + 0 => { + // the child will write, so we remove the value + let cmd = Command::WriteValue(str); + + store.remove_custom_value(KEY).await?; + + write_command(write_pipe, cmd.clone())?; + cmd + } + + 1 => { + // the child will read, so we write the value + store.set_custom_value(KEY, str.as_bytes().to_vec()).await?; + + let cmd = Command::ReadValue(str); + write_command(write_pipe, cmd.clone())?; + cmd + } + + _ => unreachable!(), + }; + + loop { + // Compete with the child to take the lock! + lock.lock().await?; + + let read_generation = + store.get_custom_value(GENERATION_KEY).await?.expect("there's always a generation") + [0]; + + // Check that if we've seen the latest result, based on the generation number + // (any write to the generation indicates somebody else wrote to the + // database). + if generation != read_generation { + // Run assertions here. + cmd.assert(&store).await?; + + generation = read_generation; + + lock.unlock().await?; + break; + } + + println!("parent: waiting..."); + lock.unlock().await?; + + sleep(Duration::from_millis(1)).await; + } + } + + #[allow(unreachable_code)] + Ok(()) +} + +async fn child_main(path: &Path, read_pipe: i32) -> Result<()> { + let store = SqliteCryptoStore::open(path, None).await?.into_crypto_store(); + let lock_key = LOCK_KEY.to_string(); + + let mut lock = CryptoStoreLock::new(store.clone(), lock_key.clone(), "child".to_owned(), None); + + loop { + eprintln!("child waits for command"); + match read_command(read_pipe)? { + Command::WriteValue(val) => { + eprintln!("child received command: write {val}; waiting for lock"); + + lock.lock().await?; + + eprintln!("child got the lock"); + + store.set_custom_value(KEY, val.as_bytes().to_vec()).await?; + + let generation = store.get_custom_value(GENERATION_KEY).await?.unwrap()[0]; + let generation = generation.wrapping_add(1); + store.set_custom_value(GENERATION_KEY, vec![generation]).await?; + + lock.unlock().await?; + } + + Command::ReadValue(expected) => { + eprintln!("child received command: read {expected}; waiting for lock"); + + lock.lock().await?; + + eprintln!("child got the lock"); + + let val = store.get_custom_value(KEY).await?.expect("missing value in child"); + assert_eq!(val, expected.as_bytes()); + + store.remove_custom_value(KEY).await?; + + let generation = store.get_custom_value(GENERATION_KEY).await?.unwrap()[0]; + store.set_custom_value(GENERATION_KEY, vec![generation + 1]).await?; + + lock.unlock().await?; + } + + Command::Quit => { + break; + } + } + } + + close(read_pipe)?; + + Ok(()) +} + +fn main() -> Result<()> { + let path = temp_dir().join("db.sqlite"); + + if path.exists() { + std::fs::remove_dir_all(&path)?; + } + + // Force running migrations first. + { + let rt = Runtime::new()?; + let path = path.clone(); + rt.block_on(async { + let _store = SqliteCryptoStore::open(path, None).await?; + anyhow::Ok(()) + })?; + } + + let (read_pipe, write_pipe) = pipe()?; + + let pid = unsafe { fork() }?; + match pid { + nix::unistd::ForkResult::Parent { .. } => { + // Parent doesn't read. + close(read_pipe)?; + + let rt = Runtime::new()?; + rt.block_on(parent_main(&path, write_pipe)) + } + + nix::unistd::ForkResult::Child => { + // Child doesn't write. + close(write_pipe)?; + + let rt = Runtime::new()?; + rt.block_on(child_main(&path, read_pipe)) + } + } +}