@@ -19,7 +19,6 @@ use std::time::Duration;
19
19
use anyhow:: Result ;
20
20
use itertools:: Itertools ;
21
21
use risingwave_simulation:: cluster:: { Cluster , Configuration , Session } ;
22
- use risingwave_simulation:: utils:: AssertResult ;
23
22
use tokio:: time:: sleep;
24
23
25
24
const DATABASE_RECOVERY_START : & str = "DATABASE_RECOVERY_START" ;
@@ -182,19 +181,18 @@ async fn test_isolation_simple_two_databases_join() -> Result<()> {
182
181
. run ( "insert into t2 select * from generate_series(1, 50);" )
183
182
. await ?;
184
183
185
- session
186
- . run ( "select count(*) from group1.public.t1;" )
187
- . await ?
188
- . assert_result_eq ( "100" ) ;
184
+ wait_until (
185
+ & mut session,
186
+ "select count(*) from group1.public.t1;" ,
187
+ "100" ,
188
+ )
189
+ . await ?;
189
190
190
191
session
191
192
. run ( "create materialized view mv_join as select t2.v as v from group1.public.t1 join t2 on t1.v = t2.v;" )
192
193
. await ?;
193
194
194
- session
195
- . run ( "select count(*) from mv_join;" )
196
- . await ?
197
- . assert_result_eq ( "50" ) ;
195
+ wait_until ( & mut session, "select count(*) from mv_join;" , "50" ) . await ?;
198
196
199
197
cluster. simple_kill_nodes ( [ "compute-1" ] ) . await ;
200
198
@@ -208,15 +206,8 @@ async fn test_isolation_simple_two_databases_join() -> Result<()> {
208
206
. run ( "insert into t2 select * from generate_series(51, 120)" )
209
207
. await ?;
210
208
211
- session
212
- . run ( "select max(v) from t2" )
213
- . await ?
214
- . assert_result_eq ( "120" ) ;
215
-
216
- session
217
- . run ( "select count(*) from mv_join;" )
218
- . await ?
219
- . assert_result_eq ( "100" ) ;
209
+ wait_until ( & mut session, "select max(v) from t2" , "120" ) . await ?;
210
+ wait_until ( & mut session, "select count(*) from mv_join;" , "100" ) . await ?;
220
211
221
212
cluster. simple_restart_nodes ( [ "compute-1" ] ) . await ;
222
213
@@ -232,10 +223,7 @@ async fn test_isolation_simple_two_databases_join() -> Result<()> {
232
223
// flush is only oriented to the current database, so flush is required here
233
224
session. run ( "flush" ) . await ?;
234
225
235
- session
236
- . run ( "select count(*) from mv_join;" )
237
- . await ?
238
- . assert_result_eq ( "110" ) ;
226
+ wait_until ( & mut session, "select count(*) from mv_join;" , "110" ) . await ?;
239
227
240
228
let mut database_recovery_events = database_recovery_events ( & mut session) . await ?;
241
229
@@ -297,10 +285,7 @@ async fn test_isolation_simple_two_databases_join_in_other() -> Result<()> {
297
285
. run ( "create materialized view mv_join as select t2.v as v from group1.public.t1 join group2.public.t2 on t1.v = t2.v;" )
298
286
. await ?;
299
287
300
- session
301
- . run ( "select count(*) from mv_join;" )
302
- . await ?
303
- . assert_result_eq ( "50" ) ;
288
+ wait_until ( & mut session, "select count(*) from mv_join;" , "50" ) . await ?;
304
289
305
290
cluster. simple_kill_nodes ( [ "compute-1" , "compute-3" ] ) . await ;
306
291
@@ -335,10 +320,7 @@ async fn test_isolation_simple_two_databases_join_in_other() -> Result<()> {
335
320
336
321
session. run ( "use group3" ) . await ?;
337
322
338
- session
339
- . run ( "select count(*) from mv_join;" )
340
- . await ?
341
- . assert_result_eq ( "110" ) ;
323
+ wait_until ( & mut session, "select count(*) from mv_join;" , "110" ) . await ?;
342
324
343
325
let mut database_recovery_events = database_recovery_events ( & mut session) . await ?;
344
326
@@ -371,6 +353,20 @@ async fn test_isolation_simple_two_databases_join_in_other() -> Result<()> {
371
353
Ok ( ( ) )
372
354
}
373
355
356
+ async fn wait_until ( session : & mut Session , sql : & str , target : & str ) -> Result < ( ) > {
357
+ tokio:: time:: timeout ( Duration :: from_secs ( 100 ) , async {
358
+ loop {
359
+ if session. run ( sql) . await . unwrap ( ) == target {
360
+ return ;
361
+ }
362
+ sleep ( Duration :: from_secs ( 1 ) ) . await ;
363
+ }
364
+ } )
365
+ . await ?;
366
+
367
+ Ok ( ( ) )
368
+ }
369
+
374
370
async fn prepare_isolation_env ( ) -> Result < ( Cluster , Session ) > {
375
371
let mut config = Configuration :: for_auto_parallelism ( MAX_HEARTBEAT_INTERVAL_SEC , true ) ;
376
372
0 commit comments