19
19
import com .fasterxml .jackson .databind .ObjectMapper ;
20
20
import com .github .dockerjava .api .command .InspectContainerResponse ;
21
21
import com .github .dockerjava .api .model .ContainerNetwork ;
22
+ import lombok .Cleanup ;
22
23
import okhttp3 .Credentials ;
23
24
import okhttp3 .FormBody ;
24
25
import okhttp3 .OkHttpClient ;
25
26
import okhttp3 .Request ;
26
27
import okhttp3 .RequestBody ;
27
28
import okhttp3 .Response ;
29
+ import org .rnorth .ducttape .unreliables .Unreliables ;
28
30
import org .testcontainers .containers .GenericContainer ;
29
31
import org .testcontainers .containers .wait .strategy .HttpWaitStrategy ;
30
32
import org .testcontainers .containers .wait .strategy .WaitAllStrategy ;
36
38
import java .util .List ;
37
39
import java .util .Optional ;
38
40
import java .util .Set ;
41
+ import java .util .concurrent .TimeUnit ;
39
42
import java .util .stream .Collectors ;
40
43
41
44
/**
@@ -224,7 +227,7 @@ private void waitUntilNodeIsOnline() {
224
227
private void renameNode () {
225
228
logger ().debug ("Renaming Couchbase Node from localhost to {}" , getHost ());
226
229
227
- Response response = doHttpRequest (MGMT_PORT , "/node/controller/rename" , "POST" , new FormBody .Builder ()
230
+ @ Cleanup Response response = doHttpRequest (MGMT_PORT , "/node/controller/rename" , "POST" , new FormBody .Builder ()
228
231
.add ("hostname" , getInternalIpAddress ())
229
232
.build (), false
230
233
);
@@ -248,7 +251,7 @@ private void initializeServices() {
248
251
}
249
252
}).collect (Collectors .joining ("," ));
250
253
251
- Response response = doHttpRequest (MGMT_PORT , "/node/controller/setupServices" , "POST" , new FormBody .Builder ()
254
+ @ Cleanup Response response = doHttpRequest (MGMT_PORT , "/node/controller/setupServices" , "POST" , new FormBody .Builder ()
252
255
.add ("services" , services )
253
256
.build (), false
254
257
);
@@ -264,7 +267,7 @@ private void initializeServices() {
264
267
private void configureAdminUser () {
265
268
logger ().debug ("Configuring couchbase admin user with username: \" {}\" " , username );
266
269
267
- Response response = doHttpRequest (MGMT_PORT , "/settings/web" , "POST" , new FormBody .Builder ()
270
+ @ Cleanup Response response = doHttpRequest (MGMT_PORT , "/settings/web" , "POST" , new FormBody .Builder ()
268
271
.add ("username" , username )
269
272
.add ("password" , password )
270
273
.add ("port" , Integer .toString (MGMT_PORT ))
@@ -305,7 +308,7 @@ private void configureExternalPorts() {
305
308
builder .add ("ftsSSL" , Integer .toString (getMappedPort (SEARCH_SSL_PORT )));
306
309
}
307
310
308
- final Response response = doHttpRequest (
311
+ @ Cleanup Response response = doHttpRequest (
309
312
MGMT_PORT ,
310
313
"/node/controller/setupAlternateAddresses/external" ,
311
314
"PUT" ,
@@ -322,7 +325,7 @@ private void configureExternalPorts() {
322
325
private void configureIndexer () {
323
326
logger ().debug ("Configuring the indexer service" );
324
327
325
- Response response = doHttpRequest (MGMT_PORT , "/settings/indexes" , "POST" , new FormBody .Builder ()
328
+ @ Cleanup Response response = doHttpRequest (MGMT_PORT , "/settings/indexes" , "POST" , new FormBody .Builder ()
326
329
.add ("storageMode" , "memory_optimized" )
327
330
.build (), true
328
331
);
@@ -339,7 +342,7 @@ private void createBuckets() {
339
342
for (BucketDefinition bucket : buckets ) {
340
343
logger ().debug ("Creating bucket \" " + bucket .getName () + "\" " );
341
344
342
- Response response = doHttpRequest (MGMT_PORT , "/pools/default/buckets" , "POST" , new FormBody .Builder ()
345
+ @ Cleanup Response response = doHttpRequest (MGMT_PORT , "/pools/default/buckets" , "POST" , new FormBody .Builder ()
343
346
.add ("name" , bucket .getName ())
344
347
.add ("ramQuotaMB" , Integer .toString (bucket .getQuota ()))
345
348
.build (), true );
@@ -353,9 +356,27 @@ private void createBuckets() {
353
356
.forStatusCode (200 )
354
357
.waitUntilReady (this );
355
358
359
+ if (enabledServices .contains (CouchbaseService .QUERY )) {
360
+ // If the query service is enabled, make sure that we only proceed if the query engine also
361
+ // knows about the bucket in its metadata configuration.
362
+ Unreliables .retryUntilTrue (1 , TimeUnit .MINUTES , () -> {
363
+ @ Cleanup Response queryResponse = doHttpRequest (QUERY_PORT , "/query/service" , "POST" , new FormBody .Builder ()
364
+ .add ("statement" , "SELECT COUNT(*) > 0 as present FROM system:keyspaces WHERE name = \" " + bucket .getName () + "\" " )
365
+ .build (), true );
366
+
367
+ String body = queryResponse .body () != null ? queryResponse .body ().string () : null ;
368
+ checkSuccessfulResponse (queryResponse , "Could not poll query service state for bucket: " + bucket .getName ());
369
+
370
+ return Optional .of (MAPPER .readTree (body ))
371
+ .map (n -> n .at ("/results/0/present" ))
372
+ .map (JsonNode ::asBoolean )
373
+ .orElse (false );
374
+ });
375
+ }
376
+
356
377
if (bucket .hasPrimaryIndex ()) {
357
378
if (enabledServices .contains (CouchbaseService .QUERY )) {
358
- Response queryResponse = doHttpRequest (QUERY_PORT , "/query/service" , "POST" , new FormBody .Builder ()
379
+ @ Cleanup Response queryResponse = doHttpRequest (QUERY_PORT , "/query/service" , "POST" , new FormBody .Builder ()
359
380
.add ("statement" , "CREATE PRIMARY INDEX on `" + bucket .getName () + "`" )
360
381
.build (), true );
361
382
@@ -384,14 +405,8 @@ private String getInternalIpAddress() {
384
405
* @param message the message that should be part of the exception of not successful.
385
406
*/
386
407
private void checkSuccessfulResponse (final Response response , final String message ) {
387
- try {
388
- if (!response .isSuccessful ()) {
389
- throw new IllegalStateException (message + ": " + response .toString ());
390
- }
391
- } finally {
392
- if (response .body () != null ) {
393
- response .body ().close ();
394
- }
408
+ if (!response .isSuccessful ()) {
409
+ throw new IllegalStateException (message + ": " + response .toString ());
395
410
}
396
411
}
397
412
0 commit comments