
Commit 6dad756

Fix conflicts
Signed-off-by: Prudhvi Godithi <[email protected]>
2 parents bbe949c + fe4a98d commit 6dad756

File tree: 70 files changed (+2302, -494 lines)


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -31,10 +31,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
 - Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
 - [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
+- Optimize gRPC perf by passing by reference ([#18303](https://github.com/opensearch-project/OpenSearch/pull/18303))
 - Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479))
+- Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/issues/13648))
+- Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))
 
 ### Changed
 - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))
+- Change implementation for `percentiles` aggregation for latency improvement ([#18124](https://github.com/opensearch-project/OpenSearch/pull/18124))
 
 ### Dependencies
 - Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))
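
Note on the consumer-reset entry above: as the tests in this commit demonstrate, a reset is only accepted while ingestion is paused; issued against a running poller it fails with a shard failure. A minimal sketch of the flow, using the internal client API exercised in the diffs below (index name and offset value are illustrative):

    // Pause the pollers first; a reset on a live poller is rejected.
    client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest("testindex")).get();

    // Resume shard 0, seeking the Kafka consumer to offset 2 (e.g. past a poison message).
    client().admin()
        .indices()
        .resumeIngestion(Requests.resumeIngestionRequest("testindex", 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2"))
        .get();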

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ protobuf = "3.25.5"
 jakarta_annotation = "1.3.5"
 google_http_client = "1.44.1"
 google_auth = "1.29.0"
-tdigest = "3.3"
+tdigest = "3.3" # Warning: Before updating tdigest, ensure its serialization code for MergingDigest hasn't changed
 hdrhistogram = "2.2.2"
 grpc = "1.68.2"
 json_smart = "2.5.2"
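
The comment added above guards the `percentiles` latency change in this commit: if serialized MergingDigest bytes cross node or version boundaries, a t-digest upgrade that alters the byte layout would mis-read digests written by older nodes. A minimal round-trip sketch against the t-digest 3.3 API, illustrative only and not code from this commit:

    import java.nio.ByteBuffer;
    import com.tdunning.math.stats.MergingDigest;

    MergingDigest digest = new MergingDigest(100.0); // compression factor
    for (double v : new double[] { 1, 2, 3, 4, 5 }) {
        digest.add(v);
    }

    // Writer side: serialize using the library's fixed byte layout.
    ByteBuffer buf = ByteBuffer.allocate(digest.byteSize());
    digest.asBytes(buf);
    buf.flip();

    // Reader side: any change to this layout in a newer t-digest would
    // mis-parse bytes produced by the pinned 3.3 version.
    MergingDigest copy = MergingDigest.fromBytes(buf);
    assert copy.quantile(0.5) == digest.quantile(0.5);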

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 2 additions & 2 deletions
@@ -85,7 +85,7 @@ public void testKafkaIngestion_RewindByTimeStamp() {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kafka")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
+            .put("ingestion_source.pointer.init.reset", "reset_by_timestamp")
             // 1739459500000 is the timestamp of the first message
             // 1739459800000 is the timestamp of the second message
             // by resetting to 1739459600000, only the second message will be ingested
@@ -115,7 +115,7 @@ public void testKafkaIngestion_RewindByOffset() {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kafka")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
+            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
             .put("ingestion_source.pointer.init.reset.value", "1")
             .put("ingestion_source.param.topic", "test")
             .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 10 additions & 0 deletions
@@ -15,6 +15,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
+import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
 import org.opensearch.action.pagination.PageParams;
@@ -176,6 +177,15 @@ protected ResumeIngestionResponse resumeIngestion(String indexName) throws Execu
         return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(indexName)).get();
     }
 
+    protected ResumeIngestionResponse resumeIngestion(
+        String index,
+        int shard,
+        ResumeIngestionRequest.ResetSettings.ResetMode mode,
+        String value
+    ) throws ExecutionException, InterruptedException {
+        return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(index, shard, mode, value)).get();
+    }
+
     protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
         createIndexWithDefaultSettings(indexName, numShards, numReplicas, 1);
    }
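
The new overload is how the cluster tests below issue a reset; for example, to skip a poison message by seeking shard 0 to offset 2 (taken from RemoteStoreKafkaIT in this commit):

    ResumeIngestionResponse resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2");
    assertTrue(resumeResponse.isAcknowledged());
    assertTrue(resumeResponse.isShardsAcknowledged());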

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 127 additions & 0 deletions
@@ -12,6 +12,7 @@
 
 import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
+import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
 import org.opensearch.action.pagination.PageParams;
@@ -498,6 +499,132 @@ public void testClusterWriteBlock() throws Exception {
         waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
     }
 
+    public void testOffsetUpdateOnBlockErrorPolicy() throws Exception {
+        // setup nodes and index using block strategy
+        // produce one invalid message to block the processor
+        produceData("1", "name1", "21");
+        produceData("{\"_op_type\":\"invalid\",\"_source\":{\"name\":\"name4\", \"age\": 25}}");
+        produceData("2", "name2", "22");
+        produceData("3", "name3", "24");
+        produceData("4", "name4", "24");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "block")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.internal_queue_size", "1000")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+        // expect only 1 document to be successfully indexed
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
+        });
+        // revalidate that only 1 document is visible
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // update offset to skip past the invalid message
+        ResumeIngestionResponse resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2");
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return Arrays.stream(ingestionState.getShardStates())
+                .allMatch(
+                    state -> state.isPollerPaused() == false
+                        && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
+                );
+        });
+
+        // validate remaining messages are successfully indexed
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
+        PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+            .getPollingIngestStats();
+        assertThat(stats.getConsumerStats().totalDuplicateMessageSkippedCount(), is(0L));
+    }
+
+    public void testConsumerResetByTimestamp() throws Exception {
+        produceData("1", "name1", "21", 100, "index");
+        produceData("2", "name2", "22", 105, "index");
+        produceData("3", "name3", "24", 110, "index");
+        produceData("4", "name4", "24", 120, "index");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "drop")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.internal_queue_size", "1000")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
+
+        // expect error response since ingestion not yet paused
+        ResumeIngestionResponse resumeResponse = resumeIngestion(
+            indexName,
+            0,
+            ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP,
+            "100"
+        );
+        assertTrue(resumeResponse.isAcknowledged());
+        assertFalse(resumeResponse.isShardsAcknowledged());
+        assertEquals(1, resumeResponse.getShardFailures().length);
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
+        });
+
+        // reset consumer by a timestamp after first message was produced
+        resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP, "102");
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 3;
+        });
+    }
+
     private void verifyRemoteStoreEnabled(String node) {
         GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
         String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.ThreadFilter;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.transport.client.Requests;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@ThreadLeakFilters(filters = KafkaSingleNodeTests.TestContainerThreadLeakFilter.class)
public class KafkaSingleNodeTests extends OpenSearchSingleNodeTestCase {
    private KafkaContainer kafka;
    private Producer<String, String> producer;
    private final String topicName = "test";
    private final String indexName = "testindex";
    private final String mappings = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";

    @Override
    protected Collection<Class<? extends Plugin>> getPlugins() {
        return Collections.singleton(KafkaPlugin.class);
    }

    @Before
    public void setup() {
        setupKafka();
    }

    @After
    public void cleanup() {
        stopKafka();
    }

    public void testPauseAndResumeAPIs() throws Exception {
        produceData("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");
        produceData("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");

        createIndexWithMappingSource(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("index.replication.type", "SEGMENT")
                .build(),
            mappings
        );
        ensureGreen(indexName);

        waitForState(() -> {
            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
            SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
            return response.getHits().getTotalHits().value() == 2;
        });

        ResumeIngestionResponse resumeResponse = client().admin()
            .indices()
            .resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
            .get();
        assertTrue(resumeResponse.isAcknowledged());
        assertFalse(resumeResponse.isShardsAcknowledged());
        assertEquals(1, resumeResponse.getShardFailures().length);

        // pause ingestion
        client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
        waitForState(() -> {
            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
            return ingestionState.getFailedShards() == 0
                && Arrays.stream(ingestionState.getShardStates())
                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
        });

        produceData("{\"_id\":\"1\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");
        produceData("{\"_id\":\"2\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");

        // resume ingestion with offset reset
        client().admin()
            .indices()
            .resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
            .get();
        waitForState(() -> {
            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
            return Arrays.stream(ingestionState.getShardStates())
                .allMatch(
                    state -> state.isPollerPaused() == false
                        && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
                );
        });

        // validate duplicate messages are skipped
        waitForState(() -> {
            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
                .getPollingIngestStats();
            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 2;
        });
    }

    private void setupKafka() {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
            // disable topic auto creation
            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
        kafka.start();

        // setup producer
        String bootstrapServers = kafka.getBootstrapServers();
        KafkaUtils.createTopic(topicName, 1, bootstrapServers);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafka.getBootstrapServers());
        producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }

    private void stopKafka() {
        if (producer != null) {
            producer.close();
        }

        if (kafka != null) {
            kafka.stop();
        }
    }

    private void produceData(String payload) {
        producer.send(new ProducerRecord<>(topicName, null, 1739459500000L, "null", payload));
    }

    protected GetIngestionStateResponse getIngestionState(String indexName) throws ExecutionException, InterruptedException {
        return client().admin().indices().getIngestionState(Requests.getIngestionStateRequest(indexName)).get();
    }

    protected void waitForState(Callable<Boolean> checkState) throws Exception {
        assertBusy(() -> {
            if (checkState.call() == false) {
                fail("Provided state requirements not met");
            }
        }, 1, TimeUnit.MINUTES);
    }

    public static final class TestContainerThreadLeakFilter implements ThreadFilter {
        @Override
        public boolean reject(Thread t) {
            return t.getName().startsWith("testcontainers-pull-watchdog-")
                || t.getName().startsWith("testcontainers-ryuk")
                || t.getName().startsWith("stream-poller-consumer");
        }
    }
}
