12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: sync:: Arc ;
15
+ use std:: { iter , pin :: pin , sync:: Arc } ;
16
16
17
17
use async_std:: sync:: Mutex ;
18
+ use futures_util:: StreamExt ;
18
19
use imbl:: Vector ;
19
20
use matrix_sdk:: {
20
- deserialized_responses:: SyncTimelineEvent , executor:: spawn, room, sync:: RoomUpdate ,
21
+ deserialized_responses:: SyncTimelineEvent , executor:: spawn, room, sync:: RoomUpdate , Client ,
22
+ } ;
23
+ use ruma:: {
24
+ events:: receipt:: { ReceiptThread , ReceiptType } ,
25
+ OwnedRoomId ,
21
26
} ;
22
- use ruma:: events:: receipt:: { ReceiptThread , ReceiptType } ;
23
27
use tokio:: sync:: { broadcast, mpsc} ;
24
28
use tracing:: { error, info, warn} ;
25
29
26
- #[ cfg( feature = "e2e-encryption" ) ]
27
- use super :: to_device:: { handle_forwarded_room_key_event, handle_room_key_event} ;
28
30
use super :: { inner:: TimelineInner , queue:: send_queued_messages, Timeline , TimelineDropHandle } ;
29
31
30
32
/// Builder that allows creating and configuring various parts of a
@@ -189,24 +191,30 @@ impl TimelineBuilder {
189
191
}
190
192
} ) ;
191
193
192
- // Not using room.add_event_handler here because RoomKey events are
193
- // to-device events that are not received in the context of a room.
194
-
195
- #[ cfg( feature = "e2e-encryption" ) ]
196
- let room_key_handle = client
197
- . add_event_handler ( handle_room_key_event ( inner. clone ( ) , room. room_id ( ) . to_owned ( ) ) ) ;
198
194
#[ cfg( feature = "e2e-encryption" ) ]
199
- let forwarded_room_key_handle = client. add_event_handler ( handle_forwarded_room_key_event (
200
- inner. clone ( ) ,
201
- room. room_id ( ) . to_owned ( ) ,
202
- ) ) ;
203
-
204
- let handles = vec ! [
205
- #[ cfg( feature = "e2e-encryption" ) ]
206
- room_key_handle,
207
- #[ cfg( feature = "e2e-encryption" ) ]
208
- forwarded_room_key_handle,
209
- ] ;
195
+ let room_key_update_join_handle = {
196
+ let inner = inner. clone ( ) ;
197
+ match client. encryption ( ) . room_keys_for_room_received_stream ( room. room_id ( ) ) . await {
198
+ Some ( stream) => spawn ( async move {
199
+ let mut stream = pin ! ( stream) ;
200
+ while let Some ( vec) = stream. next ( ) . await {
201
+ for room_key_info in vec {
202
+ retry_decryption (
203
+ client. clone ( ) ,
204
+ inner. clone ( ) ,
205
+ room_key_info. room_id ,
206
+ room_key_info. session_id ,
207
+ )
208
+ . await ;
209
+ }
210
+ }
211
+ } ) ,
212
+ None => {
213
+ error ! ( "Can't listen for room key updates, OlmMachine not set up" ) ;
214
+ spawn ( async move { } )
215
+ }
216
+ }
217
+ } ;
210
218
211
219
let ( msg_sender, msg_receiver) = mpsc:: channel ( 1 ) ;
212
220
if !read_only {
@@ -221,9 +229,9 @@ impl TimelineBuilder {
221
229
_end_token : Mutex :: new ( None ) ,
222
230
msg_sender,
223
231
drop_handle : Arc :: new ( TimelineDropHandle {
224
- client,
225
- event_handler_handles : handles,
226
232
room_update_join_handle,
233
+ #[ cfg( feature = "e2e-encryption" ) ]
234
+ room_key_update_join_handle,
227
235
} ) ,
228
236
} ;
229
237
@@ -242,3 +250,18 @@ impl TimelineBuilder {
242
250
timeline
243
251
}
244
252
}
253
+
254
+ #[ tracing:: instrument( skip( client, inner) ) ]
255
+ async fn retry_decryption (
256
+ client : Client ,
257
+ inner : Arc < TimelineInner > ,
258
+ room_id : OwnedRoomId ,
259
+ session_id : String ,
260
+ ) {
261
+ let Some ( room) = client. get_room ( & room_id) else {
262
+ error ! ( "Failed to fetch room object" ) ;
263
+ return ;
264
+ } ;
265
+
266
+ inner. retry_event_decryption ( & room, Some ( iter:: once ( session_id. as_str ( ) ) . collect ( ) ) ) . await ;
267
+ }
0 commit comments