@@ -19,8 +19,8 @@ use assert_matches2::assert_let;
19
19
use eyeball_im:: VectorDiff ;
20
20
use futures_util:: StreamExt ;
21
21
use matrix_sdk:: {
22
- assert_next_matches_with_timeout , config:: SyncSettings , executor:: spawn,
23
- ruma :: MilliSecondsSinceUnixEpoch , test_utils:: logged_in_client_with_server,
22
+ config:: SyncSettings , executor:: spawn, ruma :: MilliSecondsSinceUnixEpoch ,
23
+ test_utils:: logged_in_client_with_server,
24
24
} ;
25
25
use matrix_sdk_test:: {
26
26
async_test, event_factory:: EventFactory , mocks:: mock_encryption_state, JoinedRoomBuilder ,
@@ -33,7 +33,7 @@ use ruma::{
33
33
room_id, uint, user_id,
34
34
} ;
35
35
use serde_json:: json;
36
- use stream_assert:: assert_next_matches;
36
+ use stream_assert:: { assert_next_matches, assert_pending } ;
37
37
use tokio:: task:: yield_now;
38
38
use wiremock:: {
39
39
matchers:: { header, method, path_regex} ,
@@ -65,7 +65,7 @@ async fn test_echo() {
65
65
. await
66
66
. unwrap ( ) ,
67
67
) ;
68
- let ( _, mut timeline_stream) = timeline. subscribe ( ) . await ;
68
+ let ( _, mut timeline_stream) = timeline. subscribe_batched ( ) . await ;
69
69
70
70
let event_id = event_id ! ( "$ev" ) ;
71
71
@@ -83,7 +83,10 @@ async fn test_echo() {
83
83
timeline. send ( RoomMessageEventContent :: text_plain ( "Hello, World!" ) . into ( ) ) . await
84
84
} ) ;
85
85
86
- assert_let ! ( Some ( VectorDiff :: PushBack { value: local_echo } ) = timeline_stream. next( ) . await ) ;
86
+ assert_let ! ( Some ( timeline_updates) = timeline_stream. next( ) . await ) ;
87
+ assert_eq ! ( timeline_updates. len( ) , 2 ) ;
88
+
89
+ assert_let ! ( VectorDiff :: PushBack { value: local_echo } = & timeline_updates[ 0 ] ) ;
87
90
let item = local_echo. as_event ( ) . unwrap ( ) ;
88
91
assert_matches ! ( item. send_state( ) , Some ( EventSendState :: NotSentYet ) ) ;
89
92
assert_let ! ( TimelineItemContent :: Message ( msg) = item. content( ) ) ;
@@ -92,15 +95,16 @@ async fn test_echo() {
92
95
assert ! ( item. event_id( ) . is_none( ) ) ;
93
96
let txn_id = item. transaction_id ( ) . unwrap ( ) ;
94
97
95
- assert_let ! ( Some ( VectorDiff :: PushFront { value: date_divider } ) = timeline_stream . next ( ) . await ) ;
98
+ assert_let ! ( VectorDiff :: PushFront { value: date_divider } = & timeline_updates [ 1 ] ) ;
96
99
assert ! ( date_divider. is_date_divider( ) ) ;
97
100
98
101
// Wait for the sending to finish and assert everything was successful
99
102
send_hdl. await . unwrap ( ) . unwrap ( ) ;
100
103
101
- assert_let ! (
102
- Some ( VectorDiff :: Set { index: 1 , value: sent_confirmation } ) = timeline_stream. next( ) . await
103
- ) ;
104
+ assert_let ! ( Some ( timeline_updates) = timeline_stream. next( ) . await ) ;
105
+ assert_eq ! ( timeline_updates. len( ) , 1 ) ;
106
+
107
+ assert_let ! ( VectorDiff :: Set { index: 1 , value: sent_confirmation } = & timeline_updates[ 0 ] ) ;
104
108
let item = sent_confirmation. as_event ( ) . unwrap ( ) ;
105
109
assert_matches ! ( item. send_state( ) , Some ( EventSendState :: Sent { .. } ) ) ;
106
110
assert_eq ! ( item. event_id( ) , Some ( event_id) ) ;
@@ -120,19 +124,24 @@ async fn test_echo() {
120
124
let _response = client. sync_once ( sync_settings. clone ( ) ) . await . unwrap ( ) ;
121
125
server. reset ( ) . await ;
122
126
127
+ assert_let ! ( Some ( timeline_updates) = timeline_stream. next( ) . await ) ;
128
+ assert_eq ! ( timeline_updates. len( ) , 4 ) ;
129
+
123
130
// Local echo is replaced with the remote echo.
124
- assert_next_matches ! ( timeline_stream , VectorDiff :: Remove { index: 1 } ) ;
125
- let remote_echo =
126
- assert_next_matches ! ( timeline_stream , VectorDiff :: PushFront { value } => value ) ;
131
+ assert_let ! ( VectorDiff :: Remove { index: 1 } = & timeline_updates [ 0 ] ) ;
132
+
133
+ assert_let ! ( VectorDiff :: PushFront { value: remote_echo } = & timeline_updates [ 1 ] ) ;
127
134
let item = remote_echo. as_event ( ) . unwrap ( ) ;
128
135
assert ! ( item. is_own( ) ) ;
129
136
assert_eq ! ( item. timestamp( ) , MilliSecondsSinceUnixEpoch ( uint!( 152038280 ) ) ) ;
130
137
131
138
// The date divider is also replaced.
132
- let date_divider =
133
- assert_next_matches ! ( timeline_stream, VectorDiff :: PushFront { value } => value) ;
139
+ assert_let ! ( VectorDiff :: PushFront { value: date_divider } = & timeline_updates[ 2 ] ) ;
134
140
assert ! ( date_divider. is_date_divider( ) ) ;
135
- assert_next_matches ! ( timeline_stream, VectorDiff :: Remove { index: 2 } ) ;
141
+
142
+ assert_let ! ( VectorDiff :: Remove { index: 2 } = & timeline_updates[ 3 ] ) ;
143
+
144
+ assert_pending ! ( timeline_stream) ;
136
145
}
137
146
138
147
#[ async_test]
@@ -229,7 +238,7 @@ async fn test_dedup_by_event_id_late() {
229
238
230
239
let room = client. get_room ( room_id) . unwrap ( ) ;
231
240
let timeline = Arc :: new ( room. timeline ( ) . await . unwrap ( ) ) ;
232
- let ( _, mut timeline_stream) = timeline. subscribe ( ) . await ;
241
+ let ( _, mut timeline_stream) = timeline. subscribe_batched ( ) . await ;
233
242
234
243
let event_id = event_id ! ( "$wWgymRfo7ri1uQx0NXO40vLJ" ) ;
235
244
@@ -251,14 +260,16 @@ async fn test_dedup_by_event_id_late() {
251
260
252
261
timeline. send ( RoomMessageEventContent :: text_plain ( "Hello, World!" ) . into ( ) ) . await . unwrap ( ) ;
253
262
263
+ assert_let ! ( Some ( timeline_updates) = timeline_stream. next( ) . await ) ;
264
+ assert_eq ! ( timeline_updates. len( ) , 2 ) ;
265
+
254
266
// Timeline: [local echo]
255
- let local_echo =
256
- assert_next_matches_with_timeout ! ( timeline_stream, VectorDiff :: PushBack { value } => value) ;
267
+ assert_let ! ( VectorDiff :: PushBack { value: local_echo } = & timeline_updates[ 0 ] ) ;
257
268
let item = local_echo. as_event ( ) . unwrap ( ) ;
258
269
assert_matches ! ( item. send_state( ) , Some ( EventSendState :: NotSentYet ) ) ;
259
270
260
271
// Timeline: [date-divider, local echo]
261
- let date_divider = assert_next_matches_with_timeout ! ( timeline_stream , VectorDiff :: PushFront { value } => value ) ;
272
+ assert_let ! ( VectorDiff :: PushFront { value: date_divider } = & timeline_updates [ 1 ] ) ;
262
273
assert ! ( date_divider. is_date_divider( ) ) ;
263
274
264
275
let f = EventFactory :: new ( ) ;
@@ -275,21 +286,29 @@ async fn test_dedup_by_event_id_late() {
275
286
mock_sync ( & server, sync_builder. build_json_sync_response ( ) , None ) . await ;
276
287
let _response = client. sync_once ( sync_settings. clone ( ) ) . await . unwrap ( ) ;
277
288
289
+ assert_let ! ( Some ( timeline_updates) = timeline_stream. next( ) . await ) ;
290
+ assert_eq ! ( timeline_updates. len( ) , 2 ) ;
291
+
278
292
// Timeline: [remote-echo, date-divider, local echo]
279
- let remote_echo =
280
- assert_next_matches ! ( timeline_stream, VectorDiff :: PushFront { value } => value) ;
293
+ assert_let ! ( VectorDiff :: PushFront { value: remote_echo } = & timeline_updates[ 0 ] ) ;
281
294
let item = remote_echo. as_event ( ) . unwrap ( ) ;
282
295
assert_eq ! ( item. event_id( ) , Some ( event_id) ) ;
283
296
284
297
// Timeline: [date-divider, remote-echo, date-divider, local echo]
285
- let date_divider = assert_next_matches_with_timeout ! ( timeline_stream , VectorDiff :: PushFront { value } => value ) ;
298
+ assert_let ! ( VectorDiff :: PushFront { value: date_divider } = & timeline_updates [ 1 ] ) ;
286
299
assert ! ( date_divider. is_date_divider( ) ) ;
287
300
301
+ assert_let ! ( Some ( timeline_updates) = timeline_stream. next( ) . await ) ;
302
+ assert_eq ! ( timeline_updates. len( ) , 2 ) ;
303
+
288
304
// Local echo and its date divider are removed.
289
305
// Timeline: [date-divider, remote-echo, date-divider]
290
- assert_matches ! ( timeline_stream. next( ) . await , Some ( VectorDiff :: Remove { index: 3 } ) ) ;
306
+ assert_let ! ( VectorDiff :: Remove { index: 3 } = & timeline_updates[ 0 ] ) ;
307
+
291
308
// Timeline: [date-divider, remote-echo]
292
- assert_matches ! ( timeline_stream. next( ) . await , Some ( VectorDiff :: Remove { index: 2 } ) ) ;
309
+ assert_let ! ( VectorDiff :: Remove { index: 2 } = & timeline_updates[ 1 ] ) ;
310
+
311
+ assert_pending ! ( timeline_stream) ;
293
312
}
294
313
295
314
#[ async_test]
0 commit comments