Skip to content

feat: add a new NotificationClient API 📬 #2235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 7 additions & 45 deletions bindings/matrix-sdk-ffi/src/encryption_sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk_ui::encryption_sync::{EncryptionSync as MatrixEncryptionSync, EncryptionSyncMode};
use matrix_sdk_ui::encryption_sync::EncryptionSync as MatrixEncryptionSync;
use tracing::{error, warn};

use crate::{client::Client, error::ClientError, task_handle::TaskHandle, RUNTIME};
Expand Down Expand Up @@ -75,23 +75,6 @@ impl EncryptionSync {
}
}

impl Client {
fn encryption_sync(
&self,
id: String,
listener: Box<dyn EncryptionSyncListener>,
mode: EncryptionSyncMode,
) -> Result<Arc<EncryptionSync>, ClientError> {
RUNTIME.block_on(async move {
let inner = Arc::new(MatrixEncryptionSync::new(id, self.inner.clone(), mode).await?);

let handle = EncryptionSync::start(inner.clone(), listener);

Ok(Arc::new(EncryptionSync { _handle: handle, sync: inner }))
})
}
}

#[uniffi::export]
impl Client {
/// Must be called to get the encryption loop running.
Expand All @@ -110,33 +93,12 @@ impl Client {
id: String,
listener: Box<dyn EncryptionSyncListener>,
) -> Result<Arc<EncryptionSync>, ClientError> {
self.encryption_sync(id, listener, EncryptionSyncMode::Loop)
}
RUNTIME.block_on(async move {
let inner = Arc::new(MatrixEncryptionSync::new(id, self.inner.clone(), None).await?);

/// Encryption loop for a notification process.
///
/// A fixed number of iterations can be given, to limit the time spent in
/// that loop.
///
/// This should be avoided, whenever possible, and be used only in
/// situations where the encryption sync loop needs to run from multiple
/// processes at the same time (on iOS for instance, where notifications
/// are handled in a separate process). If you aren't in such a
/// situation, prefer using `Client::room_list(true)`.
pub fn notification_encryption_sync(
&self,
id: String,
listener: Box<dyn EncryptionSyncListener>,
num_iters: u8,
) -> Result<Arc<EncryptionSync>, ClientError> {
self.encryption_sync(
id,
listener,
EncryptionSyncMode::LimitedMode {
num_iterations: num_iters,
proxy_timeout: std::time::Duration::ZERO,
network_timeout: std::time::Duration::ZERO,
},
)
let handle = EncryptionSync::start(inner.clone(), listener);

Ok(Arc::new(EncryptionSync { _handle: handle, sync: inner }))
})
}
}
149 changes: 76 additions & 73 deletions crates/matrix-sdk-ui/src/encryption_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,12 @@ use matrix_sdk_crypto::store::locks::CryptoStoreLock;
use ruma::{api::client::sync::sync_events::v4, assign};
use tracing::{error, trace};

#[derive(Clone, Copy)]
pub enum EncryptionSyncMode {
/// Run the loop for a fixed amount of iterations, for very short durations.
///
/// This should be used only for a short-lived process (like iOS's
/// [NSE](https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension)),
/// and should be only used internally. For that reason, the durations can
/// be customized.
LimitedMode { num_iterations: u8, proxy_timeout: Duration, network_timeout: Duration },

/// Never stop running the loop, except if asked to stop.
Loop,
}

/// High-level helper for synchronizing encryption events using sliding sync.
///
/// See the module's documentation for more details.
pub struct EncryptionSync {
client: Client,
sliding_sync: SlidingSync,
mode: EncryptionSyncMode,
}

impl EncryptionSync {
Expand All @@ -69,7 +54,7 @@ impl EncryptionSync {
pub async fn new(
process_id: String,
client: Client,
mode: EncryptionSyncMode,
proxy_and_network_timeouts: Option<(Duration, Duration)>,
) -> Result<Self, Error> {
// Make sure to use the same `conn_id` and caching store identifier, whichever
// process is running this sliding sync. There must be at most one
Expand All @@ -82,7 +67,7 @@ impl EncryptionSync {
)
.with_e2ee_extension(assign!(v4::E2EEConfig::default(), { enabled: Some(true)}));

if let EncryptionSyncMode::LimitedMode { proxy_timeout, network_timeout, .. } = mode {
if let Some((proxy_timeout, network_timeout)) = proxy_and_network_timeouts {
builder = builder.with_timeouts(proxy_timeout, network_timeout);
}

Expand All @@ -101,7 +86,74 @@ impl EncryptionSync {
}
};

Ok(Self { client, sliding_sync, mode })
Ok(Self { client, sliding_sync })
}

pub async fn run_fixed_iterations(self, num_iterations: u8) -> Result<(), Error> {
let sync = self.sliding_sync.sync();

pin_mut!(sync);

let mut lock_guard =
self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;

// Try to take the lock at the beginning; if it's busy, that means that another
// process already holds onto it, and as such we won't try to run the
// encryption sync loop at all (because we expect the other process to
// do so).

if lock_guard.is_none() {
// If we can't acquire the cross-process lock on the first attempt,
// that means the main process is running, or its lease hasn't expired
// yet. In case it's the latter, wait a bit and retry.
tracing::debug!(
"Lock was already taken, and we're not the main loop; retrying in {}ms...",
CryptoStoreLock::LEASE_DURATION_MS
);

tokio::time::sleep(Duration::from_millis(CryptoStoreLock::LEASE_DURATION_MS.into()))
.await;

lock_guard =
self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;

if lock_guard.is_none() {
tracing::debug!(
"Second attempt at locking outside the main app failed, so aborting."
);
return Ok(());
}
}

for _ in 0..num_iterations {
match sync.next().await {
Some(Ok(update_summary)) => {
// This API is only concerned with the e2ee and to-device extensions.
// Warn if anything weird has been received from the proxy.
if !update_summary.lists.is_empty() {
error!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
}
if !update_summary.rooms.is_empty() {
error!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
}

// Cool cool, let's do it again.
trace!("Encryption sync received an update!");
}

Some(Err(err)) => {
trace!("Encryption sync stopped because of an error: {err:#}");
return Err(Error::SlidingSync(err));
}

None => {
trace!("Encryption sync properly terminated.");
break;
}
}
}

Ok(())
}

/// Start synchronization.
Expand All @@ -113,62 +165,13 @@ impl EncryptionSync {

pin_mut!(sync);

let mut mode = self.mode;

loop {
let guard = match &mut mode {
EncryptionSyncMode::LimitedMode { num_iterations: ref mut val, .. } => {
if *val == 0 {
// The previous attempt was the last one, stop now.
break;
}
// Soon.
*val -= 1;

let mut guard = self
.client
.encryption()
.try_lock_store_once()
.await
.map_err(Error::LockError)?;

if guard.is_none() {
// If we can't acquire the cross-process lock on the first attempt,
// that means the main process is running, or its lease hasn't expired
// yet. In case it's the latter, wait a bit and retry.
tracing::debug!(
"Lock was already taken, and we're not the main loop; retrying in {}ms...",
CryptoStoreLock::LEASE_DURATION_MS
);

tokio::time::sleep(Duration::from_millis(
CryptoStoreLock::LEASE_DURATION_MS.into(),
))
.await;

guard = self
.client
.encryption()
.try_lock_store_once()
.await
.map_err(Error::LockError)?;

if guard.is_none() {
tracing::debug!("Second attempt at locking outside the main app failed, so aborting.");
return;
}
}

guard
}

EncryptionSyncMode::Loop => self
.client
.encryption()
.spin_lock_store(Some(60000))
.await
.map_err(Error::LockError)?,
};
let guard = self
.client
.encryption()
.spin_lock_store(Some(60000))
.await
.map_err(Error::LockError)?;

match sync.next().await {
Some(Ok(update_summary)) => {
Expand Down
Loading