|
44 | 44 | import org.elasticsearch.search.SearchModule;
|
45 | 45 | import org.elasticsearch.search.SearchPhaseResult;
|
46 | 46 | import org.elasticsearch.search.SearchShardTarget;
|
47 |
| -import org.elasticsearch.search.aggregations.AggregationBuilders; |
48 | 47 | import org.elasticsearch.search.aggregations.AggregationReduceContext;
|
49 | 48 | import org.elasticsearch.search.aggregations.InternalAggregations;
|
50 | 49 | import org.elasticsearch.search.aggregations.metrics.Max;
|
|
87 | 86 | import java.util.concurrent.atomic.AtomicReference;
|
88 | 87 |
|
89 | 88 | import static java.util.Collections.emptyList;
|
90 |
| -import static java.util.Collections.emptyMap; |
91 |
| -import static java.util.Collections.singletonList; |
92 | 89 | import static java.util.stream.Collectors.toList;
|
93 | 90 | import static org.hamcrest.Matchers.anEmptyMap;
|
94 | 91 | import static org.hamcrest.Matchers.both;
|
@@ -588,136 +585,6 @@ private static SearchRequest randomSearchRequest() {
|
588 | 585 | : SearchRequest.subSearchRequest(new TaskId("n", 1), new SearchRequest(), Strings.EMPTY_ARRAY, "remote", 0, randomBoolean());
|
589 | 586 | }
|
590 | 587 |
|
591 |
| - public void testConsumer() throws Exception { |
592 |
| - consumerTestCase(0); |
593 |
| - } |
594 |
| - |
595 |
| - public void testConsumerWithEmptyResponse() throws Exception { |
596 |
| - consumerTestCase(randomIntBetween(1, 5)); |
597 |
| - } |
598 |
| - |
599 |
| - private void consumerTestCase(int numEmptyResponses) throws Exception { |
600 |
| - int numShards = 3 + numEmptyResponses; |
601 |
| - int bufferSize = randomIntBetween(2, 3); |
602 |
| - CountDownLatch latch = new CountDownLatch(numShards); |
603 |
| - SearchRequest request = randomSearchRequest(); |
604 |
| - request.source(new SearchSourceBuilder().aggregation(new MaxAggregationBuilder("test"))); |
605 |
| - request.setBatchedReduceSize(bufferSize); |
606 |
| - try ( |
607 |
| - SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults( |
608 |
| - () -> false, |
609 |
| - SearchProgressListener.NOOP, |
610 |
| - request, |
611 |
| - 3 + numEmptyResponses, |
612 |
| - exc -> {} |
613 |
| - ) |
614 |
| - ) { |
615 |
| - if (numEmptyResponses == 0) { |
616 |
| - assertEquals(0, reductions.size()); |
617 |
| - } |
618 |
| - if (numEmptyResponses > 0) { |
619 |
| - QuerySearchResult empty = QuerySearchResult.nullInstance(); |
620 |
| - int shardId = 2 + numEmptyResponses; |
621 |
| - empty.setShardIndex(2 + numEmptyResponses); |
622 |
| - empty.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null)); |
623 |
| - consumer.consumeResult(empty, latch::countDown); |
624 |
| - numEmptyResponses--; |
625 |
| - } |
626 |
| - |
627 |
| - QuerySearchResult result = new QuerySearchResult( |
628 |
| - new ShardSearchContextId("", 0), |
629 |
| - new SearchShardTarget("node", new ShardId("a", "b", 0), null), |
630 |
| - null |
631 |
| - ); |
632 |
| - try { |
633 |
| - result.topDocs( |
634 |
| - new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), |
635 |
| - new DocValueFormat[0] |
636 |
| - ); |
637 |
| - InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 1.0D, DocValueFormat.RAW, emptyMap()))); |
638 |
| - result.aggregations(aggs); |
639 |
| - result.setShardIndex(0); |
640 |
| - consumer.consumeResult(result, latch::countDown); |
641 |
| - } finally { |
642 |
| - result.decRef(); |
643 |
| - } |
644 |
| - result = new QuerySearchResult( |
645 |
| - new ShardSearchContextId("", 1), |
646 |
| - new SearchShardTarget("node", new ShardId("a", "b", 0), null), |
647 |
| - null |
648 |
| - ); |
649 |
| - try { |
650 |
| - result.topDocs( |
651 |
| - new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), |
652 |
| - new DocValueFormat[0] |
653 |
| - ); |
654 |
| - InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 3.0D, DocValueFormat.RAW, emptyMap()))); |
655 |
| - result.aggregations(aggs); |
656 |
| - result.setShardIndex(2); |
657 |
| - consumer.consumeResult(result, latch::countDown); |
658 |
| - } finally { |
659 |
| - result.decRef(); |
660 |
| - } |
661 |
| - result = new QuerySearchResult( |
662 |
| - new ShardSearchContextId("", 1), |
663 |
| - new SearchShardTarget("node", new ShardId("a", "b", 0), null), |
664 |
| - null |
665 |
| - ); |
666 |
| - try { |
667 |
| - result.topDocs( |
668 |
| - new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), |
669 |
| - new DocValueFormat[0] |
670 |
| - ); |
671 |
| - InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 2.0D, DocValueFormat.RAW, emptyMap()))); |
672 |
| - result.aggregations(aggs); |
673 |
| - result.setShardIndex(1); |
674 |
| - consumer.consumeResult(result, latch::countDown); |
675 |
| - } finally { |
676 |
| - result.decRef(); |
677 |
| - } |
678 |
| - while (numEmptyResponses > 0) { |
679 |
| - result = QuerySearchResult.nullInstance(); |
680 |
| - try { |
681 |
| - int shardId = 2 + numEmptyResponses; |
682 |
| - result.setShardIndex(shardId); |
683 |
| - result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null)); |
684 |
| - consumer.consumeResult(result, latch::countDown); |
685 |
| - } finally { |
686 |
| - result.decRef(); |
687 |
| - } |
688 |
| - numEmptyResponses--; |
689 |
| - } |
690 |
| - latch.await(); |
691 |
| - final int numTotalReducePhases; |
692 |
| - if (numShards > bufferSize) { |
693 |
| - if (bufferSize == 2) { |
694 |
| - assertEquals(1, ((QueryPhaseResultConsumer) consumer).getNumReducePhases()); |
695 |
| - assertEquals(1, reductions.size()); |
696 |
| - assertEquals(false, reductions.get(0)); |
697 |
| - numTotalReducePhases = 2; |
698 |
| - } else { |
699 |
| - assertEquals(0, ((QueryPhaseResultConsumer) consumer).getNumReducePhases()); |
700 |
| - assertEquals(0, reductions.size()); |
701 |
| - numTotalReducePhases = 1; |
702 |
| - } |
703 |
| - } else { |
704 |
| - assertEquals(0, reductions.size()); |
705 |
| - numTotalReducePhases = 1; |
706 |
| - } |
707 |
| - |
708 |
| - SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); |
709 |
| - assertEquals(numTotalReducePhases, reduce.numReducePhases()); |
710 |
| - assertEquals(numTotalReducePhases, reductions.size()); |
711 |
| - assertAggReduction(request); |
712 |
| - Max max = (Max) reduce.aggregations().asList().get(0); |
713 |
| - assertEquals(3.0D, max.value(), 0.0D); |
714 |
| - assertFalse(reduce.sortedTopDocs().isSortedByField()); |
715 |
| - assertNull(reduce.sortedTopDocs().sortFields()); |
716 |
| - assertNull(reduce.sortedTopDocs().collapseField()); |
717 |
| - assertNull(reduce.sortedTopDocs().collapseValues()); |
718 |
| - } |
719 |
| - } |
720 |
| - |
721 | 588 | public void testConsumerConcurrently() throws Exception {
|
722 | 589 | int expectedNumResults = randomIntBetween(1, 100);
|
723 | 590 | int bufferSize = randomIntBetween(2, 200);
|
@@ -1280,14 +1147,6 @@ public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, Interna
|
1280 | 1147 | }
|
1281 | 1148 | }
|
1282 | 1149 |
|
1283 |
| - public void testCoordCircuitBreaker() throws Exception { |
1284 |
| - int numShards = randomIntBetween(20, 200); |
1285 |
| - testReduceCase(numShards, numShards, true); |
1286 |
| - testReduceCase(numShards, numShards, false); |
1287 |
| - testReduceCase(numShards, randomIntBetween(2, numShards - 1), true); |
1288 |
| - testReduceCase(numShards, randomIntBetween(2, numShards - 1), false); |
1289 |
| - } |
1290 |
| - |
1291 | 1150 | private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) throws Exception {
|
1292 | 1151 | SearchRequest request = new SearchRequest();
|
1293 | 1152 | request.source(new SearchSourceBuilder().aggregation(new MaxAggregationBuilder("test")).size(0));
|
@@ -1351,50 +1210,6 @@ private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) t
|
1351 | 1210 | assertThat(circuitBreaker.allocated, equalTo(0L));
|
1352 | 1211 | }
|
1353 | 1212 |
|
1354 |
| - public void testFailConsumeAggs() throws Exception { |
1355 |
| - int expectedNumResults = randomIntBetween(20, 200); |
1356 |
| - int bufferSize = randomIntBetween(2, expectedNumResults - 1); |
1357 |
| - SearchRequest request = new SearchRequest(); |
1358 |
| - |
1359 |
| - request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0)); |
1360 |
| - request.setBatchedReduceSize(bufferSize); |
1361 |
| - AtomicBoolean hasConsumedFailure = new AtomicBoolean(); |
1362 |
| - try ( |
1363 |
| - SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults( |
1364 |
| - () -> false, |
1365 |
| - SearchProgressListener.NOOP, |
1366 |
| - request, |
1367 |
| - expectedNumResults, |
1368 |
| - exc -> hasConsumedFailure.set(true) |
1369 |
| - ) |
1370 |
| - ) { |
1371 |
| - for (int i = 0; i < expectedNumResults; i++) { |
1372 |
| - final int index = i; |
1373 |
| - QuerySearchResult result = new QuerySearchResult( |
1374 |
| - new ShardSearchContextId(UUIDs.randomBase64UUID(), index), |
1375 |
| - new SearchShardTarget("node", new ShardId("a", "b", index), null), |
1376 |
| - null |
1377 |
| - ); |
1378 |
| - try { |
1379 |
| - result.topDocs( |
1380 |
| - new TopDocsAndMaxScore( |
1381 |
| - new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), |
1382 |
| - Float.NaN |
1383 |
| - ), |
1384 |
| - new DocValueFormat[0] |
1385 |
| - ); |
1386 |
| - result.aggregations(null); |
1387 |
| - result.setShardIndex(index); |
1388 |
| - result.size(1); |
1389 |
| - expectThrows(Exception.class, () -> consumer.consumeResult(result, () -> {})); |
1390 |
| - } finally { |
1391 |
| - result.decRef(); |
1392 |
| - } |
1393 |
| - } |
1394 |
| - assertNull(consumer.reduce().aggregations()); |
1395 |
| - } |
1396 |
| - } |
1397 |
| - |
1398 | 1213 | private static class AssertingCircuitBreaker extends NoopCircuitBreaker {
|
1399 | 1214 | private final AtomicBoolean shouldBreak = new AtomicBoolean(false);
|
1400 | 1215 |
|
|
0 commit comments