Skip to content

Commit 1f5f393

Browse files
deshsiddansjcyjed326
authored and
wangdongyu.danny
committed
Create listener to refresh search thread resource usage (opensearch-project#14832)
* [bug fix] fix incorrect coordinator node search resource usages Signed-off-by: Chenyang Ji <[email protected]> * fix bug on serialization when passing task resource usage to coordinator Signed-off-by: Chenyang Ji <[email protected]> * add more unit tests Signed-off-by: Chenyang Ji <[email protected]> * remove query insights plugin related code Signed-off-by: Chenyang Ji <[email protected]> * create per request listener to refresh task resource usage Signed-off-by: Chenyang Ji <[email protected]> * Make new listener API public Signed-off-by: Siddhant Deshmukh <[email protected]> * Add changelog Signed-off-by: Siddhant Deshmukh <[email protected]> * Remove wrong files added Signed-off-by: Siddhant Deshmukh <[email protected]> * Address review comments Signed-off-by: Siddhant Deshmukh <[email protected]> * Build fix Signed-off-by: Siddhant Deshmukh <[email protected]> * Make singleton Signed-off-by: Siddhant Deshmukh <[email protected]> * Address review comments Signed-off-by: Siddhant Deshmukh <[email protected]> * Make sure listener runs before plugin listeners Signed-off-by: Siddhant Deshmukh <[email protected]> * Spotless Signed-off-by: Siddhant Deshmukh <[email protected]> * Minor fix Signed-off-by: Siddhant Deshmukh <[email protected]> --------- Signed-off-by: Chenyang Ji <[email protected]> Signed-off-by: Siddhant Deshmukh <[email protected]> Signed-off-by: Jay Deng <[email protected]> Co-authored-by: Chenyang Ji <[email protected]> Co-authored-by: Jay Deng <[email protected]>
1 parent 3f8b3d6 commit 1f5f393

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
2828
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
2929
- Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847)))
30+
- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832))
3031
- Add rest, transport layer changes for hot to warm tiering - dedicated setup (([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
3132

3233
### Dependencies
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.tasks.TaskResourceTrackingService;
12+
13+
/**
14+
* SearchTaskRequestOperationsListener subscriber for operations on search tasks resource usages.
15+
* Listener ensures to refreshResourceStats on request end capturing the search task resource usage
16+
* upon request completion.
17+
*
18+
*/
19+
public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener {
20+
private final TaskResourceTrackingService taskResourceTrackingService;
21+
22+
public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) {
23+
this.taskResourceTrackingService = taskResourceTrackingService;
24+
}
25+
26+
@Override
27+
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
28+
taskResourceTrackingService.refreshResourceStats(context.getTask());
29+
}
30+
}

server/src/main/java/org/opensearch/node/Node.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.action.search.SearchRequestOperationsListener;
5353
import org.opensearch.action.search.SearchRequestSlowLog;
5454
import org.opensearch.action.search.SearchRequestStats;
55+
import org.opensearch.action.search.SearchTaskRequestOperationsListener;
5556
import org.opensearch.action.search.SearchTransportService;
5657
import org.opensearch.action.support.TransportAction;
5758
import org.opensearch.action.update.UpdateHelper;
@@ -855,8 +856,17 @@ protected Node(
855856
threadPool
856857
);
857858

859+
final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
860+
settings,
861+
clusterService.getClusterSettings(),
862+
threadPool
863+
);
864+
858865
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
859866
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);
867+
final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(
868+
taskResourceTrackingService
869+
);
860870

861871
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
862872
CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings);
@@ -988,7 +998,7 @@ protected Node(
988998
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
989999
new SearchRequestOperationsCompositeListenerFactory(
9901000
Stream.concat(
991-
Stream.of(searchRequestStats, searchRequestSlowLog),
1001+
Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener),
9921002
pluginComponents.stream()
9931003
.filter(p -> p instanceof SearchRequestOperationsListener)
9941004
.map(p -> (SearchRequestOperationsListener) p)
@@ -1117,12 +1127,6 @@ protected Node(
11171127
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
11181128
clusterService.setIndexingPressureService(indexingPressureService);
11191129

1120-
final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
1121-
settings,
1122-
clusterService.getClusterSettings(),
1123-
threadPool
1124-
);
1125-
11261130
final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
11271131
settings,
11281132
clusterService.getClusterSettings()

0 commit comments

Comments
 (0)