@@ -1303,8 +1303,8 @@ public SearchPhase newSearchPhase(
1303
1303
task ,
1304
1304
true ,
1305
1305
searchService .getCoordinatorRewriteContextProvider (timeProvider ::absoluteStartMillis ),
1306
- listener .delegateFailureAndWrap (( l , iters ) -> {
1307
- SearchPhase action = newSearchPhase (
1306
+ listener .delegateFailureAndWrap (
1307
+ ( l , iters ) -> newSearchPhase (
1308
1308
task ,
1309
1309
searchRequest ,
1310
1310
executor ,
@@ -1317,30 +1317,32 @@ public SearchPhase newSearchPhase(
1317
1317
false ,
1318
1318
threadPool ,
1319
1319
clusters
1320
- );
1321
- action .start ();
1322
- })
1323
- );
1324
- } else {
1325
- // for synchronous CCS minimize_roundtrips=false, use the CCSSingleCoordinatorSearchProgressListener
1326
- // (AsyncSearchTask will not return SearchProgressListener.NOOP, since it uses its own progress listener
1327
- // which delegates to CCSSingleCoordinatorSearchProgressListener when minimizing roundtrips)
1328
- if (clusters .isCcsMinimizeRoundtrips () == false
1329
- && clusters .hasRemoteClusters ()
1330
- && task .getProgressListener () == SearchProgressListener .NOOP ) {
1331
- task .setProgressListener (new CCSSingleCoordinatorSearchProgressListener ());
1332
- }
1333
- final SearchPhaseResults <SearchPhaseResult > queryResultConsumer = searchPhaseController .newSearchPhaseResults (
1334
- executor ,
1335
- circuitBreaker ,
1336
- task ::isCancelled ,
1337
- task .getProgressListener (),
1338
- searchRequest ,
1339
- shardIterators .size (),
1340
- exc -> searchTransportService .cancelSearchTask (task , "failed to merge result [" + exc .getMessage () + "]" )
1320
+ ).start ()
1321
+ )
1341
1322
);
1323
+ }
1324
+ // for synchronous CCS minimize_roundtrips=false, use the CCSSingleCoordinatorSearchProgressListener
1325
+ // (AsyncSearchTask will not return SearchProgressListener.NOOP, since it uses its own progress listener
1326
+ // which delegates to CCSSingleCoordinatorSearchProgressListener when minimizing roundtrips)
1327
+ if (clusters .isCcsMinimizeRoundtrips () == false
1328
+ && clusters .hasRemoteClusters ()
1329
+ && task .getProgressListener () == SearchProgressListener .NOOP ) {
1330
+ task .setProgressListener (new CCSSingleCoordinatorSearchProgressListener ());
1331
+ }
1332
+ final SearchPhaseResults <SearchPhaseResult > queryResultConsumer = searchPhaseController .newSearchPhaseResults (
1333
+ executor ,
1334
+ circuitBreaker ,
1335
+ task ::isCancelled ,
1336
+ task .getProgressListener (),
1337
+ searchRequest ,
1338
+ shardIterators .size (),
1339
+ exc -> searchTransportService .cancelSearchTask (task , "failed to merge result [" + exc .getMessage () + "]" )
1340
+ );
1341
+ boolean success = false ;
1342
+ try {
1343
+ final SearchPhase searchPhase ;
1342
1344
if (searchRequest .searchType () == DFS_QUERY_THEN_FETCH ) {
1343
- return new SearchDfsQueryThenFetchAsyncAction (
1345
+ searchPhase = new SearchDfsQueryThenFetchAsyncAction (
1344
1346
logger ,
1345
1347
namedWriteableRegistry ,
1346
1348
searchTransportService ,
@@ -1359,7 +1361,7 @@ public SearchPhase newSearchPhase(
1359
1361
);
1360
1362
} else {
1361
1363
assert searchRequest .searchType () == QUERY_THEN_FETCH : searchRequest .searchType ();
1362
- return new SearchQueryThenFetchAsyncAction (
1364
+ searchPhase = new SearchQueryThenFetchAsyncAction (
1363
1365
logger ,
1364
1366
namedWriteableRegistry ,
1365
1367
searchTransportService ,
@@ -1377,6 +1379,12 @@ public SearchPhase newSearchPhase(
1377
1379
clusters
1378
1380
);
1379
1381
}
1382
+ success = true ;
1383
+ return searchPhase ;
1384
+ } finally {
1385
+ if (success == false ) {
1386
+ queryResultConsumer .close ();
1387
+ }
1380
1388
}
1381
1389
}
1382
1390
}
0 commit comments