Commit 46d1918

Merge branch 'main' into dependabot/gradle/reactor_netty-1.2.3
Signed-off-by: gaobinlong <[email protected]>
2 parents: 2eeb683 + d9a9274

File tree

26 files changed: +414 −115 lines changed


.github/workflows/changelog_verifier.yml

Lines changed: 0 additions & 37 deletions
@@ -14,42 +14,5 @@ jobs:
           token: ${{ secrets.GITHUB_TOKEN }}
           ref: ${{ github.event.pull_request.head.sha }}
       - uses: dangoslen/changelog-enforcer@v3
-        id: verify-changelog-3x
         with:
           skipLabels: "autocut, skip-changelog"
-          changeLogPath: 'CHANGELOG-3.0.md'
-        continue-on-error: true
-      - uses: dangoslen/changelog-enforcer@v3
-        id: verify-changelog
-        with:
-          skipLabels: "autocut, skip-changelog"
-          changeLogPath: 'CHANGELOG.md'
-        continue-on-error: true
-      - run: |
-          # The check was possibly skipped leading to success for both the jobs
-          has_backport_label=${{ contains(join(github.event.pull_request.labels.*.name, ', '), 'backport')}}
-          has_breaking_label=${{ contains(join(github.event.pull_request.labels.*.name, ', '), '>breaking')}}
-          if [[ $has_breaking_label == true && $has_backport_label == true ]]; then
-            echo "error: Please make sure that the PR does not have a backport label associated with it when making breaking changes"
-            exit 1
-          fi
-
-          if [[ ${{ steps.verify-changelog-3x.outcome }} == 'success' && ${{ steps.verify-changelog.outcome }} == 'success' ]]; then
-            exit 0
-          fi
-
-          if [[ ${{ steps.verify-changelog-3x.outcome }} == 'failure' && ${{ steps.verify-changelog.outcome }} == 'failure' ]]; then
-            echo "error: Please ensure a changelog entry exists in CHANGELOG.md or CHANGELOG-3.0.md"
-            exit 1
-          fi
-
-          # Concatenates the labels and checks if the string contains "backport"
-          if [[ ${{ steps.verify-changelog.outcome }} == 'success' && $has_backport_label == false ]]; then
-            echo "error: Please make sure that the PR has a backport label associated with it when making an entry to the CHANGELOG.md file"
-            exit 1
-          fi
-
-          if [[ ${{ steps.verify-changelog-3x.outcome }} == 'success' && $has_backport_label == true ]]; then
-            echo "error: Please make sure that the PR does not have a backport label associated with it when making an entry to the CHANGELOG-3.0.md file"
-            exit 1
-          fi

CHANGELOG-3.0.md

Lines changed: 0 additions & 21 deletions
This file was deleted.

CHANGELOG.md

Lines changed: 7 additions & 2 deletions
@@ -3,11 +3,16 @@ All notable changes to this project are documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on how to add changelog entries.
 
-## [Unreleased 2.x]
+## [Unreleased 3.x]
 ### Added
+- Change priority for scheduling reroute during timeout([#16445](https://github.com/opensearch-project/OpenSearch/pull/16445))
 - Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
+- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
+
 ### Dependencies
 - Bump `reactor_netty` from 1.1.26 to 1.2.3 ([#17322](https://github.com/opensearch-project/OpenSearch/pull/17322), [#17377](https://github.com/opensearch-project/OpenSearch/pull/17377))
+- Bump `ch.qos.logback:logback-core` from 1.5.16 to 1.5.17 ([#17609](https://github.com/opensearch-project/OpenSearch/pull/17609))
+- Bump `org.jruby.joni:joni` from 2.2.3 to 2.2.5 ([#17608](https://github.com/opensearch-project/OpenSearch/pull/17608))
 
 ### Changed
 
@@ -19,4 +24,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ### Security
 
-[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.19...2.x
+[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/f58d846f...main

CONTRIBUTING.md

Lines changed: 0 additions & 9 deletions
@@ -146,15 +146,6 @@ Adding in the change is two step process:
 1. Add your changes to the corresponding section within the CHANGELOG file with dummy pull request information, publish the PR
 2. Update the entry for your change in [`CHANGELOG.md`](CHANGELOG.md) and make sure that you reference the pull request there.
 
-### Where should I put my CHANGELOG entry?
-Please review the [branching strategy](https://github.com/opensearch-project/.github/blob/main/RELEASING.md#opensearch-branching) document. The changelog on the `main` branch will contain **two files**: `CHANGELOG.md` which corresponds to unreleased changes intended for the _next minor_ release and `CHANGELOG-3.0.md` which correspond to unreleased changes intended for the _next major_ release. Your entry should go into file corresponding to the version it is intended to be released in. In practice, most changes to `main` will be backported to the next minor release so most entries will be in the `CHANGELOG.md` file.
-
-The following examples assume the _next major_ release on main is 3.0, then _next minor_ release is 2.5, and the _current_ release is 2.4.
-
-- **Add a new feature to release in next minor:** Add a changelog entry to `[Unreleased 2.x]` in CHANGELOG.md on main, then backport to 2.x (including the changelog entry).
-- **Introduce a breaking API change to release in next major:** Add a changelog entry to `[Unreleased 3.0]` to CHANGELOG-3.0.md on main, do not backport.
-- **Upgrade a dependency to fix a CVE:** Add a changelog entry to `[Unreleased 2.x]` on main, then backport to 2.x (including the changelog entry), then backport to 2.4 and ensure the changelog entry is added to `[Unreleased 2.4.1]`.
-
 ## Review Process
 
 We deeply appreciate everyone who takes the time to make a contribution. We will review all contributions as quickly as possible. As a reminder, [opening an issue](https://github.com/opensearch-project/OpenSearch/issues/new/choose) discussing your change before you make it is the best way to smooth the PR process. This will prevent a rejection because someone else is already working on the problem, or because the solution is incompatible with the architectural direction.

libs/grok/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
  */
 
 dependencies {
-  api 'org.jruby.joni:joni:2.2.3'
+  api 'org.jruby.joni:joni:2.2.5'
   // joni dependencies:
   api 'org.jruby.jcodings:jcodings:1.0.63'

libs/grok/licenses/joni-2.2.3.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
libs/grok/licenses/joni-2.2.5.jar.sha1

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+4ebafe67efa7395678a34d07e7585bed5ef0cc72

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 4 additions & 1 deletion
@@ -34,7 +34,7 @@
 import org.testcontainers.utility.DockerImageName;
 
 /**
- * Base test class for Kafka ingestion tests
+ * Base test class for Kafka ingestion tests.
  */
 @ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
 public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {
@@ -135,6 +135,9 @@ protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
         .put("ingestion_source.param.topic", topicName)
         .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
         .put("index.replication.type", "SEGMENT")
+        // set custom kafka consumer properties
+        .put("ingestion_source.param.fetch.min.bytes", 30000)
+        .put("ingestion_source.param.enable.auto.commit", false)
         .build(),
     "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
 );

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 0 additions & 1 deletion
@@ -48,7 +48,6 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
     internalCluster().startClusterManagerOnlyNode();
     final String nodeA = internalCluster().startDataOnlyNode();
     createIndexWithDefaultSettings(1, 1);
-
     ensureYellowAndNoInitializingShards(indexName);
     final String nodeB = internalCluster().startDataOnlyNode();
     ensureGreen(indexName);

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 4 additions & 4 deletions
@@ -9,7 +9,6 @@
 package org.opensearch.plugin.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -99,9 +98,10 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
     Properties consumerProp = new Properties();
     consumerProp.put("bootstrap.servers", config.getBootstrapServers());
     consumerProp.put("client.id", clientId);
-    if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) {
-        consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig());
-    }
+
+    logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), config.getConsumerConfigurations());
+    consumerProp.putAll(config.getConsumerConfigurations());
+
     // TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
     // consumerProp.put("key.deserializer",
     //     "org.apache.kafka.common.serialization.StringDeserializer");
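The change above replaces a single hard-wired `auto.offset.reset` check with a blanket `putAll` of every user-supplied consumer configuration. That layering can be sketched in isolation; the class and method names below are illustrative, not the plugin's actual API:

```java
import java.util.Map;
import java.util.Properties;

// Sketch: base connection properties are set first, then every
// remaining user-supplied key (auto.offset.reset, fetch.min.bytes,
// enable.auto.commit, ...) is copied over verbatim via putAll.
public class ConsumerProps {
    public static Properties build(String bootstrapServers, String clientId, Map<String, Object> extra) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", clientId);
        props.putAll(extra); // generic pass-through of consumer configs
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "client-1", Map.of("auto.offset.reset", "latest"));
        System.out.println(props);
    }
}
```

Because `putAll` runs after the base properties are set, a user-supplied `bootstrap.servers` or `client.id` would silently override them in this sketch; whether that is desirable is a design choice the pass-through approach has to make.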

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java

Lines changed: 13 additions & 2 deletions
@@ -10,6 +10,7 @@
 
 import org.opensearch.core.util.ConfigurationUtils;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -18,21 +19,27 @@
 public class KafkaSourceConfig {
     private final String PROP_TOPIC = "topic";
     private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
-    // TODO: support pass any generic kafka configs
     private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";
 
     private final String topic;
     private final String bootstrapServers;
     private final String autoOffsetResetConfig;
 
+    private final Map<String, Object> consumerConfigsMap;
+
     /**
-     * Constructor
+     * Extracts and look for required and optional kafka consumer configurations.
      * @param params the configuration parameters
      */
     public KafkaSourceConfig(Map<String, Object> params) {
         this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
         this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
         this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
+        this.consumerConfigsMap = new HashMap<>(params);
+
+        // remove above configurations
+        consumerConfigsMap.remove(PROP_TOPIC);
+        consumerConfigsMap.remove(PROP_BOOTSTRAP_SERVERS);
     }
 
     /**
@@ -60,4 +67,8 @@ public String getBootstrapServers() {
     public String getAutoOffsetResetConfig() {
         return autoOffsetResetConfig;
     }
+
+    public Map<String, Object> getConsumerConfigurations() {
+        return consumerConfigsMap;
+    }
 }
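The constructor change implements a copy-and-strip pattern: clone the raw params, remove the keys the plugin itself consumes (`topic`, `bootstrap_servers`), and expose everything else as consumer configuration. A minimal standalone sketch, under illustrative names rather than the plugin's real classes:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the copy-and-strip pattern: reserved keys are consumed
// by the plugin, every other entry passes through to the consumer.
public class PassThroughConfig {
    private static final String PROP_TOPIC = "topic";
    private static final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";

    public static Map<String, Object> consumerConfigs(Map<String, Object> params) {
        Map<String, Object> configs = new HashMap<>(params); // defensive copy
        configs.remove(PROP_TOPIC);
        configs.remove(PROP_BOOTSTRAP_SERVERS);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("topic", "test-topic");
        params.put("bootstrap_servers", "localhost:9092");
        params.put("fetch.min.bytes", 30000);
        System.out.println(consumerConfigs(params)); // only fetch.min.bytes survives
    }
}
```

Note two details of the real diff that the sketch mirrors: the copy is defensive (later mutation of `params` does not leak into the config), and `auto.offset.reset` is deliberately left in the map, since it is itself a valid Kafka consumer property.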

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java

Lines changed: 5 additions & 1 deletion
@@ -16,10 +16,12 @@
 
 public class KafkaSourceConfigTests extends OpenSearchTestCase {
 
-    public void testConstructorAndGetters() {
+    public void testKafkaSourceConfig() {
         Map<String, Object> params = new HashMap<>();
         params.put("topic", "topic");
         params.put("bootstrap_servers", "bootstrap");
+        params.put("fetch.min.bytes", 30000);
+        params.put("enable.auto.commit", false);
 
         KafkaSourceConfig config = new KafkaSourceConfig(params);
 
@@ -29,5 +31,7 @@ public void testConstructorAndGetters() {
             "bootstrap",
             config.getBootstrapServers()
         );
+        Assert.assertEquals("Incorrect fetch.min.bytes", 30000, config.getConsumerConfigurations().get("fetch.min.bytes"));
+        Assert.assertEquals("Incorrect enable.auto.commit", false, config.getConsumerConfigurations().get("enable.auto.commit"));
     }
 }

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java

Lines changed: 32 additions & 7 deletions
@@ -90,7 +90,7 @@ private void waitForNodes(int numNodes) {
 public void testNodeCounts() {
     int total = 1;
     internalCluster().startNode();
-    Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
+    Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0, 0);
     int numNodes = randomIntBetween(1, 5);
 
     ClusterStatsResponse response = client().admin()
@@ -159,7 +159,7 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
     internalCluster().startNode(settings);
     waitForNodes(total);
 
-    Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);
+    Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0, 0);
 
     Client client = client();
     ClusterStatsResponse response = client.admin()
@@ -484,7 +484,7 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
     internalCluster().startNodes(legacyMasterSettings);
     waitForNodes(total);
 
-    Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);
+    Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0, 0);
 
     Client client = client();
     ClusterStatsResponse clusterStatsResponse = client.admin()
@@ -518,7 +518,7 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
     internalCluster().startNodes(clusterManagerNodeRoleSettings);
     waitForNodes(total);
 
-    Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);
+    Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0, 0);
 
     Client client = client();
     ClusterStatsResponse clusterStatsResponse = client.admin()
@@ -546,7 +546,7 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
     internalCluster().startNodes(legacySeedDataNodeSettings);
     waitForNodes(total);
 
-    Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);
+    Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0, 0);
 
     Client client = client();
     ClusterStatsResponse clusterStatsResponse = client.admin()
@@ -577,7 +577,7 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
     internalCluster().startNodes(legacyDataNodeSettings);
     waitForNodes(total);
 
-    Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);
+    Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0, 0);
 
     Client client = client();
     ClusterStatsResponse clusterStatsResponse = client.admin()
@@ -594,6 +594,29 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
     assertEquals(expectedNodesRoles, Set.of(getNodeRoles(client, 0), getNodeRoles(client, 1)));
 }
 
+public void testNodeRolesWithSearchNode() throws ExecutionException, InterruptedException {
+    int total = 2;
+    internalCluster().startClusterManagerOnlyNodes(1);
+    internalCluster().startSearchOnlyNode();
+    waitForNodes(total);
+
+    Map<String, Integer> expectedRoleCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 1, 0);
+
+    Client client = client();
+    ClusterStatsResponse clusterStatsResponse = client.admin()
+        .cluster()
+        .prepareClusterStats()
+        .useAggregatedNodeLevelResponses(randomBoolean())
+        .get();
+    assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);
+
+    Set<Set<String>> expectedNodesRoles = Set.of(
+        Set.of(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE.roleName()),
+        Set.of(DiscoveryNodeRole.SEARCH_ROLE.roleName())
+    );
+    assertEquals(expectedNodesRoles, Set.of(getNodeRoles(client, 0), getNodeRoles(client, 1)));
+}
+
 public void testClusterStatsWithNodeMetricsFilter() {
     internalCluster().startNode();
     ensureGreen();
@@ -887,6 +910,7 @@ private Map<String, Integer> getExpectedCounts(
     int clusterManagerRoleCount,
     int ingestRoleCount,
     int remoteClusterClientRoleCount,
+    int warmRoleCount,
     int searchRoleCount,
     int coordinatingOnlyCount
 ) {
@@ -896,7 +920,8 @@ private Map<String, Integer> getExpectedCounts(
     expectedCounts.put(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE.roleName(), clusterManagerRoleCount);
     expectedCounts.put(DiscoveryNodeRole.INGEST_ROLE.roleName(), ingestRoleCount);
     expectedCounts.put(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName(), remoteClusterClientRoleCount);
-    expectedCounts.put(DiscoveryNodeRole.WARM_ROLE.roleName(), searchRoleCount);
+    expectedCounts.put(DiscoveryNodeRole.WARM_ROLE.roleName(), warmRoleCount);
+    expectedCounts.put(DiscoveryNodeRole.SEARCH_ROLE.roleName(), searchRoleCount);
     expectedCounts.put(ClusterStatsNodes.Counts.COORDINATING_ONLY, coordinatingOnlyCount);
     return expectedCounts;
 }

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

Lines changed: 9 additions & 0 deletions
@@ -488,6 +488,15 @@ public boolean isWarmNode() {
     return roles.contains(DiscoveryNodeRole.WARM_ROLE);
 }
 
+/**
+ * Returns whether the node is dedicated to host search replicas.
+ *
+ * @return true if the node contains a search role, false otherwise
+ */
+public boolean isSearchNode() {
+    return roles.contains(DiscoveryNodeRole.SEARCH_ROLE);
+}
+
 /**
  * Returns whether the node is a remote store node.
  *

server/src/main/java/org/opensearch/cluster/node/DiscoveryNodeRole.java

Lines changed: 29 additions & 1 deletion
@@ -310,11 +310,39 @@ public Setting<Boolean> legacySetting() {
 
 };
 
+/**
+ * Represents the role for a search node, which is dedicated to host search replicas.
+ */
+public static final DiscoveryNodeRole SEARCH_ROLE = new DiscoveryNodeRole("search", "s", true) {
+
+    @Override
+    public Setting<Boolean> legacySetting() {
+        // search role is added in 2.4 so doesn't need to configure legacy setting
+        return null;
+    }
+
+    @Override
+    public void validateRole(List<DiscoveryNodeRole> roles) {
+        for (DiscoveryNodeRole role : roles) {
+            if (role.equals(DiscoveryNodeRole.SEARCH_ROLE) == false) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        Locale.ROOT,
+                        "%s role cannot be combined with any other role on a node.",
+                        DiscoveryNodeRole.SEARCH_ROLE.roleName()
+                    )
+                );
+            }
+        }
+    }
+
+};
+
 /**
  * The built-in node roles.
  */
 public static SortedSet<DiscoveryNodeRole> BUILT_IN_ROLES = Collections.unmodifiableSortedSet(
-    new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, CLUSTER_MANAGER_ROLE, REMOTE_CLUSTER_CLIENT_ROLE, WARM_ROLE))
+    new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, CLUSTER_MANAGER_ROLE, REMOTE_CLUSTER_CLIENT_ROLE, WARM_ROLE, SEARCH_ROLE))
 );
 
 /**
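The `validateRole` override above makes the search role exclusive: if a node's role list contains anything besides `search`, validation fails. The rule can be sketched in isolation, simplified to plain strings rather than OpenSearch's actual `DiscoveryNodeRole` type:

```java
import java.util.List;
import java.util.Locale;

// Sketch of the exclusivity rule: a node carrying the search role
// may carry no other role; any other combination is rejected.
public class RoleValidation {
    public static void validateSearchOnly(List<String> roles) {
        if (roles.contains("search") && roles.size() > 1) {
            throw new IllegalArgumentException(
                String.format(Locale.ROOT, "%s role cannot be combined with any other role on a node.", "search")
            );
        }
    }

    public static void main(String[] args) {
        validateSearchOnly(List.of("search"));         // accepted: search alone
        validateSearchOnly(List.of("data", "ingest")); // accepted: no search role
        try {
            validateSearchOnly(List.of("search", "data"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The real implementation iterates over the list and rejects on the first non-search role it finds, which is equivalent for this purpose; nodes without the search role are left entirely unconstrained by this check.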
