Skip to content

Commit 225c31a

Browse files
initial code for the sandbox resource tracking and cancellation framework
Signed-off-by: Kiran Prakash <[email protected]>
1 parent 4700be3 commit 225c31a

29 files changed

+1246
-3
lines changed

server/src/main/java/org/opensearch/action/search/SearchShardTask.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.core.tasks.TaskId;
3838
import org.opensearch.search.fetch.ShardFetchSearchRequest;
3939
import org.opensearch.search.internal.ShardSearchRequest;
40+
import org.opensearch.search.sandboxing.SandboxTask;
4041
import org.opensearch.tasks.CancellableTask;
4142
import org.opensearch.tasks.SearchBackpressureTask;
4243

@@ -50,9 +51,10 @@
5051
* @opensearch.api
5152
*/
5253
@PublicApi(since = "1.0.0")
53-
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
54+
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask, SandboxTask {
5455
// generating metadata in a lazy way since source can be quite big
5556
private final MemoizedSupplier<String> metadataSupplier;
57+
private String sandboxId;
5658

5759
public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
5860
this(id, type, action, description, parentTaskId, headers, () -> "");
@@ -84,4 +86,14 @@ public boolean supportsResourceTracking() {
8486
public boolean shouldCancelChildrenOnCancellation() {
8587
return false;
8688
}
89+
90+
@Override
91+
public void setSandboxId(String sandboxId) {
92+
this.sandboxId = sandboxId;
93+
}
94+
95+
@Override
96+
public String getSandboxId() {
97+
return "sandboxId";
98+
}
8799
}

server/src/main/java/org/opensearch/action/search/SearchTask.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.common.annotation.PublicApi;
3636
import org.opensearch.common.unit.TimeValue;
3737
import org.opensearch.core.tasks.TaskId;
38+
import org.opensearch.search.sandboxing.SandboxTask;
3839
import org.opensearch.tasks.CancellableTask;
3940
import org.opensearch.tasks.SearchBackpressureTask;
4041

@@ -49,10 +50,11 @@
4950
* @opensearch.api
5051
*/
5152
@PublicApi(since = "1.0.0")
52-
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
53+
public class SearchTask extends CancellableTask implements SearchBackpressureTask, SandboxTask {
5354
// generating description in a lazy way since source can be quite big
5455
private final Supplier<String> descriptionSupplier;
5556
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
57+
private String sandboxId;
5658

5759
public SearchTask(
5860
long id,
@@ -106,4 +108,13 @@ public final SearchProgressListener getProgressListener() {
106108
public boolean shouldCancelChildrenOnCancellation() {
107109
return true;
108110
}
111+
112+
public void setSandboxId(String sandboxId) {
113+
this.sandboxId = sandboxId;
114+
}
115+
116+
@Override
117+
public String getSandboxId() {
118+
return "sandboxId";
119+
}
109120
}

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ public boolean isSegmentReplicationEnabled(String indexName) {
126126
.orElse(false);
127127
}
128128

129-
/**
129+
public Map<String, Sandbox> sandboxes() {
130+
// stub
131+
return Collections.emptyMap();
132+
}
133+
134+
/**
130135
* Context of the XContent.
131136
*
132137
* @opensearch.api
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType;
13+
14+
import java.util.Collections;
15+
import java.util.List;
16+
17+
@ExperimentalApi
18+
public class Sandbox {
19+
//TODO Kaushal should have implemented hashcode and equals
20+
private SandboxMode mode;
21+
22+
public SandboxMode getMode() {
23+
return mode;
24+
}
25+
26+
public ResourceLimit getResourceLimitFor(SandboxResourceType resourceType) {
27+
return null;
28+
}
29+
30+
public String getName() {
31+
return "";
32+
}
33+
34+
public String getId() {
35+
return "";
36+
}
37+
38+
public List<ResourceLimit> getResourceLimits() {
39+
return Collections.emptyList();
40+
}
41+
42+
@ExperimentalApi
43+
public class ResourceLimit {
44+
public Long getThresholdInLong() {
45+
return 0L;
46+
}
47+
48+
public SandboxResourceType getResourceType() {
49+
return null;
50+
}
51+
52+
public Long getThreshold() {
53+
return 0L;
54+
}
55+
}
56+
57+
@ExperimentalApi
58+
public enum SandboxMode {
59+
SOFT("soft"),
60+
ENFORCED("enforced"),
61+
MONITOR("monitor");
62+
63+
private final String name;
64+
65+
SandboxMode(String mode) {
66+
this.name = mode;
67+
}
68+
69+
public String getName() {
70+
return name;
71+
}
72+
73+
public static SandboxMode fromName(String s) {
74+
switch (s) {
75+
case "soft":
76+
return SOFT;
77+
case "enforced":
78+
return ENFORCED;
79+
case "monitor":
80+
return MONITOR;
81+
default:
82+
throw new IllegalArgumentException("Invalid value for SandboxMode: " + s);
83+
}
84+
}
85+
86+
}
87+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.sandboxing;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType;
13+
import org.opensearch.tasks.Task;
14+
15+
import java.util.ArrayList;
16+
import java.util.HashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.Objects;
20+
21+
@ExperimentalApi
22+
public class SandboxLevelResourceUsageView {
23+
24+
private final String sandboxId;
25+
private final Map<SandboxResourceType, Long> resourceUsage;
26+
private final List<Task> activeTasks;
27+
28+
public SandboxLevelResourceUsageView(String sandboxId) {
29+
this.sandboxId = sandboxId;
30+
this.resourceUsage = new HashMap<>();
31+
this.activeTasks = new ArrayList<>();
32+
}
33+
34+
public SandboxLevelResourceUsageView(String sandboxId, Map<SandboxResourceType, Long> resourceUsage, List<Task> activeTasks) {
35+
this.sandboxId = sandboxId;
36+
this.resourceUsage = resourceUsage;
37+
this.activeTasks = activeTasks;
38+
}
39+
40+
public Map<SandboxResourceType, Long> getResourceUsageData() {
41+
return resourceUsage;
42+
}
43+
44+
public List<Task> getActiveTasks() {
45+
return activeTasks;
46+
}
47+
48+
@Override
49+
public boolean equals(Object o) {
50+
if (this == o) return true;
51+
if (o == null || getClass() != o.getClass()) return false;
52+
SandboxLevelResourceUsageView that = (SandboxLevelResourceUsageView) o;
53+
return Objects.equals(sandboxId, that.sandboxId);
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return Objects.hashCode(sandboxId);
59+
}
60+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.sandboxing;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.cluster.metadata.Sandbox;
14+
import org.opensearch.cluster.service.ClusterService;
15+
import org.opensearch.common.inject.Inject;
16+
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
17+
import org.opensearch.search.sandboxing.cancellation.DefaultTaskCancellation;
18+
import org.opensearch.search.sandboxing.cancellation.LongestRunningTaskFirstStrategy;
19+
import org.opensearch.search.sandboxing.tracker.SandboxUsageTracker;
20+
import org.opensearch.threadpool.Scheduler;
21+
import org.opensearch.threadpool.ThreadPool;
22+
23+
import java.io.IOException;
24+
import java.util.HashSet;
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
/**
29+
* Main service which will run periodically to track and cancel resource constraint violating tasks in sandboxes
30+
*/
31+
public class SandboxService extends AbstractLifecycleComponent {
32+
private static final Logger logger = LogManager.getLogger(SandboxService.class);
33+
34+
private final SandboxUsageTracker sandboxUsageTracker;
35+
private volatile Scheduler.Cancellable scheduledFuture;
36+
private final ThreadPool threadPool;
37+
private final ClusterService clusterService;
38+
39+
/**
40+
* Guice managed constructor
41+
*
42+
* @param sandboxUsageTracker
43+
* @param sandboxPruner
44+
* @param sandboxServiceSettings
45+
* @param threadPool
46+
*/
47+
@Inject
48+
public SandboxService(
49+
SandboxUsageTracker sandboxUsageTracker,
50+
ClusterService clusterService,
51+
ThreadPool threadPool
52+
) {
53+
this.sandboxUsageTracker = sandboxUsageTracker;
54+
this.sandboxServiceSettings = sandboxServiceSettings;
55+
this.sandboxPruner = sandboxPruner;
56+
this.clusterService = clusterService;
57+
this.threadPool = threadPool;
58+
}
59+
60+
/**
61+
* run at regular interval
62+
*/
63+
private void doRun() {
64+
Map<String, SandboxLevelResourceUsageView> sandboxLevelResourceUsageViews = sandboxUsageTracker.constructSandboxLevelUsageViews();
65+
Set<Sandbox> activeSandboxes = getActiveSandboxes();
66+
DefaultTaskCancellation taskCancellation = new DefaultTaskCancellation(
67+
new LongestRunningTaskFirstStrategy(),
68+
sandboxLevelResourceUsageViews,
69+
activeSandboxes
70+
);
71+
taskCancellation.cancelTasks();
72+
// TODO Prune the sandboxes
73+
}
74+
75+
private Set<Sandbox> getActiveSandboxes() {
76+
return new HashSet<>(clusterService.state().metadata().sandboxes().values());
77+
}
78+
79+
/**
80+
* {@link AbstractLifecycleComponent} lifecycle method
81+
*/
82+
@Override
83+
protected void doStart() {
84+
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
85+
try {
86+
doRun();
87+
} catch (Exception e) {
88+
logger.debug("Exception occurred in Query Sandbox service", e);
89+
}
90+
}, sandboxServiceSettings.getRunIntervalMillis(), ThreadPool.Names.GENERIC);
91+
}
92+
93+
@Override
94+
protected void doStop() {
95+
if (scheduledFuture != null) {
96+
scheduledFuture.cancel();
97+
}
98+
}
99+
100+
@Override
101+
protected void doClose() throws IOException {
102+
}
103+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.sandboxing;
10+
11+
/**
12+
* This interface can be implemented by tasks which will be tracked and monitored using {@link org.opensearch.cluster.metadata.ResourceLimitGroup}
13+
*/
14+
public interface SandboxTask {
15+
void setSandboxId(String sandboxId);
16+
17+
String getSandboxId();
18+
}

0 commit comments

Comments
 (0)