@@ -64,18 +64,22 @@ public class ApplicationIntTest extends TbIntTestPreparation {
64
64
withFileSystemBind ("build/intTest/config" , "/runner/config" , BindMode .READ_WRITE ).
65
65
withStartupTimeout (Duration .ofMinutes (5 ));
66
66
67
- private static final int SMOKE_READ_TIMEOUT_S = 30 ;
68
- private static final int SMOKE_READ_MESSAGES = 10 ;
67
+ private static final int READ_TIMEOUT_S = 10 ;
69
68
70
- private static final int VALIDATION_READ_TIMEOUT_S = 300 ;
69
+ private static final int SMOKE_READ_MESSAGES = 100 ;
71
70
private static final int VALIDATION_READ_MESSAGES = 1000 ;
72
71
72
+ private static Map <String , TestConnectorReports > reports = new LinkedHashMap <>();
73
+
73
74
@ BeforeAll
74
75
static void beforeAll () throws Exception {
75
76
TIMEBASE_CONTAINER .start ();
76
77
APP_CONTAINER .start ();
77
78
78
79
Thread .sleep (10_000 );
80
+
81
+ Arrays .stream (listConnectors ()).sorted (Comparator .comparing (c -> c .connector ))
82
+ .forEach (c -> reports .put (c .connector , c ));
79
83
}
80
84
81
85
@ AfterAll
@@ -99,38 +103,39 @@ void closeTb() {
99
103
100
104
@ TestFactory
101
105
Stream <DynamicTest > testDataFeedInTbSmoke () throws Exception {
102
- return Arrays . stream ( listStreams ()).
103
- map (c -> DynamicTest .dynamicTest (
104
- c .connector ,
106
+ return reports . values (). stream ()
107
+ . map (c -> DynamicTest .dynamicTest (
108
+ "Connection To " + c .connector ,
105
109
() -> tryReadSomeData (c )
106
110
));
107
111
}
108
112
109
113
@ TestFactory
110
114
Stream <DynamicTest > testDataFeedInTbValidateOrderBook () throws Exception {
111
- return Arrays .stream (listStreams ()).filter (c -> !SKIP_CONNECTORS_DATA_VALIDATION .contains (c .stream )).
112
- map (c -> DynamicTest .dynamicTest (
113
- c .connector ,
115
+ return reports .values ().stream ()
116
+ .filter (c -> !SKIP_CONNECTORS_DATA_VALIDATION .contains (c .stream ))
117
+ .map (c -> DynamicTest .dynamicTest (
118
+ "Validate L2 for " + c .connector ,
114
119
() -> tryBuildOrderBook (c )
115
120
));
116
121
}
117
122
118
- private static ConnectorStream [] listStreams () throws Exception {
123
+ private static TestConnectorReports [] listConnectors () throws Exception {
119
124
final JsonArray connectors =
120
125
rest ("http://localhost:" + APP_CONTAINER .getFirstMappedPort () + "/api/v0/connectors" ).
121
126
asArrayRequired ();
122
127
123
128
return connectors .items ().
124
129
map (JsonValue ::asObjectRequired ).
125
- map (ConnectorStream ::new ).
130
+ map (TestConnectorReports ::new ).
126
131
filter (connector -> !SKIP_CONNECTORS .contains (connector .stream )).
127
- toArray (ConnectorStream []::new );
132
+ toArray (TestConnectorReports []::new );
128
133
}
129
134
130
- void tryReadSomeData (final ConnectorStream connector ) {
135
+ void tryReadSomeData (final TestConnectorReports connector ) {
131
136
// we are trying to read N messages
132
137
final int expectedNumOfMessages = SMOKE_READ_MESSAGES ;
133
- final int timeoutSeconds = SMOKE_READ_TIMEOUT_S ;
138
+ final int timeoutSeconds = READ_TIMEOUT_S ;
134
139
135
140
final DXTickStream stream = db .getStream (connector .stream );
136
141
@@ -139,20 +144,18 @@ void tryReadSomeData(final ConnectorStream connector) {
139
144
140
145
try (TickCursor cursor = stream .select (TimeConstants .TIMESTAMP_UNKNOWN ,
141
146
new SelectionOptions (true , true ))) {
142
- Assertions .assertTimeoutPreemptively (Duration .ofSeconds (timeoutSeconds ), () -> {
143
- int messages = 0 ;
144
- while (cursor .next ()) {
145
- if (++messages == expectedNumOfMessages ) {
146
- break ;
147
- }
147
+ int messages = 0 ;
148
+ while (readWithTimeout (timeoutSeconds , cursor , connector )) {
149
+ if (++messages == expectedNumOfMessages ) {
150
+ break ;
148
151
}
149
- }, "Cannot read data for the " + connector );
152
+ }
150
153
}
151
154
}
152
155
153
- void tryBuildOrderBook (final ConnectorStream connector ) {
156
+ void tryBuildOrderBook (final TestConnectorReports connector ) {
154
157
final int expectedNumOfMessages = VALIDATION_READ_MESSAGES ;
155
- final int timeoutSeconds = VALIDATION_READ_TIMEOUT_S ;
158
+ final int timeoutSeconds = READ_TIMEOUT_S ;
156
159
157
160
final DXTickStream stream = db .getStream (connector .stream );
158
161
@@ -163,54 +166,59 @@ void tryBuildOrderBook(final ConnectorStream connector) {
163
166
AtomicLong errors = new AtomicLong ();
164
167
165
168
try (TickCursor cursor = stream .select (TimeConstants .TIMESTAMP_UNKNOWN , new SelectionOptions (false , true ))) {
166
- Assertions .assertTimeoutPreemptively (Duration .ofSeconds (timeoutSeconds ), () -> {
167
- int messages = 0 ;
168
- while (cursor .next ()) {
169
- InstrumentMessage message = cursor .getMessage ();
170
- if (message .getSymbol () == null ) {
171
- continue ;
169
+ int messages = 0 ;
170
+ while (readWithTimeout (timeoutSeconds , cursor , connector )) {
171
+ InstrumentMessage message = cursor .getMessage ();
172
+ if (message .getSymbol () == null ) {
173
+ continue ;
174
+ }
175
+
176
+ // validate
177
+ if (message instanceof PackageHeaderInfo ) {
178
+ PackageHeaderInfo packageHeader = (PackageHeaderInfo ) message ;
179
+ DataValidator validator = validators .computeIfAbsent (
180
+ message .getSymbol ().toString (),
181
+ k -> createValidator (k , (sender , severity , exception , stringMessage ) -> {
182
+ if (severity == Severity .ERROR ) {
183
+ errors .addAndGet (1 );
184
+ LOG .severe (severity + " | " + stringMessage );
185
+ } else {
186
+ LOG .warning (severity + " | " + stringMessage );
187
+ }
188
+
189
+ if (exception != null ) {
190
+ LOG .log (Level .SEVERE , "Exception" , exception );
191
+ }
192
+ }, stream
193
+ )
194
+ );
195
+
196
+ validator .sendPackage (packageHeader );
197
+
198
+ if (++messages % 100 == 0 ) {
199
+ LOG .info ("Processed " + messages + " package headers" );
172
200
}
173
201
174
- // validate
175
- if (message instanceof PackageHeaderInfo ) {
176
- PackageHeaderInfo packageHeader = (PackageHeaderInfo ) message ;
177
- DataValidator validator = validators .computeIfAbsent (
178
- message .getSymbol ().toString (),
179
- k -> createValidator (k , (sender , severity , exception , stringMessage ) -> {
180
- if (severity == Severity .ERROR ) {
181
- errors .addAndGet (1 );
182
- LOG .severe (severity + " | " + stringMessage );
183
- } else {
184
- LOG .warning (severity + " | " + stringMessage );
185
- }
186
-
187
- if (exception != null ) {
188
- LOG .log (Level .SEVERE , "Exception" , exception );
189
- }
190
- }, stream
191
- )
192
- );
193
-
194
- validator .sendPackage (packageHeader );
195
-
196
- if (++messages % 100 == 0 ) {
197
- LOG .info ("Processed " + messages + " package headers" );
198
- }
199
-
200
- if (messages == expectedNumOfMessages ) {
201
- LOG .info ("Processed " + messages + " package headers" );
202
- break ;
203
- }
202
+ if (messages == expectedNumOfMessages ) {
203
+ LOG .info ("Processed " + messages + " package headers" );
204
+ break ;
204
205
}
205
206
}
206
- }, "Cannot read data for the " + connector );
207
+ }
207
208
} finally {
208
209
exportStream (stream );
209
210
}
210
211
211
212
Assertions .assertEquals (0L , errors .get ());
212
213
}
213
214
215
+ private static Boolean readWithTimeout (long timeoutSeconds , TickCursor cursor , TestConnectorReports connector ) {
216
+ return Assertions .assertTimeoutPreemptively (
217
+ Duration .ofSeconds (timeoutSeconds ), cursor ::next ,
218
+ "Cannot read data for the " + connector
219
+ );
220
+ }
221
+
214
222
private static DataValidator createValidator (String symbol , LogProcessor log , DXTickStream stream ) {
215
223
DataValidatorImpl validator = new DataValidatorImpl (symbol , log ,
216
224
Decimal64Utils .parse ("0.0000000000000000001" ), Decimal64Utils .parse ("0.0000000000000000001" ),
@@ -260,16 +268,16 @@ private static JsonValue rest(final String url) throws Exception {
260
268
return new JsonValueParser ().parseAndEoj (response .body ());
261
269
}
262
270
263
- private static class ConnectorStream {
271
+ private static class TestConnectorReports {
264
272
private final String connector ;
265
273
private final String stream ;
266
274
267
- private ConnectorStream (final JsonObject fromJsom ) {
275
+ private TestConnectorReports (final JsonObject fromJsom ) {
268
276
this (fromJsom .getStringRequired ("name" ),
269
277
fromJsom .getStringRequired ("stream" ));
270
278
}
271
279
272
- private ConnectorStream (final String connector , final String stream ) {
280
+ private TestConnectorReports (final String connector , final String stream ) {
273
281
this .connector = connector ;
274
282
this .stream = stream ;
275
283
}
0 commit comments