Skip to content

Commit 44bdd6e

Browse files
Fork TransportSearchAction to search coordination pool
We can run into situations where this action releases a store reference which can block for up to seconds -> fork to search coordination to make absolutely sure this won't cause transport threads to get blocked.
1 parent 89945db commit 44bdd6e

File tree

4 files changed

+14
-1
lines changed

4 files changed

+14
-1
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.TransportVersions;
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.ActionListenerResponseHandler;
17+
import org.elasticsearch.action.ActionRunnable;
1718
import org.elasticsearch.action.ActionType;
1819
import org.elasticsearch.action.IndicesRequest;
1920
import org.elasticsearch.action.OriginalIndices;
@@ -137,6 +138,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
137138
);
138139

139140
private final ThreadPool threadPool;
141+
private final Executor searchCoordinationExecutor;
140142
private final ClusterService clusterService;
141143
private final TransportService transportService;
142144
private final SearchTransportService searchTransportService;
@@ -169,6 +171,7 @@ public TransportSearchAction(
169171
) {
170172
super(TYPE.name(), transportService, actionFilters, SearchRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
171173
this.threadPool = threadPool;
174+
this.searchCoordinationExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
172175
this.circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
173176
this.searchPhaseController = searchPhaseController;
174177
this.searchTransportService = searchTransportService;
@@ -288,6 +291,11 @@ public long buildTookInMillis() {
288291

289292
@Override
290293
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
294+
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
295+
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked((SearchTask) task, searchRequest, l)));
296+
}
297+
298+
private void doExecuteForked(SearchTask task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
291299
ActionListener<SearchResponse> loggingAndMetrics = listener.delegateFailureAndWrap((l, searchResponse) -> {
292300
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis());
293301
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
@@ -306,7 +314,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
306314
}
307315
l.onResponse(searchResponse);
308316
});
309-
executeRequest((SearchTask) task, searchRequest, loggingAndMetrics, AsyncSearchActionProvider::new);
317+
executeRequest(task, searchRequest, loggingAndMetrics, AsyncSearchActionProvider::new);
310318
}
311319

312320
void executeRequest(

server/src/main/java/org/elasticsearch/env/ShardLock.java

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.env;
1010

1111
import org.elasticsearch.index.shard.ShardId;
12+
import org.elasticsearch.transport.Transports;
1213

1314
import java.io.Closeable;
1415
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +39,7 @@ public final ShardId getShardId() {
3839

3940
@Override
4041
public final void close() {
42+
assert Transports.assertNotTransportThread("shard lock should not have been acquired on a transport thread");
4143
if (this.closed.compareAndSet(false, true)) {
4244
closeInternal();
4345
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+1
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ public Searcher acquireSearcherInternal(String source) {
763763

764764
@Override
765765
protected void doClose() {
766+
assert Transports.assertNotTransportThread("releasing from the reference manager and/or store may block");
766767
try {
767768
referenceManager.release(acquire);
768769
} catch (IOException e) {

server/src/main/java/org/elasticsearch/index/store/Store.java

+2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.shard.IndexShard;
6868
import org.elasticsearch.index.shard.ShardId;
6969
import org.elasticsearch.index.translog.Translog;
70+
import org.elasticsearch.transport.Transports;
7071

7172
import java.io.Closeable;
7273
import java.io.EOFException;
@@ -431,6 +432,7 @@ public boolean isClosing() {
431432
}
432433

433434
private void closeInternal() {
435+
assert Transports.assertNotTransportThread("store closing must not happen on the transport thread");
434436
// Leverage try-with-resources to close the shard lock for us
435437
try (Closeable c = shardLock) {
436438
try {

0 commit comments

Comments
 (0)