Skip to content

Commit a3a23a7

Browse files
committed
Merge remote-tracking branch 'origin/main' into dev/PublishCheckpointAction_use_never_give_up_retry_strategy
2 parents 3eb976e + 1be3e46 commit a3a23a7

36 files changed

+346
-213
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
4141
- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447)
4242
- Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
43+
- Add versioning support in pull-based ingestion ([#17918](https://github.com/opensearch-project/OpenSearch/pull/17918))
4344

4445
### Changed
4546
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+13
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ protected void produceData(String id, String name, String age, long timestamp, S
108108
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
109109
}
110110

111+
protected void produceDataWithExternalVersion(String id, long version, String name, String age, long timestamp, String opType) {
112+
String payload = String.format(
113+
Locale.ROOT,
114+
"{\"_id\":\"%s\", \"_version\":\"%d\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
115+
id,
116+
version,
117+
opType,
118+
name,
119+
age
120+
);
121+
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
122+
}
123+
111124
protected void produceData(String payload) {
112125
producer.send(new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload));
113126
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+118
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.opensearch.cluster.metadata.IndexMetadata;
2020
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
2121
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.index.query.BoolQueryBuilder;
2223
import org.opensearch.index.query.RangeQueryBuilder;
24+
import org.opensearch.index.query.TermQueryBuilder;
2325
import org.opensearch.test.InternalTestCluster;
2426
import org.opensearch.test.OpenSearchIntegTestCase;
2527
import org.opensearch.transport.client.Requests;
@@ -310,6 +312,122 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
310312
}));
311313
}
312314

315+
public void testExternalVersioning() throws Exception {
316+
// setup nodes and index
317+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
318+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
319+
internalCluster().startClusterManagerOnlyNode();
320+
final String nodeA = internalCluster().startDataOnlyNode();
321+
final String nodeB = internalCluster().startDataOnlyNode();
322+
323+
createIndexWithDefaultSettings(1, 1);
324+
ensureGreen(indexName);
325+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
326+
327+
// validate next version docs get indexed
328+
produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");
329+
produceDataWithExternalVersion("2", 2, "name2", "30", defaultMessageTimestamp, "index");
330+
waitForState(() -> {
331+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
332+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
333+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
334+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
335+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
336+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
337+
return 30 == (Integer) response1.getHits().getHits()[0].getSourceAsMap().get("age")
338+
&& 30 == (Integer) response2.getHits().getHits()[0].getSourceAsMap().get("age");
339+
});
340+
341+
// test out-of-order updates
342+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
343+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
344+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
345+
waitForSearchableDocs(3, Arrays.asList(nodeA, nodeB));
346+
347+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
348+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
349+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
350+
assertEquals(30, response1.getHits().getHits()[0].getSourceAsMap().get("age"));
351+
352+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
353+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
354+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
355+
assertEquals(30, response2.getHits().getHits()[0].getSourceAsMap().get("age"));
356+
357+
// test deletes with smaller version
358+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "delete");
359+
produceDataWithExternalVersion("4", 1, "name4", "25", defaultMessageTimestamp, "index");
360+
waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
361+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(23);
362+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
363+
assertThat(response.getHits().getTotalHits().value(), is(4L));
364+
365+
// test deletes with correct version
366+
produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete");
367+
produceDataWithExternalVersion("2", 3, "name2", "30", defaultMessageTimestamp, "delete");
368+
waitForState(() -> {
369+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
370+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
371+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
372+
return true;
373+
});
374+
}
375+
376+
public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
377+
// setup nodes and index
378+
internalCluster().startClusterManagerOnlyNode();
379+
final String nodeA = internalCluster().startDataOnlyNode();
380+
final String nodeB = internalCluster().startDataOnlyNode();
381+
382+
createIndex(
383+
indexName,
384+
Settings.builder()
385+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
386+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
387+
.put("ingestion_source.type", "kafka")
388+
.put("ingestion_source.pointer.init.reset", "earliest")
389+
.put("ingestion_source.param.topic", topicName)
390+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
391+
.put("index.replication.type", "SEGMENT")
392+
.put("index.gc_deletes", "0")
393+
.build(),
394+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
395+
);
396+
397+
// insert documents
398+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
399+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
400+
waitForState(() -> {
401+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
402+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
403+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
404+
return true;
405+
});
406+
407+
// delete documents 1 and 2
408+
produceDataWithExternalVersion("1", 2, "name1", "25", defaultMessageTimestamp, "delete");
409+
produceDataWithExternalVersion("2", 2, "name2", "25", defaultMessageTimestamp, "delete");
410+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
411+
waitForState(() -> {
412+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 3));
413+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
414+
assertThat(response.getHits().getTotalHits().value(), is(1L));
415+
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
416+
});
417+
waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
418+
419+
// validate index operation with lower version creates new document
420+
produceDataWithExternalVersion("1", 1, "name1", "35", defaultMessageTimestamp, "index");
421+
produceDataWithExternalVersion("4", 1, "name4", "35", defaultMessageTimestamp, "index");
422+
waitForState(() -> {
423+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(34);
424+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
425+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
426+
return true;
427+
});
428+
429+
}
430+
313431
private void verifyRemoteStoreEnabled(String node) {
314432
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
315433
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/ScaleIndexIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.opensearch.cluster.routing.IndexShardRoutingTable;
1919
import org.opensearch.cluster.routing.ShardRouting;
2020
import org.opensearch.common.settings.Settings;
21-
import org.opensearch.common.util.FeatureFlags;
2221
import org.opensearch.core.rest.RestStatus;
2322
import org.opensearch.indices.replication.common.ReplicationType;
2423
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
@@ -41,11 +40,6 @@ public class ScaleIndexIT extends RemoteStoreBaseIntegTestCase {
4140

4241
private static final String TEST_INDEX = "test_scale_index";
4342

44-
@Override
45-
protected Settings featureFlagSettings() {
46-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
47-
}
48-
4943
public Settings indexSettings() {
5044
return Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
5145
}

server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAllocationIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.cluster.routing.IndexShardRoutingTable;
1313
import org.opensearch.cluster.routing.ShardRouting;
1414
import org.opensearch.common.settings.Settings;
15-
import org.opensearch.common.util.FeatureFlags;
1615
import org.opensearch.indices.replication.common.ReplicationType;
1716
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
1817
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -25,11 +24,6 @@
2524
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2625
public class SearchReplicaAllocationIT extends RemoteStoreBaseIntegTestCase {
2726

28-
@Override
29-
protected Settings featureFlagSettings() {
30-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
31-
}
32-
3327
public void testSearchReplicaAllocatedToDedicatedSearchNode() {
3428
internalCluster().startClusterManagerOnlyNode();
3529
String primaryNode = internalCluster().startDataOnlyNode();

server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
2020
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
2121
import org.opensearch.common.settings.Settings;
22-
import org.opensearch.common.util.FeatureFlags;
2322
import org.opensearch.indices.replication.common.ReplicationType;
2423
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
2524
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -40,11 +39,6 @@ public class SearchReplicaAwarenessAllocationIT extends RemoteStoreBaseIntegTest
4039

4140
private final Logger logger = LogManager.getLogger(SearchReplicaAwarenessAllocationIT.class);
4241

43-
@Override
44-
protected Settings featureFlagSettings() {
45-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
46-
}
47-
4842
public void testAllocationAwarenessZones() {
4943
Settings commonSettings = Settings.builder()
5044
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")

server/src/internalClusterTest/java/org/opensearch/cluster/metadata/AutoExpandSearchReplicasIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.opensearch.cluster.routing.UnassignedInfo;
1212
import org.opensearch.common.settings.Settings;
1313
import org.opensearch.common.unit.TimeValue;
14-
import org.opensearch.common.util.FeatureFlags;
1514
import org.opensearch.indices.replication.common.ReplicationType;
1615
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
1716
import org.opensearch.test.InternalTestCluster;
@@ -22,11 +21,6 @@
2221
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2322
public class AutoExpandSearchReplicasIT extends RemoteStoreBaseIntegTestCase {
2423

25-
@Override
26-
protected Settings featureFlagSettings() {
27-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
28-
}
29-
3024
public void testAutoExpandSearchReplica() throws Exception {
3125
String indexName = "test";
3226
internalCluster().startClusterManagerOnlyNode();

server/src/internalClusterTest/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.opensearch.action.search.SearchResponse;
1212
import org.opensearch.action.support.WriteRequest;
1313
import org.opensearch.common.settings.Settings;
14-
import org.opensearch.common.util.FeatureFlags;
1514
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
1615
import org.opensearch.test.OpenSearchIntegTestCase;
1716

@@ -28,11 +27,6 @@ public class MetadataIndexStateServiceIT extends RemoteStoreBaseIntegTestCase {
2827

2928
private static final String TEST_INDEX = "test_open_close_index";
3029

31-
@Override
32-
protected Settings featureFlagSettings() {
33-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
34-
}
35-
3630
public void testIndexCloseAndOpen() throws Exception {
3731
internalCluster().startClusterManagerOnlyNode();
3832
internalCluster().startDataOnlyNodes(2);

server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationAndRecoveryIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.opensearch.cluster.metadata.IndexMetadata;
2222
import org.opensearch.cluster.routing.RecoverySource;
2323
import org.opensearch.common.settings.Settings;
24-
import org.opensearch.common.util.FeatureFlags;
2524
import org.opensearch.index.SegmentReplicationPerGroupStats;
2625
import org.opensearch.index.SegmentReplicationShardStats;
2726
import org.opensearch.indices.recovery.RecoveryState;
@@ -72,11 +71,6 @@ public Settings indexSettings() {
7271
.build();
7372
}
7473

75-
@Override
76-
protected Settings featureFlagSettings() {
77-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
78-
}
79-
8074
public void testReplication() throws Exception {
8175
internalCluster().startClusterManagerOnlyNode();
8276
final String primary = internalCluster().startDataOnlyNode();

server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.cluster.metadata.Metadata;
1818
import org.opensearch.common.settings.Settings;
19-
import org.opensearch.common.util.FeatureFlags;
2019
import org.opensearch.index.query.QueryBuilders;
2120
import org.opensearch.indices.replication.common.ReplicationType;
2221
import org.opensearch.remotestore.RemoteSnapshotIT;
@@ -39,11 +38,6 @@ public class SearchReplicaRestoreIT extends RemoteSnapshotIT {
3938
private static final String FS_REPOSITORY_TYPE = "fs";
4039
private static final int DOC_COUNT = 10;
4140

42-
@Override
43-
protected Settings featureFlagSettings() {
44-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
45-
}
46-
4741
public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnDocRepWithSearchReplica() throws Exception {
4842
bootstrapIndexWithOutSearchReplicas(ReplicationType.SEGMENT);
4943
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java

-56
This file was deleted.

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.opensearch.cluster.routing.Preference;
1919
import org.opensearch.cluster.routing.ShardRouting;
2020
import org.opensearch.common.settings.Settings;
21-
import org.opensearch.common.util.FeatureFlags;
2221
import org.opensearch.index.query.QueryBuilders;
2322
import org.opensearch.indices.replication.common.ReplicationType;
2423
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
@@ -40,11 +39,6 @@ public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase {
4039

4140
private static final String TEST_INDEX = "test_index";
4241

43-
@Override
44-
protected Settings featureFlagSettings() {
45-
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
46-
}
47-
4842
private final String expectedFailureMessage = "To set index.number_of_search_replicas, index.replication.type must be set to SEGMENT";
4943

5044
@Override

0 commit comments

Comments
 (0)