Skip to content

Commit e9d8e00

Browse files
authored
Add tracking for long running SearchTask post cancellation (#17726)
* add tracking for long running SearchTask post cancellation Signed-off-by: Ruirui Zhang <[email protected]> * add version checks for searchTask Signed-off-by: Ruirui Zhang <[email protected]>
1 parent 98568e8 commit e9d8e00

9 files changed

+210
-76
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
1212
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
1313
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
14+
- Add tracking for long-running SearchTask post cancellation ([#17726](https://github.com/opensearch-project/OpenSearch/pull/17726))
1415
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
1516
- Add FilterFieldType for developers who want to wrap MappedFieldType ([#17627](https://github.com/opensearch-project/OpenSearch/pull/17627))
1617
- [Rule Based Auto-tagging] Add in-memory rule processing service ([#17365](https://github.com/opensearch-project/OpenSearch/pull/17365))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.tasks;
10+
11+
import org.opensearch.core.common.io.stream.StreamInput;
12+
import org.opensearch.core.common.io.stream.StreamOutput;
13+
import org.opensearch.core.common.io.stream.Writeable;
14+
import org.opensearch.core.xcontent.ToXContentObject;
15+
import org.opensearch.core.xcontent.XContentBuilder;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* Base class for search task cancellation statistics.
22+
*/
23+
public abstract class BaseSearchTaskCancellationStats implements ToXContentObject, Writeable {
24+
25+
private final long currentLongRunningCancelledTaskCount;
26+
private final long totalLongRunningCancelledTaskCount;
27+
28+
public BaseSearchTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
29+
this.currentLongRunningCancelledTaskCount = currentTaskCount;
30+
this.totalLongRunningCancelledTaskCount = totalTaskCount;
31+
}
32+
33+
public BaseSearchTaskCancellationStats(StreamInput in) throws IOException {
34+
this.currentLongRunningCancelledTaskCount = in.readVLong();
35+
this.totalLongRunningCancelledTaskCount = in.readVLong();
36+
}
37+
38+
protected long getCurrentLongRunningCancelledTaskCount() {
39+
return this.currentLongRunningCancelledTaskCount;
40+
}
41+
42+
protected long getTotalLongRunningCancelledTaskCount() {
43+
return this.totalLongRunningCancelledTaskCount;
44+
}
45+
46+
@Override
47+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
48+
builder.startObject();
49+
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
50+
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
51+
return builder.endObject();
52+
}
53+
54+
@Override
55+
public void writeTo(StreamOutput out) throws IOException {
56+
out.writeVLong(currentLongRunningCancelledTaskCount);
57+
out.writeVLong(totalLongRunningCancelledTaskCount);
58+
}
59+
60+
@Override
61+
public boolean equals(Object o) {
62+
if (this == o) return true;
63+
if (o == null || getClass() != o.getClass()) return false;
64+
BaseSearchTaskCancellationStats that = (BaseSearchTaskCancellationStats) o;
65+
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
66+
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
67+
}
68+
69+
@Override
70+
public int hashCode() {
71+
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
72+
}
73+
}

server/src/main/java/org/opensearch/tasks/SearchShardTaskCancellationStats.java

+3-51
Original file line numberDiff line numberDiff line change
@@ -9,67 +9,19 @@
99
package org.opensearch.tasks;
1010

1111
import org.opensearch.core.common.io.stream.StreamInput;
12-
import org.opensearch.core.common.io.stream.StreamOutput;
13-
import org.opensearch.core.common.io.stream.Writeable;
14-
import org.opensearch.core.xcontent.ToXContentObject;
15-
import org.opensearch.core.xcontent.XContentBuilder;
1612

1713
import java.io.IOException;
18-
import java.util.Objects;
1914

2015
/**
2116
* Holds monitoring service stats specific to search shard task.
2217
*/
23-
public class SearchShardTaskCancellationStats implements ToXContentObject, Writeable {
24-
25-
private final long currentLongRunningCancelledTaskCount;
26-
private final long totalLongRunningCancelledTaskCount;
18+
public class SearchShardTaskCancellationStats extends BaseSearchTaskCancellationStats {
2719

2820
public SearchShardTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
29-
this.currentLongRunningCancelledTaskCount = currentTaskCount;
30-
this.totalLongRunningCancelledTaskCount = totalTaskCount;
21+
super(currentTaskCount, totalTaskCount);
3122
}
3223

3324
public SearchShardTaskCancellationStats(StreamInput in) throws IOException {
34-
this.currentLongRunningCancelledTaskCount = in.readVLong();
35-
this.totalLongRunningCancelledTaskCount = in.readVLong();
36-
}
37-
38-
// package private for testing
39-
protected long getCurrentLongRunningCancelledTaskCount() {
40-
return this.currentLongRunningCancelledTaskCount;
41-
}
42-
43-
// package private for testing
44-
protected long getTotalLongRunningCancelledTaskCount() {
45-
return this.totalLongRunningCancelledTaskCount;
46-
}
47-
48-
@Override
49-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
50-
builder.startObject();
51-
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
52-
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
53-
return builder.endObject();
54-
}
55-
56-
@Override
57-
public void writeTo(StreamOutput out) throws IOException {
58-
out.writeVLong(currentLongRunningCancelledTaskCount);
59-
out.writeVLong(totalLongRunningCancelledTaskCount);
60-
}
61-
62-
@Override
63-
public boolean equals(Object o) {
64-
if (this == o) return true;
65-
if (o == null || getClass() != o.getClass()) return false;
66-
SearchShardTaskCancellationStats that = (SearchShardTaskCancellationStats) o;
67-
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
68-
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
69-
}
70-
71-
@Override
72-
public int hashCode() {
73-
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
25+
super(in);
7426
}
7527
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.tasks;
10+
11+
import org.opensearch.core.common.io.stream.StreamInput;
12+
13+
import java.io.IOException;
14+
15+
/**
16+
* Holds monitoring service stats specific to search task.
17+
*/
18+
public class SearchTaskCancellationStats extends BaseSearchTaskCancellationStats {
19+
20+
public SearchTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
21+
super(currentTaskCount, totalTaskCount);
22+
}
23+
24+
public SearchTaskCancellationStats(StreamInput in) throws IOException {
25+
super(in);
26+
}
27+
}

server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.action.search.SearchShardTask;
14+
import org.opensearch.action.search.SearchTask;
1415
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
1516
import org.opensearch.common.metrics.CounterMetric;
1617
import org.opensearch.threadpool.Scheduler;
@@ -32,7 +33,7 @@
3233
public class TaskCancellationMonitoringService extends AbstractLifecycleComponent implements TaskManager.TaskEventListeners {
3334

3435
private static final Logger logger = LogManager.getLogger(TaskCancellationMonitoringService.class);
35-
private final static List<Class<? extends CancellableTask>> TASKS_TO_TRACK = Arrays.asList(SearchShardTask.class);
36+
private final static List<Class<? extends CancellableTask>> TASKS_TO_TRACK = Arrays.asList(SearchShardTask.class, SearchTask.class);
3637

3738
private volatile Scheduler.Cancellable scheduledFuture;
3839
private final ThreadPool threadPool;
@@ -146,6 +147,10 @@ public TaskCancellationStats stats() {
146147
Map<Class<? extends CancellableTask>, List<CancellableTask>> currentRunningCancelledTasks =
147148
getCurrentRunningTasksPostCancellation();
148149
return new TaskCancellationStats(
150+
new SearchTaskCancellationStats(
151+
Optional.of(currentRunningCancelledTasks).map(mapper -> mapper.get(SearchTask.class)).map(List::size).orElse(0),
152+
cancellationStatsHolder.get(SearchTask.class).totalLongRunningCancelledTaskCount.count()
153+
),
149154
new SearchShardTaskCancellationStats(
150155
Optional.of(currentRunningCancelledTasks).map(mapper -> mapper.get(SearchShardTask.class)).map(List::size).orElse(0),
151156
cancellationStatsHolder.get(SearchShardTask.class).totalLongRunningCancelledTaskCount.count()

server/src/main/java/org/opensearch/tasks/TaskCancellationStats.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.tasks;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.core.common.io.stream.StreamInput;
1213
import org.opensearch.core.common.io.stream.StreamOutput;
1314
import org.opensearch.core.common.io.stream.Writeable;
@@ -22,13 +23,23 @@
2223
*/
2324
public class TaskCancellationStats implements ToXContentFragment, Writeable {
2425

26+
private final SearchTaskCancellationStats searchTaskCancellationStats;
2527
private final SearchShardTaskCancellationStats searchShardTaskCancellationStats;
2628

27-
public TaskCancellationStats(SearchShardTaskCancellationStats searchShardTaskCancellationStats) {
29+
public TaskCancellationStats(
30+
SearchTaskCancellationStats searchTaskCancellationStats,
31+
SearchShardTaskCancellationStats searchShardTaskCancellationStats
32+
) {
33+
this.searchTaskCancellationStats = searchTaskCancellationStats;
2834
this.searchShardTaskCancellationStats = searchShardTaskCancellationStats;
2935
}
3036

3137
public TaskCancellationStats(StreamInput in) throws IOException {
38+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
39+
searchTaskCancellationStats = new SearchTaskCancellationStats(in);
40+
} else {
41+
searchTaskCancellationStats = new SearchTaskCancellationStats(0, 0);
42+
}
3243
searchShardTaskCancellationStats = new SearchShardTaskCancellationStats(in);
3344
}
3445

@@ -37,15 +48,24 @@ protected SearchShardTaskCancellationStats getSearchShardTaskCancellationStats()
3748
return this.searchShardTaskCancellationStats;
3849
}
3950

51+
// package private for testing
52+
protected SearchTaskCancellationStats getSearchTaskCancellationStats() {
53+
return this.searchTaskCancellationStats;
54+
}
55+
4056
@Override
4157
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
4258
builder.startObject("task_cancellation");
59+
builder.field("search_task", searchTaskCancellationStats);
4360
builder.field("search_shard_task", searchShardTaskCancellationStats);
4461
return builder.endObject();
4562
}
4663

4764
@Override
4865
public void writeTo(StreamOutput out) throws IOException {
66+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
67+
searchTaskCancellationStats.writeTo(out);
68+
}
4969
searchShardTaskCancellationStats.writeTo(out);
5070
}
5171

@@ -54,11 +74,12 @@ public boolean equals(Object o) {
5474
if (this == o) return true;
5575
if (o == null || getClass() != o.getClass()) return false;
5676
TaskCancellationStats that = (TaskCancellationStats) o;
57-
return Objects.equals(searchShardTaskCancellationStats, that.searchShardTaskCancellationStats);
77+
return Objects.equals(searchTaskCancellationStats, that.searchTaskCancellationStats)
78+
&& Objects.equals(searchShardTaskCancellationStats, that.searchShardTaskCancellationStats);
5879
}
5980

6081
@Override
6182
public int hashCode() {
62-
return Objects.hash(searchShardTaskCancellationStats);
83+
return Objects.hash(searchTaskCancellationStats, searchShardTaskCancellationStats);
6384
}
6485
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.tasks;
10+
11+
import org.opensearch.core.common.io.stream.Writeable;
12+
import org.opensearch.test.AbstractWireSerializingTestCase;
13+
14+
public class SearchTaskCancellationStatsTests extends AbstractWireSerializingTestCase<SearchTaskCancellationStats> {
15+
@Override
16+
protected Writeable.Reader<SearchTaskCancellationStats> instanceReader() {
17+
return SearchTaskCancellationStats::new;
18+
}
19+
20+
@Override
21+
protected SearchTaskCancellationStats createTestInstance() {
22+
return randomInstance();
23+
}
24+
25+
public static SearchTaskCancellationStats randomInstance() {
26+
return new SearchTaskCancellationStats(randomNonNegativeLong(), randomNonNegativeLong());
27+
}
28+
}

0 commit comments

Comments
 (0)