Skip to content

Commit 6426616

Browse files
Support versioning in pull-based ingestion
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 7eeb323 commit 6426616

File tree

6 files changed

+284
-26
lines changed

6 files changed

+284
-26
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
4141
- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447)
4242
- Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
43+
- Add versioning support in pull-based ingestion ([#17918](https://github.com/opensearch-project/OpenSearch/pull/17918))
4344

4445
### Changed
4546
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+13
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ protected void produceData(String id, String name, String age, long timestamp, S
108108
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
109109
}
110110

111+
protected void produceDataWithExternalVersion(String id, long version, String name, String age, long timestamp, String opType) {
112+
String payload = String.format(
113+
Locale.ROOT,
114+
"{\"_id\":\"%s\", \"_version\":\"%d\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
115+
id,
116+
version,
117+
opType,
118+
name,
119+
age
120+
);
121+
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
122+
}
123+
111124
protected void produceData(String payload) {
112125
producer.send(new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload));
113126
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+118
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.opensearch.cluster.metadata.IndexMetadata;
2020
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
2121
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.index.query.BoolQueryBuilder;
2223
import org.opensearch.index.query.RangeQueryBuilder;
24+
import org.opensearch.index.query.TermQueryBuilder;
2325
import org.opensearch.test.InternalTestCluster;
2426
import org.opensearch.test.OpenSearchIntegTestCase;
2527
import org.opensearch.transport.client.Requests;
@@ -310,6 +312,122 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
310312
}));
311313
}
312314

315+
public void testExternalVersioning() throws Exception {
316+
// setup nodes and index
317+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
318+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
319+
internalCluster().startClusterManagerOnlyNode();
320+
final String nodeA = internalCluster().startDataOnlyNode();
321+
final String nodeB = internalCluster().startDataOnlyNode();
322+
323+
createIndexWithDefaultSettings(1, 1);
324+
ensureGreen(indexName);
325+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
326+
327+
// validate next version docs get indexed
328+
produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");
329+
produceDataWithExternalVersion("2", 2, "name2", "30", defaultMessageTimestamp, "index");
330+
waitForState(() -> {
331+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
332+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
333+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
334+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
335+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
336+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
337+
return 30 == (Integer) response1.getHits().getHits()[0].getSourceAsMap().get("age")
338+
&& 30 == (Integer) response2.getHits().getHits()[0].getSourceAsMap().get("age");
339+
});
340+
341+
// test out-of-order updates
342+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
343+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
344+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
345+
waitForSearchableDocs(3, Arrays.asList(nodeA, nodeB));
346+
347+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
348+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
349+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
350+
assertEquals(30, response1.getHits().getHits()[0].getSourceAsMap().get("age"));
351+
352+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
353+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
354+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
355+
assertEquals(30, response2.getHits().getHits()[0].getSourceAsMap().get("age"));
356+
357+
// test deletes with smaller version
358+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "delete");
359+
produceDataWithExternalVersion("4", 1, "name4", "25", defaultMessageTimestamp, "index");
360+
waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
361+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(23);
362+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
363+
assertThat(response.getHits().getTotalHits().value(), is(4L));
364+
365+
// test deletes with correct version
366+
produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete");
367+
produceDataWithExternalVersion("2", 3, "name2", "30", defaultMessageTimestamp, "delete");
368+
waitForState(() -> {
369+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
370+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
371+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
372+
return true;
373+
});
374+
}
375+
376+
public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
377+
// setup nodes and index
378+
internalCluster().startClusterManagerOnlyNode();
379+
final String nodeA = internalCluster().startDataOnlyNode();
380+
final String nodeB = internalCluster().startDataOnlyNode();
381+
382+
createIndex(
383+
indexName,
384+
Settings.builder()
385+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
386+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
387+
.put("ingestion_source.type", "kafka")
388+
.put("ingestion_source.pointer.init.reset", "earliest")
389+
.put("ingestion_source.param.topic", topicName)
390+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
391+
.put("index.replication.type", "SEGMENT")
392+
.put("index.gc_deletes", "0")
393+
.build(),
394+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
395+
);
396+
397+
// insert documents
398+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
399+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
400+
waitForState(() -> {
401+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
402+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
403+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
404+
return true;
405+
});
406+
407+
// delete documents 1 and 2
408+
produceDataWithExternalVersion("1", 2, "name1", "25", defaultMessageTimestamp, "delete");
409+
produceDataWithExternalVersion("2", 2, "name2", "25", defaultMessageTimestamp, "delete");
410+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
411+
waitForState(() -> {
412+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 3));
413+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
414+
assertThat(response.getHits().getTotalHits().value(), is(1L));
415+
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
416+
});
417+
waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
418+
419+
// validate index operation with lower version creates new document
420+
produceDataWithExternalVersion("1", 1, "name1", "35", defaultMessageTimestamp, "index");
421+
produceDataWithExternalVersion("4", 1, "name4", "35", defaultMessageTimestamp, "index");
422+
waitForState(() -> {
423+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(34);
424+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
425+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
426+
return true;
427+
});
428+
429+
}
430+
313431
private void verifyRemoteStoreEnabled(String node) {
314432
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
315433
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+124-15
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
1717
import org.opensearch.cluster.metadata.IndexMetadata;
1818
import org.opensearch.cluster.metadata.IngestionSource;
19+
import org.opensearch.common.lease.Releasable;
1920
import org.opensearch.common.lucene.Lucene;
21+
import org.opensearch.common.lucene.uid.Versions;
22+
import org.opensearch.common.util.concurrent.ReleasableLock;
2023
import org.opensearch.index.IngestionConsumerFactory;
2124
import org.opensearch.index.IngestionShardConsumer;
2225
import org.opensearch.index.IngestionShardPointer;
26+
import org.opensearch.index.VersionType;
2327
import org.opensearch.index.mapper.DocumentMapperForType;
2428
import org.opensearch.index.mapper.IdFieldMapper;
2529
import org.opensearch.index.mapper.ParseContext;
@@ -47,6 +51,7 @@
4751
import java.util.function.BiFunction;
4852

4953
import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
54+
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_LOCATION;
5055
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT;
5156

5257
/**
@@ -157,15 +162,45 @@ public IndexResult index(Index index) throws IOException {
157162
/**
158163
* Indexes the document into the engine. This is used internally by the stream poller only.
159164
* @param index the index request
160-
* @return the index result
161165
* @throws IOException if an error occurs
162166
*/
163-
public IndexResult indexInternal(Index index) throws IOException {
167+
public void indexInternal(Index index) throws IOException {
168+
// todo: add number of inserts/updates metric
164169
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
165170
ensureOpen();
166-
final IndexResult indexResult;
167-
indexResult = indexIntoLucene(index);
168-
return indexResult;
171+
172+
try (
173+
ReleasableLock releasableLock1 = readLock.acquire();
174+
Releasable releasableLock2 = versionMap.acquireLock(index.uid().bytes())
175+
) {
176+
lastWriteNanos = index.startTime();
177+
boolean isExternalVersioning = index.versionType() == VersionType.EXTERNAL;
178+
if (index.getAutoGeneratedIdTimestamp() == UNSET_AUTO_GENERATED_TIMESTAMP) {
179+
validateDocumentVersion(index);
180+
}
181+
182+
if (isExternalVersioning) {
183+
index.parsedDoc().version().setLongValue(index.version());
184+
}
185+
186+
IndexResult indexResult = indexIntoLucene(index);
187+
if (isExternalVersioning && indexResult.getResultType() == Result.Type.SUCCESS) {
188+
versionMap.maybePutIndexUnderLock(
189+
index.uid().bytes(),
190+
new IndexVersionValue(EMPTY_TRANSLOG_LOCATION, index.version(), index.seqNo(), index.primaryTerm())
191+
);
192+
}
193+
} catch (VersionConflictEngineException e) {
194+
logger.debug("Version conflict encountered when processing index operation", e);
195+
throw e;
196+
} catch (RuntimeException | IOException e) {
197+
try {
198+
maybeFailEngine("index", e);
199+
} catch (Exception inner) {
200+
e.addSuppressed(inner);
201+
}
202+
throw e;
203+
}
169204
}
170205

171206
private IndexResult indexIntoLucene(Index index) throws IOException {
@@ -203,18 +238,56 @@ public DeleteResult delete(Delete delete) throws IOException {
203238
/**
204239
* Processes delete operations. This is used internally by the stream poller only.
205240
*/
206-
public DeleteResult deleteInternal(Delete delete) throws IOException {
241+
public void deleteInternal(Delete delete) throws IOException {
242+
// todo: add number of deletes metric
243+
versionMap.enforceSafeAccess();
207244
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
208245
ensureOpen();
209-
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
210-
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
211-
final ParseContext.Document doc = tombstone.docs().get(0);
212-
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
213-
+ doc
214-
+ " ]";
215-
doc.add(softDeletesField);
216-
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
217-
return new DeleteResult(1, delete.primaryTerm(), -1, true);
246+
lastWriteNanos = delete.startTime();
247+
248+
try (
249+
ReleasableLock releasableLock1 = readLock.acquire();
250+
Releasable releasableLock2 = versionMap.acquireLock(delete.uid().bytes())
251+
) {
252+
validateDocumentVersion(delete);
253+
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
254+
boolean isExternalVersioning = delete.versionType() == VersionType.EXTERNAL;
255+
if (isExternalVersioning) {
256+
tombstone.version().setLongValue(delete.version());
257+
}
258+
259+
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
260+
final ParseContext.Document doc = tombstone.docs().get(0);
261+
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
262+
+ doc
263+
+ " ]";
264+
doc.add(softDeletesField);
265+
266+
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
267+
if (isExternalVersioning) {
268+
versionMap.putDeleteUnderLock(
269+
delete.uid().bytes(),
270+
new DeleteVersionValue(
271+
delete.version(),
272+
delete.seqNo(),
273+
delete.primaryTerm(),
274+
engineConfig.getThreadPool().relativeTimeInMillis()
275+
)
276+
);
277+
}
278+
} catch (VersionConflictEngineException e) {
279+
logger.debug("Version conflict encountered when processing deletes", e);
280+
throw e;
281+
} catch (RuntimeException | IOException e) {
282+
try {
283+
maybeFailEngine("delete", e);
284+
} catch (Exception inner) {
285+
e.addSuppressed(inner);
286+
}
287+
throw e;
288+
}
289+
290+
maybePruneDeletes();
218291
}
219292

220293
@Override
@@ -229,6 +302,15 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
229302
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
230303
}
231304

305+
@Override
306+
protected void pruneDeletedTombstones() {
307+
final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
308+
final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
309+
// prune based only on timestamp and not sequence number
310+
versionMap.pruneTombstones(maxTimestampToPrune, Long.MAX_VALUE);
311+
lastDeleteVersionPruneTimeMSec = timeMSec;
312+
}
313+
232314
@Override
233315
public Translog.Snapshot newChangesSnapshot(
234316
String source,
@@ -381,6 +463,33 @@ private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy er
381463
streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
382464
}
383465

466+
/**
467+
* Validates document version for pull-based ingestion. Only external versioning is supported.
468+
*/
469+
private void validateDocumentVersion(final Operation operation) throws IOException {
470+
if (operation.versionType() != VersionType.EXTERNAL) {
471+
return;
472+
}
473+
474+
versionMap.enforceSafeAccess();
475+
final VersionValue versionValue = resolveDocVersion(operation, false);
476+
final long currentVersion;
477+
final boolean currentNotFoundOrDeleted;
478+
479+
if (versionValue == null) {
480+
// todo: possible to optimize addDoc instead of updateDoc if version is not present?
481+
currentVersion = Versions.NOT_FOUND;
482+
currentNotFoundOrDeleted = true;
483+
} else {
484+
currentVersion = versionValue.version;
485+
currentNotFoundOrDeleted = versionValue.isDelete();
486+
}
487+
488+
if (operation.versionType().isVersionConflictForWrites(currentVersion, operation.version(), currentNotFoundOrDeleted)) {
489+
throw new VersionConflictEngineException(shardId, operation, currentVersion, currentNotFoundOrDeleted);
490+
}
491+
}
492+
384493
/**
385494
* Pause the poller. Used by management flows.
386495
*/

0 commit comments

Comments
 (0)