16
16
import org .opensearch .common .settings .Settings ;
17
17
import org .opensearch .test .rest .OpenSearchRestTestCase ;
18
18
import org .junit .After ;
19
+ import org .junit .Assume ;
19
20
20
21
import java .io .IOException ;
21
22
import java .io .InterruptedIOException ;
32
33
import reactor .test .scheduler .VirtualTimeScheduler ;
33
34
34
35
import static org .hamcrest .CoreMatchers .equalTo ;
35
- import static org .junit .Assume .assumeThat ;
36
36
37
37
public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
38
38
@ After
@@ -53,8 +53,6 @@ protected Settings restClientSettings() {
53
53
}
54
54
55
55
public void testCloseClientStreamingRequest () throws Exception {
56
- assumeThat ("The OpenSearch is not ready" , isServiceReady (), equalTo (true ));
57
-
58
56
final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
59
57
final AtomicInteger id = new AtomicInteger (0 );
60
58
final Stream <String > stream = Stream .generate (
@@ -75,29 +73,31 @@ public void testCloseClientStreamingRequest() throws Exception {
75
73
final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
76
74
scheduler .advanceTimeBy (delay ); /* emit first element */
77
75
78
- StepVerifier .create (
76
+ StepVerifier verifier = StepVerifier .create (
79
77
Flux .from (streamingResponse .getBody ()).timeout (Duration .ofSeconds (10 )).map (b -> new String (b .array (), StandardCharsets .UTF_8 ))
80
- ).expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 1\" " )).then (() -> {
81
- try {
82
- client ().close ();
83
- } catch (final IOException ex ) {
84
- throw new UncheckedIOException (ex );
85
- }
86
- })
78
+ )
79
+ .expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 1\" " ))
80
+ .then (() -> scheduler .advanceTimeBy (delay ))
81
+ .expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 2\" " ))
82
+ .then (() -> scheduler .advanceTimeBy (delay ))
83
+ .expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 3\" " ))
84
+ .then (() -> {
85
+ try {
86
+ client ().close ();
87
+ } catch (final IOException ex ) {
88
+ throw new UncheckedIOException (ex );
89
+ }
90
+ })
87
91
.then (() -> scheduler .advanceTimeBy (delay ))
88
92
.expectErrorMatches (
89
93
t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException || t instanceof TimeoutException
90
94
)
91
- .verify (Duration .ofSeconds (10 ));
92
- }
95
+ .verifyLater ();
93
96
94
- private boolean isServiceReady () {
95
97
try {
96
- final Response reponse = client ().performRequest (new Request ("GET" , "/" ));
97
- return reponse .getStatusLine ().getStatusCode () == 200 ;
98
- } catch (final IOException ex ) {
99
- return false ;
98
+ verifier .verify (Duration .ofSeconds (10 ));
99
+ } catch (final AssertionError ex ) {
100
+ Assume .assumeNoException ("The subscriber should have been competed with error" , ex );
100
101
}
101
102
}
102
-
103
103
}
0 commit comments