@@ -42,7 +42,7 @@ use std::{
42
42
time:: Duration ,
43
43
} ;
44
44
45
- use tokio:: { sync:: Mutex , time:: sleep} ;
45
+ use tokio:: { sync:: Mutex , task :: JoinHandle , time:: sleep} ;
46
46
use tracing:: { instrument, trace} ;
47
47
48
48
use super :: DynCryptoStore ;
@@ -91,6 +91,9 @@ pub struct CryptoStoreLock {
91
91
/// reentrant.
92
92
locking_attempt : Arc < Mutex < ( ) > > ,
93
93
94
+ /// Current renew task spawned by `try_lock_once`.
95
+ renew_task : Arc < Mutex < Option < JoinHandle < ( ) > > > > ,
96
+
94
97
/// The key used in the key/value mapping for the lock entry.
95
98
lock_key : String ,
96
99
@@ -135,6 +138,7 @@ impl CryptoStoreLock {
135
138
backoff : Arc :: new ( Mutex :: new ( WaitingTime :: Some ( Self :: INITIAL_BACKOFF_MS ) ) ) ,
136
139
num_holders : Arc :: new ( 0 . into ( ) ) ,
137
140
locking_attempt : Arc :: new ( Mutex :: new ( ( ) ) ) ,
141
+ renew_task : Default :: default ( ) ,
138
142
}
139
143
}
140
144
@@ -170,52 +174,59 @@ impl CryptoStoreLock {
170
174
// Clone data to be owned by the task.
171
175
let this = self . clone ( ) ;
172
176
173
- matrix_sdk_common:: executor:: spawn ( async move {
174
- loop {
175
- {
176
- // First, check if there are still users of this lock.
177
- //
178
- // This is not racy, because:
179
- // - the `locking_attempt` mutex makes sure we don't have unexpected
180
- // interactions with the non-atomic sequence above in `try_lock_once`
181
- // (check > 0, then add 1).
182
- // - other entities holding onto the `num_holders` atomic will only
183
- // decrease it over time.
184
- let _guard = this. locking_attempt . lock ( ) . await ;
185
-
186
- // If there are no more users, we can quit.
187
- if this. num_holders . load ( atomic:: Ordering :: SeqCst ) == 0 {
188
- tracing:: info!( "exiting the lease extension loop" ) ;
189
-
190
- // Cancel the lease with another 0ms lease.
191
- // If we don't get the lock, that's (weird but) fine.
192
- let _ = this
193
- . store
194
- . try_take_leased_lock ( 0 , & this. lock_key , & this. lock_holder )
195
- . await ;
177
+ let mut renew_task = self . renew_task . lock ( ) . await ;
178
+
179
+ // Only spawn the task if it's missing or done.
180
+ let spawn = renew_task. as_ref ( ) . map_or ( true , |join_handle| join_handle. is_finished ( ) ) ;
181
+
182
+ if spawn {
183
+ * renew_task = Some ( matrix_sdk_common:: executor:: spawn ( async move {
184
+ loop {
185
+ {
186
+ // First, check if there are still users of this lock.
187
+ //
188
+ // This is not racy, because:
189
+ // - the `locking_attempt` mutex makes sure we don't have unexpected
190
+ // interactions with the non-atomic sequence above in `try_lock_once`
191
+ // (check > 0, then add 1).
192
+ // - other entities holding onto the `num_holders` atomic will only
193
+ // decrease it over time.
194
+ let _guard = this. locking_attempt . lock ( ) . await ;
195
+
196
+ // If there are no more users, we can quit.
197
+ if this. num_holders . load ( atomic:: Ordering :: SeqCst ) == 0 {
198
+ tracing:: info!( "exiting the lease extension loop" ) ;
199
+
200
+ // Cancel the lease with another 0ms lease.
201
+ // If we don't get the lock, that's (weird but) fine.
202
+ let _ = this
203
+ . store
204
+ . try_take_leased_lock ( 0 , & this. lock_key , & this. lock_holder )
205
+ . await ;
206
+
207
+ // Exit the loop.
208
+ break ;
209
+ }
210
+ }
196
211
212
+ sleep ( Duration :: from_millis ( Self :: EXTEND_LEASE_EVERY_MS ) ) . await ;
213
+
214
+ if let Err ( err) = this
215
+ . store
216
+ . try_take_leased_lock (
217
+ Self :: LEASE_DURATION_MS ,
218
+ & this. lock_key ,
219
+ & this. lock_holder ,
220
+ )
221
+ . await
222
+ {
223
+ tracing:: error!( "error when extending lock lease: {err:#}" ) ;
197
224
// Exit the loop.
198
225
break ;
199
226
}
200
227
}
201
-
202
- sleep ( Duration :: from_millis ( Self :: EXTEND_LEASE_EVERY_MS ) ) . await ;
203
-
204
- if let Err ( err) = this
205
- . store
206
- . try_take_leased_lock (
207
- Self :: LEASE_DURATION_MS ,
208
- & this. lock_key ,
209
- & this. lock_holder ,
210
- )
211
- . await
212
- {
213
- tracing:: error!( "error when extending lock lease: {err:#}" ) ;
214
- // Exit the loop.
215
- break ;
216
- }
217
- }
218
- } ) ;
228
+ } ) ) ;
229
+ }
219
230
220
231
let guard = CryptoStoreLockGuard { num_holders : self . num_holders . clone ( ) } ;
221
232
Ok ( Some ( guard) )
0 commit comments