Skip to content

Commit 433f91f

Browse files
committed
Part 2: Support for cancel_after_timeinterval parameter in search and msearch request
This commit adds handling of the new request-level parameter and schedules a cancellation task for the search request. It also adds a cluster setting that sets a global cancellation timeout for search requests, which is used in the absence of a request-level timeout. TEST: Added new tests in SearchCancellationIT. Signed-off-by: Sorabh Hamirwasia <[email protected]>
1 parent 982a599 commit 433f91f

File tree

7 files changed

+457
-6
lines changed

7 files changed

+457
-6
lines changed

server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java

+273-4
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/action/search/SearchRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ public TimeValue getCancelAfterTimeInterval() {
692692

693693
@Override
694694
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
695-
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers);
695+
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, true, cancelAfterTimeInterval);
696696
}
697697

698698
public final String buildDescription() {

server/src/main/java/org/opensearch/action/search/SearchTask.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.action.search;
3434

35+
import org.opensearch.common.unit.TimeValue;
3536
import org.opensearch.tasks.CancellableTask;
3637
import org.opensearch.tasks.TaskId;
3738

@@ -48,7 +49,12 @@ public class SearchTask extends CancellableTask {
4849

4950
public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier,
5051
TaskId parentTaskId, Map<String, String> headers) {
51-
super(id, type, action, null, parentTaskId, headers);
52+
this(id, type, action, descriptionSupplier, parentTaskId, headers, false, null);
53+
}
54+
55+
public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier,
56+
TaskId parentTaskId, Map<String, String> headers, boolean cancelOnTimeout, TimeValue cancelTimeout) {
57+
super(id, type, action, null, parentTaskId, headers, cancelOnTimeout, cancelTimeout);
5258
this.descriptionSupplier = descriptionSupplier;
5359
}
5460

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

+24
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.action.support.ActionFilters;
4242
import org.opensearch.action.support.HandledTransportAction;
4343
import org.opensearch.action.support.IndicesOptions;
44+
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
4445
import org.opensearch.client.Client;
4546
import org.opensearch.client.OriginSettingClient;
4647
import org.opensearch.client.node.NodeClient;
@@ -81,6 +82,7 @@
8182
import org.opensearch.search.internal.SearchContext;
8283
import org.opensearch.search.profile.ProfileShardResult;
8384
import org.opensearch.search.profile.SearchProfileShardResults;
85+
import org.opensearch.tasks.CancellableTask;
8486
import org.opensearch.tasks.Task;
8587
import org.opensearch.tasks.TaskId;
8688
import org.opensearch.threadpool.ThreadPool;
@@ -121,6 +123,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
121123
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
122124
"action.search.shard_count.limit", Long.MAX_VALUE, 1L, Property.Dynamic, Property.NodeScope);
123125

126+
// Cluster level setting for timeout based search cancellation. If the search request level parameter is present then it takes
127+
// precedence over this cluster setting value. The default declared here is 300 seconds; SearchService.NO_TIMEOUT is passed as the
// next argument (presumably the minimum allowed value - confirm against Setting.timeSetting).
128+
public static final String SEARCH_REQUEST_CANCEL_AFTER_TIMEINTERVAL_SETTING_KEY = "search.cancel_after_timeinterval";
129+
public static final Setting<TimeValue> SEARCH_REQUEST_CANCEL_AFTER_TIMEINTERVAL_SETTING =
130+
Setting.timeSetting(SEARCH_REQUEST_CANCEL_AFTER_TIMEINTERVAL_SETTING_KEY, TimeValue.timeValueSeconds(300), SearchService.NO_TIMEOUT,
131+
Setting.Property.Dynamic, Setting.Property.NodeScope);
132+
133+
// Cluster level setting that enables or disables the timeout based cancellation. It is declared with a default of true, i.e.
// the cancellation is enabled unless the setting is explicitly turned off.
134+
public static final String SEARCH_REQUEST_CANCELLATION_ENABLE_SETTING_KEY = "search.timeout.cancellation.enable";
135+
public static final Setting<Boolean> SEARCH_REQUEST_CANCELLATION_ENABLE_SETTING =
136+
Setting.boolSetting(SEARCH_REQUEST_CANCELLATION_ENABLE_SETTING_KEY, true, Setting.Property.Dynamic,
137+
Setting.Property.NodeScope);
138+
124139
private final NodeClient client;
125140
private final ThreadPool threadPool;
126141
private final ClusterService clusterService;
@@ -239,6 +254,15 @@ long buildTookInMillis() {
239254

240255
@Override
241256
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
257+
// The request is eligible for timeout based cancellation only when all of the following hold: the task is a CancellableTask,
258+
// the task reports that it supports cancellation on timeout, and the cluster level enable setting is true. Other top level
259+
// requests, such as AsyncSearch, may use SearchRequest internally and have their own cancellation mechanism. In such cases the
260+
// SearchRequest can override createTask and provide the necessary flag to bypass this mechanism.
261+
final boolean isTimeoutCancelEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_CANCELLATION_ENABLE_SETTING);
262+
if (task instanceof CancellableTask && ((CancellableTask) task).shouldCancelOnTimeout() && isTimeoutCancelEnabled) {
263+
// Wrap the listener so that the cancellation is scheduled; the cluster level interval is passed as the fallback timeout.
listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(client, (CancellableTask) task,
264+
clusterService.getClusterSettings().get(SEARCH_REQUEST_CANCEL_AFTER_TIMEINTERVAL_SETTING), listener);
265+
}
242266
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
243267
}
244268

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.support;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.action.ActionListener;
15+
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
16+
import org.opensearch.client.OriginSettingClient;
17+
import org.opensearch.client.node.NodeClient;
18+
import org.opensearch.common.unit.TimeValue;
19+
import org.opensearch.search.SearchService;
20+
import org.opensearch.tasks.CancellableTask;
21+
import org.opensearch.tasks.TaskId;
22+
import org.opensearch.threadpool.Scheduler;
23+
import org.opensearch.threadpool.ThreadPool;
24+
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
28+
import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
29+
30+
public class TimeoutTaskCancellationUtility {
31+
32+
private static final Logger logger = LogManager.getLogger(TimeoutTaskCancellationUtility.class);
33+
34+
/**
35+
* Wraps a listener with a timeout listener {@link TimeoutRunnableListener} to schedule the task cancellation for provided tasks on
36+
* generic thread pool
37+
* @param client - {@link NodeClient}
38+
* @param taskToCancel - task to schedule cancellation for
39+
* @param globalTimeout - global timeout to use for scheduling cancellation task in absence of task level parameter
40+
* @param listener - original listener associated with the task
41+
* @return wrapped listener
42+
*/
43+
public static <Response> ActionListener<Response> wrapWithCancellationListener(NodeClient client, CancellableTask taskToCancel,
44+
TimeValue globalTimeout, ActionListener<Response> listener) {
45+
final TimeValue timeoutInterval = (taskToCancel.getCancellationTimeout() == null) ? globalTimeout
46+
: taskToCancel.getCancellationTimeout();
47+
// Note: If -1 (or no timeout) is set at request level then we will use that value instead of cluster level value. This will help
48+
// to turn off cancellation at request level.
49+
ActionListener<Response> listenerToReturn = listener;
50+
if (timeoutInterval.equals(SearchService.NO_TIMEOUT)) {
51+
return listenerToReturn;
52+
}
53+
54+
try {
55+
final TimeoutRunnableListener<Response> wrappedListener = new TimeoutRunnableListener<>(timeoutInterval, listener, () -> {
56+
final CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
57+
cancelTasksRequest.setTaskId(new TaskId(client.getLocalNodeId(), taskToCancel.getId()));
58+
cancelTasksRequest.setReason("Cancellation timeout of " + timeoutInterval + " is expired");
59+
// force the origin to execute the cancellation as a system user
60+
new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster()
61+
.cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> logger.debug(
62+
"Scheduled cancel task with timeout: {} for original task: {} is successfully completed", timeoutInterval,
63+
cancelTasksRequest.getTaskId()),
64+
e -> logger.error(new ParameterizedMessage("Scheduled cancel task with timeout: {} for original task: {} is failed",
65+
timeoutInterval, cancelTasksRequest.getTaskId()), e))
66+
);
67+
});
68+
wrappedListener.cancellable = client.threadPool().schedule(wrappedListener, timeoutInterval, ThreadPool.Names.GENERIC);
69+
listenerToReturn = wrappedListener;
70+
} catch (Exception ex) {
71+
// if there is any exception in scheduling the cancellation task then continue without it
72+
logger.warn("Failed to schedule the cancellation task for original task: {}, will continue without it", taskToCancel.getId());
73+
}
74+
return listenerToReturn;
75+
}
76+
77+
/**
78+
* Timeout listener which executes the provided runnable after timeout is expired and if a response/failure is not yet received.
79+
* If either a response/failure is received before timeout then the scheduled task is cancelled and response/failure is sent back to
80+
* the original listener.
81+
*/
82+
private static class TimeoutRunnableListener<Response> implements ActionListener<Response>, Runnable {
83+
84+
private static final Logger logger = LogManager.getLogger(TimeoutRunnableListener.class);
85+
86+
// Runnable to execute after timeout
87+
private final TimeValue timeout;
88+
private final ActionListener<Response> originalListener;
89+
private final Runnable timeoutRunnable;
90+
private final AtomicBoolean executeRunnable = new AtomicBoolean(true);
91+
private volatile Scheduler.ScheduledCancellable cancellable;
92+
private final long creationTime;
93+
94+
TimeoutRunnableListener(TimeValue timeout, ActionListener<Response> listener, Runnable runAfterTimeout) {
95+
this.timeout = timeout;
96+
this.originalListener = listener;
97+
this.timeoutRunnable = runAfterTimeout;
98+
this.creationTime = System.nanoTime();
99+
}
100+
101+
@Override public void onResponse(Response response) {
102+
checkAndCancel();
103+
originalListener.onResponse(response);
104+
}
105+
106+
@Override public void onFailure(Exception e) {
107+
checkAndCancel();
108+
originalListener.onFailure(e);
109+
}
110+
111+
@Override public void run() {
112+
try {
113+
if (executeRunnable.compareAndSet(true, false)) {
114+
timeoutRunnable.run();
115+
} // else do nothing since either response/failure is already sent to client
116+
} catch (Exception ex) {
117+
// ignore the exception
118+
logger.error(new ParameterizedMessage("Ignoring the failure to run the provided runnable after timeout of {} with " +
119+
"exception", timeout), ex);
120+
}
121+
}
122+
123+
private void checkAndCancel() {
124+
if (executeRunnable.compareAndSet(true, false)) {
125+
logger.debug("Aborting the scheduled cancel task after {}",
126+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationTime));
127+
// timer has not yet expired so cancel it
128+
cancellable.cancel();
129+
}
130+
}
131+
}
132+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+2
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ public void apply(Settings value, Settings current, Settings previous) {
345345
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
346346
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
347347
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
348+
TransportSearchAction.SEARCH_REQUEST_CANCELLATION_ENABLE_SETTING,
349+
TransportSearchAction.SEARCH_REQUEST_CANCEL_AFTER_TIMEINTERVAL_SETTING,
348350
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
349351
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE,
350352
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,

server/src/main/java/org/opensearch/tasks/CancellableTask.java

+18
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.tasks;
3434

3535
import org.opensearch.common.Nullable;
36+
import org.opensearch.common.unit.TimeValue;
3637

3738
import java.util.Map;
3839
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,9 +45,18 @@ public abstract class CancellableTask extends Task {
4445

4546
private volatile String reason;
4647
private final AtomicBoolean cancelled = new AtomicBoolean(false);
48+
private final boolean cancelOnTimeout;
49+
private final TimeValue cancellationTimeout;
4750

4851
/**
 * Creates a cancellable task that does not opt in to timeout based cancellation: delegates to the extended constructor with
 * {@code cancelOnTimeout = false} and a {@code null} cancellation timeout.
 */
public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
52+
this(id, type, action, description, parentTaskId, headers, false, null);
53+
}
54+
55+
/**
 * Creates a cancellable task and records its timeout based cancellation configuration. The first six arguments are passed
 * through to the {@link Task} constructor unchanged.
 *
 * @param cancelOnTimeout value later returned by {@link #shouldCancelOnTimeout()}
 * @param cancellationTimeout value later returned by {@link #getCancellationTimeout()}; stored as-is and may be {@code null}
 */
public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers,
56+
boolean cancelOnTimeout, TimeValue cancellationTimeout) {
4957
super(id, type, action, description, parentTaskId, headers);
58+
this.cancelOnTimeout = cancelOnTimeout;
59+
this.cancellationTimeout = cancellationTimeout;
5060
}
5161

5262
/**
@@ -77,6 +87,14 @@ public boolean isCancelled() {
7787
return cancelled.get();
7888
}
7989

90+
/**
 * Returns the {@code cancelOnTimeout} flag this task was constructed with.
 */
public boolean shouldCancelOnTimeout() {
91+
return cancelOnTimeout;
92+
}
93+
94+
/**
 * Returns the cancellation timeout this task was constructed with. May be {@code null}, e.g. when the task was created through
 * the constructor that does not take a timeout.
 */
public TimeValue getCancellationTimeout() {
95+
return cancellationTimeout;
96+
}
97+
8098
/**
8199
* The reason the task was cancelled or null if it hasn't been cancelled.
82100
*/

0 commit comments

Comments
 (0)