Skip to content

Commit 40857ef

Browse files
Simplify state held by AbstractSearchAsyncAction a little further
Continuing to simplify this a little further: No need to create the throttling CHM unless we actually are throttling, we can use the `null` to signal yes/no here. Likewise, no need to have an extra mutex field on the failures reference, we can use the set-once reference. Lastly, push down the building of the PIT id to the single place where it's actually used, that way we also save the transport version field that's only used in the PIT action on the parent class.
1 parent 987e9f7 commit 40857ef

File tree

2 files changed

+26
-29
lines changed

2 files changed

+26
-29
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.util.SetOnce;
1414
import org.elasticsearch.ElasticsearchException;
1515
import org.elasticsearch.ExceptionsHelper;
16-
import org.elasticsearch.TransportVersion;
1716
import org.elasticsearch.action.ActionListener;
1817
import org.elasticsearch.action.NoShardAvailableActionException;
1918
import org.elasticsearch.action.OriginalIndices;
@@ -79,11 +78,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
7978
protected final SearchTask task;
8079
protected final SearchPhaseResults<Result> results;
8180
private final long clusterStateVersion;
82-
private final TransportVersion minTransportVersion;
8381
protected final Map<String, AliasFilter> aliasFilter;
8482
protected final Map<String, Float> concreteIndexBoosts;
8583
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
86-
private final Object shardFailuresMutex = new Object();
8784
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
8885
private final AtomicInteger successfulOps;
8986
protected final SearchTimeProvider timeProvider;
@@ -93,8 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9390
protected final SearchShardIterator[] shardIterators;
9491
private final AtomicInteger outstandingShards;
9592
private final int maxConcurrentRequestsPerNode;
96-
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
97-
private final boolean throttleConcurrentRequests;
93+
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
9894
private final AtomicBoolean requestCancelled = new AtomicBoolean();
9995
private final int skippedCount;
10096

@@ -142,7 +138,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
142138
Arrays.sort(shardIterators);
143139
this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
144140
// in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle
145-
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
141+
this.pendingExecutionsPerNode = maxConcurrentRequestsPerNode < shardsIts.size() ? new ConcurrentHashMap<>() : null;
146142
this.timeProvider = timeProvider;
147143
this.logger = logger;
148144
this.searchTransportService = searchTransportService;
@@ -153,7 +149,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
153149
this.nodeIdToConnection = nodeIdToConnection;
154150
this.concreteIndexBoosts = concreteIndexBoosts;
155151
this.clusterStateVersion = clusterState.version();
156-
this.minTransportVersion = clusterState.getMinTransportVersion();
157152
this.aliasFilter = aliasFilter;
158153
this.results = resultConsumer;
159154
// register the release of the query consumer to free up the circuit breaker memory
@@ -254,7 +249,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
254249
}
255250

256251
protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
257-
if (throttleConcurrentRequests) {
252+
var pendingExecutionsPerNode = this.pendingExecutionsPerNode;
253+
if (pendingExecutionsPerNode != null) {
258254
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
259255
shard.getNodeId(),
260256
n -> new PendingExecutions(maxConcurrentRequestsPerNode)
@@ -464,7 +460,7 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
464460
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
465461
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
466462
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
467-
synchronized (shardFailuresMutex) {
463+
synchronized (this.shardFailures) {
468464
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
469465
if (shardFailures == null) { // still null so we are the first and create a new instance
470466
shardFailures = new AtomicArray<>(getNumShards());
@@ -585,10 +581,6 @@ private SearchResponse buildSearchResponse(
585581
);
586582
}
587583

588-
boolean buildPointInTimeFromSearchResults() {
589-
return false;
590-
}
591-
592584
/**
593585
* Builds and sends the final search response back to the user.
594586
*
@@ -602,23 +594,25 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
602594
if (allowPartialResults == false && failures.length > 0) {
603595
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
604596
} else {
605-
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null;
606-
final BytesReference searchContextId;
607-
if (buildPointInTimeFromSearchResults()) {
608-
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion, failures);
609-
} else {
610-
if (request.source() != null
611-
&& request.source().pointInTimeBuilder() != null
612-
&& request.source().pointInTimeBuilder().singleSession() == false) {
613-
searchContextId = request.source().pointInTimeBuilder().getEncodedId();
614-
} else {
615-
searchContextId = null;
616-
}
617-
}
618-
ActionListener.respondAndRelease(listener, buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
597+
ActionListener.respondAndRelease(
598+
listener,
599+
buildSearchResponse(
600+
internalSearchResponse,
601+
failures,
602+
request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null,
603+
buildSearchContextId(failures)
604+
)
605+
);
619606
}
620607
}
621608

609+
protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
610+
var source = request.source();
611+
return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
612+
? source.pointInTimeBuilder().getEncodedId()
613+
: null;
614+
}
615+
622616
/**
623617
* This method will communicate a fatal phase failure back to the user. In contrast to a shard failure
624618
* will this method immediately fail the search request and return the failure to the issuer of the request

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
1414
import org.elasticsearch.ElasticsearchStatusException;
15+
import org.elasticsearch.TransportVersion;
1516
import org.elasticsearch.TransportVersions;
1617
import org.elasticsearch.action.ActionListener;
1718
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -24,6 +25,7 @@
2425
import org.elasticsearch.action.support.IndicesOptions;
2526
import org.elasticsearch.cluster.ClusterState;
2627
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.common.bytes.BytesReference;
2729
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2830
import org.elasticsearch.common.io.stream.StreamInput;
2931
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -217,6 +219,7 @@ void runOpenPointInTimePhase(
217219
) {
218220
assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
219221
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
222+
TransportVersion minTransportVersion = clusterState.getMinTransportVersion();
220223
new AbstractSearchAsyncAction<>(
221224
actionName,
222225
logger,
@@ -266,8 +269,8 @@ protected void run() {
266269
}
267270

268271
@Override
269-
boolean buildPointInTimeFromSearchResults() {
270-
return true;
272+
protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
273+
return SearchContextId.encode(results.getAtomicArray().asList(), aliasFilter, minTransportVersion, failures);
271274
}
272275
}.start();
273276
}

0 commit comments

Comments
 (0)