Skip to content

[Pull-based Ingestion] Support versioning in pull-based ingestion #17918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447))
- Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
- Add versioning support in pull-based ingestion ([#17918](https://github.com/opensearch-project/OpenSearch/pull/17918))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ protected void produceData(String id, String name, String age, long timestamp, S
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
}

/**
 * Sends one message that carries an explicit document version to {@code topicName}.
 * The record is sent with a {@code null} partition and the literal string {@code "null"} as its key.
 *
 * @param id        value written to the {@code _id} field
 * @param version   value written to the {@code _version} field (emitted as a quoted number)
 * @param name      value of the {@code name} field inside {@code _source}
 * @param age       value of the {@code age} field inside {@code _source}, inserted without quotes
 * @param timestamp timestamp assigned to the produced record
 * @param opType    value written to the {@code _op_type} field
 */
protected void produceDataWithExternalVersion(String id, long version, String name, String age, long timestamp, String opType) {
    final String template = "{\"_id\":\"%s\", \"_version\":\"%d\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}";
    final String message = String.format(Locale.ROOT, template, id, version, opType, name, age);
    producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", message));
}

/**
 * Sends an already-built payload to {@code topicName}, using {@code defaultMessageTimestamp}
 * as the record timestamp, a {@code null} partition and the literal string {@code "null"} as key.
 *
 * @param payload message value to send unchanged
 */
protected void produceData(String payload) {
    producer.send(
        new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload)
    );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Requests;
Expand Down Expand Up @@ -310,6 +312,122 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
}));
}

/**
 * Walks through the external-versioning scenarios against a two-data-node cluster:
 * higher versions replace existing documents, lower-version index and delete messages
 * leave the stored documents untouched, and deletes with a higher version remove them.
 */
public void testExternalVersioning() throws Exception {
    // publish the initial documents first, then bring up the cluster and the index
    produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
    produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
    internalCluster().startClusterManagerOnlyNode();
    final String nodeA = internalCluster().startDataOnlyNode();
    final String nodeB = internalCluster().startDataOnlyNode();

    createIndexWithDefaultSettings(1, 1);
    ensureGreen(indexName);
    waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));

    // version 2 of both documents must replace version 1 (age 25 -> 30)
    produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");
    produceDataWithExternalVersion("2", 2, "name2", "30", defaultMessageTimestamp, "index");
    waitForState(() -> {
        final SearchResponse firstDoc = client().prepareSearch(indexName)
            .setQuery(new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1)))
            .get();
        assertThat(firstDoc.getHits().getTotalHits().value(), is(1L));
        final SearchResponse secondDoc = client().prepareSearch(indexName)
            .setQuery(new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2)))
            .get();
        assertThat(secondDoc.getHits().getTotalHits().value(), is(1L));
        if (30 != (Integer) firstDoc.getHits().getHits()[0].getSourceAsMap().get("age")) {
            return false;
        }
        return 30 == (Integer) secondDoc.getHits().getHits()[0].getSourceAsMap().get("age");
    });

    // out-of-order (version 1) messages for ids 1 and 2, plus a brand new id 3
    produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
    produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
    produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
    waitForSearchableDocs(3, Arrays.asList(nodeA, nodeB));

    // ids 1 and 2 must still hold the version 2 content
    final SearchResponse firstAfterStale = client().prepareSearch(indexName)
        .setQuery(new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1)))
        .get();
    assertThat(firstAfterStale.getHits().getTotalHits().value(), is(1L));
    assertEquals(30, firstAfterStale.getHits().getHits()[0].getSourceAsMap().get("age"));

    final SearchResponse secondAfterStale = client().prepareSearch(indexName)
        .setQuery(new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2)))
        .get();
    assertThat(secondAfterStale.getHits().getTotalHits().value(), is(1L));
    assertEquals(30, secondAfterStale.getHits().getHits()[0].getSourceAsMap().get("age"));

    // a delete carrying a smaller version must not remove id 1; id 4 is new
    produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "delete");
    produceDataWithExternalVersion("4", 1, "name4", "25", defaultMessageTimestamp, "index");
    waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
    final SearchResponse afterStaleDelete = client().prepareSearch(indexName)
        .setQuery(new RangeQueryBuilder("age").gte(23))
        .get();
    assertThat(afterStaleDelete.getHits().getTotalHits().value(), is(4L));

    // deletes carrying version 3 must remove ids 1 and 2, leaving two documents
    produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete");
    produceDataWithExternalVersion("2", 3, "name2", "30", defaultMessageTimestamp, "delete");
    waitForState(() -> {
        final SearchResponse remaining = client().prepareSearch(indexName)
            .setQuery(new RangeQueryBuilder("age").gte(23))
            .get();
        assertThat(remaining.getHits().getTotalHits().value(), is(2L));
        return true;
    });
}

/**
 * Verifies external versioning on an index created with {@code index.gc_deletes} set to {@code "0"}.
 * Ids 1 and 2 are indexed at version 1 and deleted at version 2; afterwards an index message for
 * id 1 at the lower version 1 is expected to become searchable again together with the new id 4.
 */
public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
    // setup nodes and index
    internalCluster().startClusterManagerOnlyNode();
    final String nodeA = internalCluster().startDataOnlyNode();
    final String nodeB = internalCluster().startDataOnlyNode();

    createIndex(
        indexName,
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "earliest")
            .put("ingestion_source.param.topic", topicName)
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            .put("index.replication.type", "SEGMENT")
            .put("index.gc_deletes", "0")
            .build(),
        // mapping JSON with balanced braces (the previous literal carried one extra closing brace)
        "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
    );

    // insert documents 1 and 2 at version 1
    produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
    produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
    waitForState(() -> {
        RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
        SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
        assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
        return true;
    });

    // delete documents 1 and 2 at version 2; document 3 is indexed so its visibility
    // shows that the messages before it have been consumed
    produceDataWithExternalVersion("1", 2, "name1", "25", defaultMessageTimestamp, "delete");
    produceDataWithExternalVersion("2", 2, "name2", "25", defaultMessageTimestamp, "delete");
    produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
    waitForState(() -> {
        BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 3));
        SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
        assertThat(response.getHits().getTotalHits().value(), is(1L));
        return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
    });
    // only document 3 should remain searchable after the deletes
    waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));

    // validate index operation with lower version creates new document:
    // id 1 (version 1, below the delete's version 2) and the new id 4 both carry age 35
    produceDataWithExternalVersion("1", 1, "name1", "35", defaultMessageTimestamp, "index");
    produceDataWithExternalVersion("4", 1, "name4", "35", defaultMessageTimestamp, "index");
    waitForState(() -> {
        RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(34);
        SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
        assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
        return true;
    });
}

private void verifyRemoteStoreEnabled(String node) {
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
Expand Down
143 changes: 126 additions & 17 deletions server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
Expand Down Expand Up @@ -47,6 +51,7 @@
import java.util.function.BiFunction;

import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_LOCATION;
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT;

/**
Expand Down Expand Up @@ -157,15 +162,45 @@
/**
* Indexes the document into the engine. This is used internally by the stream poller only.
* @param index the index request
* @return the index result
* @throws IOException if an error occurs
*/
public IndexResult indexInternal(Index index) throws IOException {
public void indexInternal(Index index) throws IOException {
// todo: add number of inserts/updates metric
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
ensureOpen();
final IndexResult indexResult;
indexResult = indexIntoLucene(index);
return indexResult;

try (
ReleasableLock releasableLock1 = readLock.acquire();
Releasable releasableLock2 = versionMap.acquireLock(index.uid().bytes())
) {
ensureOpen();
lastWriteNanos = index.startTime();
boolean isExternalVersioning = index.versionType() == VersionType.EXTERNAL;
if (index.getAutoGeneratedIdTimestamp() == UNSET_AUTO_GENERATED_TIMESTAMP) {
validateDocumentVersion(index);
}

if (isExternalVersioning) {
index.parsedDoc().version().setLongValue(index.version());

Check warning on line 183 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L183

Added line #L183 was not covered by tests
}

IndexResult indexResult = indexIntoLucene(index);
if (isExternalVersioning && indexResult.getResultType() == Result.Type.SUCCESS) {
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(EMPTY_TRANSLOG_LOCATION, index.version(), index.seqNo(), index.primaryTerm())

Check warning on line 190 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L188-L190

Added lines #L188 - L190 were not covered by tests
);
}
} catch (VersionConflictEngineException e) {
logger.debug("Version conflict encountered when processing index operation", e);
throw e;
} catch (RuntimeException | IOException e) {

Check warning on line 196 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L193-L196

Added lines #L193 - L196 were not covered by tests
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;

Check warning on line 202 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L198-L202

Added lines #L198 - L202 were not covered by tests
}
}

private IndexResult indexIntoLucene(Index index) throws IOException {
Expand Down Expand Up @@ -203,18 +238,56 @@
/**
* Processes delete operations. This is used internally by the stream poller only.
*/
public DeleteResult deleteInternal(Delete delete) throws IOException {
public void deleteInternal(Delete delete) throws IOException {
// todo: add number of deletes metric
versionMap.enforceSafeAccess();

Check warning on line 243 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L243

Added line #L243 was not covered by tests
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
ensureOpen();
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
+ doc
+ " ]";
doc.add(softDeletesField);
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
return new DeleteResult(1, delete.primaryTerm(), -1, true);
lastWriteNanos = delete.startTime();

Check warning on line 245 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L245

Added line #L245 was not covered by tests

try (
ReleasableLock releasableLock1 = readLock.acquire();
Releasable releasableLock2 = versionMap.acquireLock(delete.uid().bytes())

Check warning on line 249 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L248-L249

Added lines #L248 - L249 were not covered by tests
) {
ensureOpen();
validateDocumentVersion(delete);
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());

Check warning on line 253 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L251-L253

Added lines #L251 - L253 were not covered by tests
boolean isExternalVersioning = delete.versionType() == VersionType.EXTERNAL;
if (isExternalVersioning) {
tombstone.version().setLongValue(delete.version());

Check warning on line 256 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L256

Added line #L256 was not covered by tests
}

assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);

Check warning on line 260 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L260

Added line #L260 was not covered by tests
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
+ doc
+ " ]";
doc.add(softDeletesField);

Check warning on line 264 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L264

Added line #L264 was not covered by tests

indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);

Check warning on line 266 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L266

Added line #L266 was not covered by tests
if (isExternalVersioning) {
versionMap.putDeleteUnderLock(
delete.uid().bytes(),

Check warning on line 269 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L268-L269

Added lines #L268 - L269 were not covered by tests
new DeleteVersionValue(
delete.version(),
delete.seqNo(),
delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()

Check warning on line 274 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L271-L274

Added lines #L271 - L274 were not covered by tests
)
);
}
} catch (VersionConflictEngineException e) {
logger.debug("Version conflict encountered when processing deletes", e);
throw e;
} catch (RuntimeException | IOException e) {

Check warning on line 281 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L278-L281

Added lines #L278 - L281 were not covered by tests
try {
maybeFailEngine("delete", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}

Check warning on line 288 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L283-L288

Added lines #L283 - L288 were not covered by tests

maybePruneDeletes();

Check warning on line 290 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L290

Added line #L290 was not covered by tests
}

@Override
Expand All @@ -229,6 +302,15 @@
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}

/**
 * Prunes delete tombstones from the version map by age only: entries are pruned against the
 * current relative time minus the index's gc-deletes interval, and the time of this pruning
 * pass is recorded in {@code lastDeleteVersionPruneTimeMSec}.
 */
@Override
protected void pruneDeletedTombstones() {
    final long nowMillis = engineConfig.getThreadPool().relativeTimeInMillis();
    final long gcDeletesMillis = engineConfig.getIndexSettings().getGcDeletesInMillis();
    // Long.MAX_VALUE as the second argument: prune on timestamp alone, not on sequence number
    versionMap.pruneTombstones(nowMillis - gcDeletesMillis, Long.MAX_VALUE);
    lastDeleteVersionPruneTimeMSec = nowMillis;
}

@Override
public Translog.Snapshot newChangesSnapshot(
String source,
Expand Down Expand Up @@ -381,6 +463,33 @@
streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
}

/**
* Validates document version for pull-based ingestion. Only external versioning is supported.
*
* Operations whose version type is not {@link VersionType#EXTERNAL} are accepted without any check.
* For external versioning, the version currently known for the document is resolved and compared
* with the operation's version through {@code isVersionConflictForWrites}.
*
* @param operation the operation whose version is to be validated
* @throws IOException declared by this method; presumably raised while resolving the current
*                     document version (TODO confirm against resolveDocVersion)
* @throws VersionConflictEngineException if the version type reports a write conflict between the
*                     current version and the operation's version
*/
private void validateDocumentVersion(final Operation operation) throws IOException {
// nothing to validate unless the operation uses external versioning
if (operation.versionType() != VersionType.EXTERNAL) {
return;
}

versionMap.enforceSafeAccess();
// NOTE(review): meaning of the boolean argument is not visible here; verify against resolveDocVersion
final VersionValue versionValue = resolveDocVersion(operation, false);

Check warning on line 475 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L474-L475

Added lines #L474 - L475 were not covered by tests
final long currentVersion;
final boolean currentNotFoundOrDeleted;

if (versionValue == null) {
// no version resolved for this document: treat it as not found
// todo: possible to optimize addDoc instead of updateDoc if version is not present?
currentVersion = Versions.NOT_FOUND;
currentNotFoundOrDeleted = true;

Check warning on line 482 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L481-L482

Added lines #L481 - L482 were not covered by tests
} else {
// a version is known; it may belong to a delete entry, which is reported via isDelete()
currentVersion = versionValue.version;
currentNotFoundOrDeleted = versionValue.isDelete();

Check warning on line 485 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L484-L485

Added lines #L484 - L485 were not covered by tests
}

// let the version type decide whether the incoming version conflicts with the current one
if (operation.versionType().isVersionConflictForWrites(currentVersion, operation.version(), currentNotFoundOrDeleted)) {
throw new VersionConflictEngineException(shardId, operation, currentVersion, currentNotFoundOrDeleted);

Check warning on line 489 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L489

Added line #L489 was not covered by tests
}
}

Check warning on line 491 in server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java#L491

Added line #L491 was not covered by tests

/**
* Pause the poller. Used by management flows.
*/
Expand Down
Loading
Loading