Skip to content

Commit ff518ad

Browse files
soosinhadk2k
authored andcommitted
Reconfigure remote state thread pool count (opensearch-project#16245)
* Reconfigure remote state thread pool count Signed-off-by: Sooraj Sinha <[email protected]>
1 parent 9aa62b8 commit ff518ad

File tree

4 files changed

+84
-8
lines changed

4 files changed

+84
-8
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private static int allocatedProcessors(Settings settings) {
188188
}
189189

190190
private static int urgentPoolCount(Settings settings) {
191-
return boundedBy((allocatedProcessors(settings) + 7) / 8, 1, 2);
191+
return boundedBy((allocatedProcessors(settings) + 1) / 2, 1, 2);
192192
}
193193

194194
private static int priorityPoolCount(Settings settings) {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.common.unit.SizeUnit;
13+
import org.opensearch.common.unit.SizeValue;
14+
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
15+
import org.opensearch.test.OpenSearchTestCase;
16+
import org.opensearch.threadpool.ExecutorBuilder;
17+
import org.opensearch.threadpool.ThreadPool;
18+
import org.opensearch.threadpool.ThreadPool.ThreadPoolType;
19+
20+
import java.io.IOException;
21+
import java.nio.file.Path;
22+
import java.util.List;
23+
import java.util.concurrent.Executor;
24+
25+
import static org.hamcrest.CoreMatchers.instanceOf;
26+
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.notNullValue;
28+
29+
public class S3RepositoryPluginTests extends OpenSearchTestCase {
30+
31+
private static final String URGENT_FUTURE_COMPLETION = "urgent_future_completion";
32+
33+
public void testGetExecutorBuilders() throws IOException {
34+
final int processors = randomIntBetween(1, 64);
35+
Settings settings = Settings.builder().put("node.name", "test").put("node.processors", processors).build();
36+
Path configPath = createTempDir();
37+
ThreadPool threadPool = null;
38+
try (S3RepositoryPlugin plugin = new S3RepositoryPlugin(settings, configPath)) {
39+
List<ExecutorBuilder<?>> executorBuilders = plugin.getExecutorBuilders(settings);
40+
assertNotNull(executorBuilders);
41+
assertFalse(executorBuilders.isEmpty());
42+
threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder<?>[0]));
43+
final Executor executor = threadPool.executor(URGENT_FUTURE_COMPLETION);
44+
assertNotNull(executor);
45+
assertThat(executor, instanceOf(OpenSearchThreadPoolExecutor.class));
46+
final OpenSearchThreadPoolExecutor openSearchThreadPoolExecutor = (OpenSearchThreadPoolExecutor) executor;
47+
final ThreadPool.Info info = threadPool.info(URGENT_FUTURE_COMPLETION);
48+
int size = boundedBy((processors + 1) / 2, 1, 2);
49+
assertThat(info.getName(), equalTo(URGENT_FUTURE_COMPLETION));
50+
assertThat(info.getThreadPoolType(), equalTo(ThreadPoolType.FIXED));
51+
assertThat(info.getQueueSize(), notNullValue());
52+
assertThat(info.getQueueSize(), equalTo(new SizeValue(10, SizeUnit.KILO)));
53+
assertThat(openSearchThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(10_000));
54+
55+
assertThat(info.getMin(), equalTo(size));
56+
assertThat(openSearchThreadPoolExecutor.getCorePoolSize(), equalTo(size));
57+
assertThat(info.getMax(), equalTo(size));
58+
assertThat(openSearchThreadPoolExecutor.getMaximumPoolSize(), equalTo(size));
59+
60+
final int availableProcessors = Runtime.getRuntime().availableProcessors();
61+
if (processors > availableProcessors) {
62+
assertWarnings(
63+
"setting [node.processors] to value ["
64+
+ processors
65+
+ "] which is more than available processors ["
66+
+ availableProcessors
67+
+ "] is deprecated"
68+
);
69+
}
70+
} finally {
71+
if (threadPool != null) {
72+
terminate(threadPool);
73+
}
74+
}
75+
}
76+
77+
private static int boundedBy(int value, int min, int max) {
78+
return Math.min(max, Math.max(min, value));
79+
}
80+
81+
}

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,7 @@ public ThreadPool(
293293
);
294294
builders.put(
295295
Names.REMOTE_STATE_READ,
296-
new ScalingExecutorBuilder(
297-
Names.REMOTE_STATE_READ,
298-
1,
299-
twiceAllocatedProcessors(allocatedProcessors),
300-
TimeValue.timeValueMinutes(5)
301-
)
296+
new ScalingExecutorBuilder(Names.REMOTE_STATE_READ, 1, boundedBy(4 * allocatedProcessors, 4, 32), TimeValue.timeValueMinutes(5))
302297
);
303298
builders.put(
304299
Names.INDEX_SEARCHER,

server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
156156
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessors);
157157
sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors);
158158
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors);
159-
sizes.put(ThreadPool.Names.REMOTE_STATE_READ, ThreadPool::twiceAllocatedProcessors);
159+
sizes.put(ThreadPool.Names.REMOTE_STATE_READ, n -> ThreadPool.boundedBy(4 * n, 4, 32));
160160
return sizes.get(threadPoolName).apply(numberOfProcessors);
161161
}
162162

0 commit comments

Comments
 (0)