Skip to content

Commit 82794c5

Browse files
Faster and lower overhead response deserialization for query phase batched results
Lower overhead if we don't materialize a response array and potentially faster to deal with things more directly as they show up in the CPU cache.
1 parent fd2cc97 commit 82794c5

File tree

6 files changed

+193
-80
lines changed

6 files changed

+193
-80
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+12-21
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
6767
protected static final float DEFAULT_INDEX_BOOST = 1.0f;
6868
private final Logger logger;
69-
private final NamedWriteableRegistry namedWriteableRegistry;
69+
protected final NamedWriteableRegistry namedWriteableRegistry;
7070
protected final SearchTransportService searchTransportService;
7171
private final Executor executor;
7272
private final ActionListener<SearchResponse> listener;
@@ -91,7 +91,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9191

9292
protected final List<SearchShardIterator> shardsIts;
9393
protected final SearchShardIterator[] shardIterators;
94-
private final AtomicInteger outstandingShards;
94+
protected final AtomicInteger outstandingShards;
9595
private final int maxConcurrentRequestsPerNode;
9696
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
9797
private final boolean throttleConcurrentRequests;
@@ -426,11 +426,15 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar
426426
performPhaseOnShard(shardIndex, shardIt, nextShard);
427427
} else {
428428
// count down outstanding shards, we're done with this shard as there's no more copies to try
429-
final int outstanding = outstandingShards.decrementAndGet();
430-
assert outstanding >= 0 : "outstanding: " + outstanding;
431-
if (outstanding == 0) {
432-
onPhaseDone();
433-
}
429+
onShardDone();
430+
}
431+
}
432+
433+
protected void onShardDone() {
434+
final int outstanding = outstandingShards.decrementAndGet();
435+
assert outstanding >= 0 : "outstanding: " + outstanding;
436+
if (outstanding == 0) {
437+
onPhaseDone();
434438
}
435439
}
436440

@@ -512,20 +516,7 @@ private void onShardResultConsumed(Result result) {
512516
if (shardFailures != null) {
513517
shardFailures.set(result.getShardIndex(), null);
514518
}
515-
// we need to increment successful ops first before we compare the exit condition otherwise if we
516-
// are fast we could concurrently update totalOps but then preempt one of the threads which can
517-
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
518-
// increment all the "future" shards to update the total ops since we some may work and some may not...
519-
// and when that happens, we break on total ops, so we must maintain them
520-
successfulShardExecution();
521-
}
522-
523-
private void successfulShardExecution() {
524-
final int outstanding = outstandingShards.decrementAndGet();
525-
assert outstanding >= 0 : "outstanding: " + outstanding;
526-
if (outstanding == 0) {
527-
onPhaseDone();
528-
}
519+
onShardDone();
529520
}
530521

531522
/**

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

+70-49
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.search.ScoreDoc;
1515
import org.apache.lucene.search.TopFieldDocs;
1616
import org.elasticsearch.ExceptionsHelper;
17+
import org.elasticsearch.TransportVersion;
1718
import org.elasticsearch.TransportVersions;
1819
import org.elasticsearch.Version;
1920
import org.elasticsearch.action.ActionListener;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.action.support.IndicesOptions;
2425
import org.elasticsearch.client.internal.Client;
2526
import org.elasticsearch.cluster.ClusterState;
27+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2628
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2729
import org.elasticsearch.common.io.stream.StreamInput;
2830
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -50,6 +52,7 @@
5052
import org.elasticsearch.tasks.TaskCancelledException;
5153
import org.elasticsearch.tasks.TaskId;
5254
import org.elasticsearch.threadpool.ThreadPool;
55+
import org.elasticsearch.transport.BytesTransportResponse;
5356
import org.elasticsearch.transport.LeakTracker;
5457
import org.elasticsearch.transport.SendRequestTransportException;
5558
import org.elasticsearch.transport.Transport;
@@ -232,11 +235,6 @@ public static final class NodeQueryResponse extends TransportResponse {
232235
assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results);
233236
}
234237

235-
// public for tests
236-
public Object[] getResults() {
237-
return results;
238-
}
239-
240238
@Override
241239
public void writeTo(StreamOutput out) throws IOException {
242240
out.writeArray((o, v) -> {
@@ -465,60 +463,83 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
465463
return;
466464
}
467465
searchTransportService.transportService()
468-
.sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler<NodeQueryResponse>() {
469-
@Override
470-
public NodeQueryResponse read(StreamInput in) throws IOException {
471-
return new NodeQueryResponse(in);
472-
}
473-
474-
@Override
475-
public Executor executor() {
476-
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
477-
}
466+
.sendChildRequest(
467+
connection,
468+
NODE_SEARCH_ACTION_NAME,
469+
request,
470+
task,
471+
new TransportResponseHandler<BytesTransportResponse>() {
472+
@Override
473+
public BytesTransportResponse read(StreamInput in) throws IOException {
474+
return new BytesTransportResponse(in);
475+
}
478476

479-
@Override
480-
public void handleResponse(NodeQueryResponse response) {
481-
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
482-
queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult);
477+
@Override
478+
public Executor executor() {
479+
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
483480
}
484-
for (int i = 0; i < response.results.length; i++) {
485-
var s = request.shards.get(i);
486-
int shardIdx = s.shardIndex;
487-
final SearchShardTarget target = new SearchShardTarget(routing.nodeId(), s.shardId, routing.clusterAlias());
488-
switch (response.results[i]) {
489-
case Exception e -> onShardFailure(shardIdx, target, shardIterators[shardIdx], e);
490-
case SearchPhaseResult q -> {
491-
q.setShardIndex(shardIdx);
492-
q.setSearchShardTarget(target);
493-
onShardResult(q);
481+
482+
@Override
483+
public void handleResponse(BytesTransportResponse bytesTransportResponse) {
484+
outstandingShards.incrementAndGet();
485+
try {
486+
var in = bytesTransportResponse.bytes().streamInput();
487+
in.setTransportVersion(TransportVersion.current());
488+
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
489+
in.setTransportVersion(TransportVersion.current());
490+
final int count = in.readVInt();
491+
for (int i = 0; i < count; i++) {
492+
var s = request.shards.get(i);
493+
int shardIdx = s.shardIndex;
494+
final SearchShardTarget target = new SearchShardTarget(
495+
routing.nodeId(),
496+
s.shardId,
497+
routing.clusterAlias()
498+
);
499+
if (in.readBoolean()) {
500+
var q = new QuerySearchResult(in);
501+
q.setShardIndex(shardIdx);
502+
q.setSearchShardTarget(target);
503+
onShardResult(q);
504+
} else {
505+
onShardFailure(shardIdx, target, shardIterators[shardIdx], in.readException());
506+
}
494507
}
495-
case null, default -> {
496-
assert false : "impossible [" + response.results[i] + "]";
508+
var mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);
509+
var topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);
510+
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
511+
queryPhaseResultConsumer.addBatchedPartialResult(topDocsStats, mergeResult);
497512
}
513+
} catch (IOException e) {
514+
assert false : e;
515+
handleException(new TransportException(e));
516+
} finally {
517+
onShardDone();
498518
}
499519
}
500-
}
501520

502-
@Override
503-
public void handleException(TransportException e) {
504-
Exception cause = (Exception) ExceptionsHelper.unwrapCause(e);
505-
if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) {
506-
// two possible special cases here where we do not want to fail the phase:
507-
// failure to send out the request -> handle things the same way a shard would fail with unbatched execution
508-
// as this could be a transient failure and partial results we may have are still valid
509-
// cancellation of the whole batched request on the remote -> maybe we timed out or so, partial results may
510-
// still be valid
511-
onNodeQueryFailure(e, request, routing);
512-
} else {
513-
// Remote failure that wasn't due to networking or cancellation means that the data node was unable to reduce
514-
// its local results. Failure to reduce always fails the phase without exception so we fail the phase here.
515-
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
516-
queryPhaseResultConsumer.failure.compareAndSet(null, cause);
521+
@Override
522+
public void handleException(TransportException e) {
523+
Exception cause = (Exception) ExceptionsHelper.unwrapCause(e);
524+
if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) {
525+
// two possible special cases here where we do not want to fail the phase:
526+
// failure to send out the request -> handle things the same way a shard would fail with unbatched execution
527+
// as this could be a transient failure and partial results we may have are still valid
528+
// cancellation of the whole batched request on the remote -> maybe we timed out or so, partial results may
529+
// still be valid
530+
onNodeQueryFailure(e, request, routing);
531+
} else {
532+
// Remote failure that wasn't due to networking or cancellation means that the data node was unable to
533+
// reduce
534+
// its local results. Failure to reduce always fails the phase without exception so we fail the phase here.
535+
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
536+
queryPhaseResultConsumer.failure.compareAndSet(null, cause);
537+
}
538+
onPhaseFailure(getName(), "", cause);
517539
}
518-
onPhaseFailure(getName(), "", cause);
519540
}
520541
}
521-
});
542+
);
522543
});
523544
}
524545

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
16+
import java.io.IOException;
17+
18+
public interface BytesTransportMessage {
19+
20+
TransportVersion version();
21+
22+
ReleasableBytesReference bytes();
23+
24+
/**
25+
* Writes the data in a "thin" manner, without the actual bytes, assumes
26+
* the actual bytes will be appended right after this content.
27+
*/
28+
void writeThin(StreamOutput out) throws IOException;
29+
}

server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.TransportVersion;
13-
import org.elasticsearch.common.bytes.BytesReference;
1413
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1514
import org.elasticsearch.common.io.stream.StreamInput;
1615
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -21,7 +20,7 @@
2120
* A specialized, bytes only request, that can potentially be optimized on the network
2221
* layer, specifically for the same large buffer send to several nodes.
2322
*/
24-
public class BytesTransportRequest extends TransportRequest {
23+
public class BytesTransportRequest extends TransportRequest implements BytesTransportMessage {
2524

2625
final ReleasableBytesReference bytes;
2726
private final TransportVersion version;
@@ -37,18 +36,17 @@ public BytesTransportRequest(ReleasableBytesReference bytes, TransportVersion ve
3736
this.version = version;
3837
}
3938

39+
@Override
4040
public TransportVersion version() {
4141
return this.version;
4242
}
4343

44-
public BytesReference bytes() {
44+
@Override
45+
public ReleasableBytesReference bytes() {
4546
return this.bytes;
4647
}
4748

48-
/**
49-
* Writes the data in a "thin" manner, without the actual bytes, assumes
50-
* the actual bytes will be appended right after this content.
51-
*/
49+
@Override
5250
public void writeThin(StreamOutput out) throws IOException {
5351
super.writeTo(out);
5452
out.writeVInt(bytes.length());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.transport;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* A specialized, bytes only response, that can potentially be optimized on the network layer.
21+
*/
22+
public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage {
23+
24+
final ReleasableBytesReference bytes;
25+
private final TransportVersion version;
26+
27+
public BytesTransportResponse(StreamInput in) throws IOException {
28+
bytes = in.readAllToReleasableBytesReference();
29+
version = in.getTransportVersion();
30+
}
31+
32+
public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) {
33+
this.bytes = bytes;
34+
this.version = version;
35+
}
36+
37+
@Override
38+
public TransportVersion version() {
39+
return this.version;
40+
}
41+
42+
@Override
43+
public ReleasableBytesReference bytes() {
44+
return this.bytes;
45+
}
46+
47+
@Override
48+
public void writeThin(StreamOutput out) throws IOException {}
49+
50+
@Override
51+
public void writeTo(StreamOutput out) throws IOException {
52+
out.writeBytesReference(bytes);
53+
}
54+
55+
@Override
56+
public void incRef() {
57+
bytes.incRef();
58+
}
59+
60+
@Override
61+
public boolean tryIncRef() {
62+
return bytes.tryIncRef();
63+
}
64+
65+
@Override
66+
public boolean decRef() {
67+
return bytes.decRef();
68+
}
69+
70+
@Override
71+
public boolean hasReferences() {
72+
return bytes.hasReferences();
73+
}
74+
}

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ private void sendMessage(
227227
Releasable onAfter
228228
) throws IOException {
229229
assert action != null;
230-
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
230+
final var compressionScheme = writeable instanceof BytesTransportMessage ? null : possibleCompressionScheme;
231231
final BytesReference message;
232232
boolean serializeSuccess = false;
233233
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
@@ -334,11 +334,11 @@ private static BytesReference serializeMessageBody(
334334
final ReleasableBytesReference zeroCopyBuffer;
335335
try {
336336
stream.setTransportVersion(version);
337-
if (writeable instanceof BytesTransportRequest bRequest) {
337+
if (writeable instanceof BytesTransportMessage bRequest) {
338338
assert stream == byteStreamOutput;
339339
assert compressionScheme == null;
340340
bRequest.writeThin(stream);
341-
zeroCopyBuffer = bRequest.bytes;
341+
zeroCopyBuffer = bRequest.bytes();
342342
} else if (writeable instanceof RemoteTransportException remoteTransportException) {
343343
stream.writeException(remoteTransportException);
344344
zeroCopyBuffer = ReleasableBytesReference.empty();

0 commit comments

Comments
 (0)