Restrict input stream modification in existing seekable stream supervisors #17955


Merged
12 changes: 10 additions & 2 deletions docs/ingestion/kafka-ingestion.md
@@ -120,7 +120,7 @@ For configuration properties shared across all streaming ingestion methods, refer

|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
-|`topic`|String|The Kafka topic to read from. To ingest data from multiple topic, use `topicPattern`. |Yes if `topicPattern` isn't set.||
+|`topic`|String|The Kafka topic to read from. Note that once this value is established for a supervisor, updating it is not supported. To ingest data from multiple topics, use `topicPattern`. |Yes if `topicPattern` isn't set.||
|`topicPattern`|String|Multiple Kafka topics to read from, passed as a regex pattern. See [Ingest from multiple topics](#ingest-from-multiple-topics) for more information.|Yes if `topic` isn't set.||
|`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. See [Consumer properties](#consumer-properties) for details.|Yes. At the minimum, you must set the `bootstrap.servers` property to establish the initial connection to the Kafka cluster.||
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100|
@@ -134,6 +134,14 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version older than
28.0.0 will cause the ingestion for that datasource to fail.
:::

:::info
Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. Changing the `topicPattern` of an existing supervisor to a different regex pattern is also not supported.
@kfaraz (Contributor) commented on Apr 29, 2025:

> Thanks for adding this.
>
> Maybe you can put it in a list format for better readability like
>
> The following updates to a supervisor spec are not supported:
> - item 1
> - item 2
> - item 3
>
> Follow these steps instead:
> - Step 1
> - Step 2

Author (Contributor) replied:

> Updated with list

@kfaraz (Contributor) replied:

> Thanks!
You can force the migration by doing the following:
1. Suspend the supervisor.
2. Reset the offsets.
3. Submit the updated supervisor spec (see the sketch after this callout).
:::
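
A minimal sketch of this forced migration using the standard Supervisor API endpoints is shown below. The Router address `localhost:8888`, the supervisor id `metrics-supervisor`, and the spec file name are assumptions for illustration, not values from this PR.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

// Sketch of the forced migration: suspend, reset offsets, resubmit.
// The host, supervisor id, and spec file below are illustrative assumptions.
public class ForceSupervisorMigration
{
  private static final String BASE = "http://localhost:8888/druid/indexer/v1/supervisor";

  public static void main(String[] args) throws Exception
  {
    HttpClient client = HttpClient.newHttpClient();
    String supervisorId = "metrics-supervisor";

    // 1. Suspend the supervisor so it stops launching tasks.
    post(client, BASE + "/" + supervisorId + "/suspend", HttpRequest.BodyPublishers.noBody(), null);

    // 2. Reset the supervisor's stored offsets.
    post(client, BASE + "/" + supervisorId + "/reset", HttpRequest.BodyPublishers.noBody(), null);

    // 3. Submit the updated spec (containing the new topic or topicPattern).
    post(client, BASE, HttpRequest.BodyPublishers.ofFile(Path.of("updated-supervisor-spec.json")), "application/json");
  }

  private static void post(HttpClient client, String url, HttpRequest.BodyPublisher body, String contentType)
      throws Exception
  {
    HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url)).POST(body);
    if (contentType != null) {
      builder.header("Content-Type", contentType);
    }
    HttpResponse<String> response = client.send(builder.build(), HttpResponse.BodyHandlers.ofString());
    System.out.println(url + " -> " + response.statusCode());
  }
}
```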

You can ingest data from one or multiple topics.
When ingesting data from multiple topics, Druid assigns partitions based on the hashcode of the topic name and the ID of the partition within that topic, so the partition assignment might not be uniform across all the tasks. Druid assumes that partitions across individual topics carry similar load. If you want to ingest from both high-load and low-load topics in the same supervisor, it is recommended that the high-load topic have a higher number of partitions and the low-load topic a lower number.
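
As a rough illustration of the idea (a simplified sketch, not Druid's actual assignment code), a partition can be mapped to a task by hashing the topic name together with the partition id:

```java
import java.util.Objects;

// Simplified sketch (not Druid's actual implementation) of hash-based
// assignment of topic partitions to ingestion tasks.
public class TopicPartitionAssignment
{
  static int assignTask(String topic, int partitionId, int taskCount)
  {
    // Hash the (topic, partition) pair; floorMod keeps the index non-negative.
    return Math.floorMod(Objects.hash(topic, partitionId), taskCount);
  }

  public static void main(String[] args)
  {
    // Partitions from two topics spread across 3 tasks. The spread is
    // hash-based, so uniformity across tasks is not guaranteed.
    for (String topic : new String[]{"metrics-a", "metrics-b"}) {
      for (int partition = 0; partition < 4; partition++) {
        System.out.printf("%s[%d] -> task %d%n", topic, partition, assignTask(topic, partition, 3));
      }
    }
  }
}
```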

@@ -456,4 +464,4 @@ See the following topics for more information:
* [Supervisor API](../api-reference/supervisor-api.md) for how to manage and monitor supervisors using the API.
* [Supervisor](../ingestion/supervisor.md) for supervisor status and capacity planning.
* [Loading from Apache Kafka](../tutorials/tutorial-kafka.md) for a tutorial on streaming data from Apache Kafka.
* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the `kafka` input format.
5 changes: 5 additions & 0 deletions extensions-core/kafka-indexing-service/pom.xml
@@ -113,6 +113,11 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
KafkaSupervisorSpec.java
@@ -24,14 +24,18 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -176,6 +180,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
);
}

/**
* Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic.
* <p>
* getSource() returns the same string (exampleTopic) for "topicPattern=exampleTopic" and "topic=exampleTopic".
* This override prevents this case from being considered a valid update.
* </p>
* @param proposedSpec the proposed supervisor spec
* @throws DruidException if the proposed spec is not a Kafka spec or if the proposed spec changes from multi-topic to single-topic or vice versa
*/
@Override
public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException
{
if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
throw InvalidInput.exception(
"Cannot change spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
);
}
KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec;
if (this.getSpec().getIOConfig().isMultiTopic() != other.getSpec().getIOConfig().isMultiTopic()) {
throw InvalidInput.exception(
SeekableStreamSupervisorSpec.ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE,
StringUtils.format("(%s) %s", this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", this.getSource()),
StringUtils.format("(%s) %s", other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", other.getSource())
);
}

super.validateSpecUpdateTo(proposedSpec);
}

@Override
public String toString()
{
KafkaSupervisorSpecTest.java
@@ -22,23 +22,37 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.TestSupervisorSpec;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;

import static org.junit.Assert.assertThrows;

public class KafkaSupervisorSpecTest
{
@@ -578,4 +592,175 @@ public void testSuspendResume() throws IOException

Assert.assertFalse(runningSpec.isSuspended());
}

@Test
public void test_validateSpecUpdateTo()
{
KafkaSupervisorSpec sourceSpec = getSpec("metrics", null);

// Proposed spec being non-kafka is not allowed
TestSupervisorSpec otherSpec = new TestSupervisorSpec("test", new Object());
MatcherAssert.assertThat(
assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(otherSpec)),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs(
StringUtils.format("Cannot change spec from type[%s] to type[%s]", sourceSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName())
)
);

KafkaSupervisorSpec multiTopicProposedSpec = getSpec(null, "metrics-.*");
MatcherAssert.assertThat(
assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(multiTopicProposedSpec)),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs(
"Update of the input source stream from [(single-topic) metrics] to [(multi-topic) metrics-.*] is not supported for a running supervisor."
+ "\nTo perform the update safely, follow these steps:"
+ "\n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "\n(2) Create a new supervisor with the new input source stream."
+ "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."
)
);

KafkaSupervisorSpec singleTopicNewStreamProposedSpec = getSpec("metricsNew", null);
MatcherAssert.assertThat(
assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(singleTopicNewStreamProposedSpec)),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs(
"Update of the input source stream from [metrics] to [metricsNew] is not supported for a running supervisor."
+ "\nTo perform the update safely, follow these steps:"
+ "\n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "\n(2) Create a new supervisor with the new input source stream."
+ "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."
)
);

KafkaSupervisorSpec multiTopicMatchingSourceString = getSpec(null, "metrics");
MatcherAssert.assertThat(
assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(multiTopicMatchingSourceString)),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs(
"Update of the input source stream from [(single-topic) metrics] to [(multi-topic) metrics] is not supported for a running supervisor."
+ "\nTo perform the update safely, follow these steps:"
+ "\n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "\n(2) Create a new supervisor with the new input source stream."
+ "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."
)
);

// test the inverse as well
MatcherAssert.assertThat(
assertThrows(DruidException.class, () -> multiTopicMatchingSourceString.validateSpecUpdateTo(sourceSpec)),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"invalidInput"
).expectMessageIs(
"Update of the input source stream from [(multi-topic) metrics] to [(single-topic) metrics] is not supported for a running supervisor."
+ "\nTo perform the update safely, follow these steps:"
+ "\n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "\n(2) Create a new supervisor with the new input source stream."
+ "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."
)
);

// Test valid spec update. This spec changes context vs the sourceSpec
KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpec(
null,
DataSchema.builder().withDataSource("testDs").withAggregators(new CountAggregatorFactory("rows")).withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
null,
new KafkaSupervisorIOConfig(
"metrics",
null,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
Map.of("bootstrap.servers", "localhost:9092"),
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null,
null,
false
),
Map.of(
"key1",
"value1",
"key2",
"value2"
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
sourceSpec.validateSpecUpdateTo(validDestSpec);
}

private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
{
return new KafkaSupervisorSpec(
null,
DataSchema.builder().withDataSource("testDs").withAggregators(new CountAggregatorFactory("rows")).withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
null,
new KafkaSupervisorIOConfig(
topic,
topicPattern,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
Map.of("bootstrap.servers", "localhost:9092"),
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null,
null,
false
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
}
SupervisorManager.java
@@ -197,8 +197,15 @@ public boolean shouldUpdateSupervisor(SupervisorSpec spec)
try {
byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
Pair<Supervisor, SupervisorSpec> currentSupervisor = supervisors.get(spec.getId());
-      return currentSupervisor == null
-          || !Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs));
+      if (currentSupervisor == null || currentSupervisor.rhs == null) {
+        return true;
+      } else if (Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
+        return false;
+      } else {
+        // The spec bytes are different, so we need to check whether the update is allowed
+        currentSupervisor.rhs.validateSpecUpdateTo(spec);
+        return true;
+      }
}
catch (JsonProcessingException ex) {
log.warn("Failed to write spec as bytes for spec_id[%s]", spec.getId());
@@ -529,4 +536,13 @@ private StreamSupervisor requireStreamSupervisor(final String supervisorId, final
);
}
}

@Nullable
private SupervisorSpec getSpec(String id)
{
synchronized (lock) {
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
return supervisor == null ? null : supervisor.rhs;
}
}
}
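
Taken in isolation, the gating pattern that `shouldUpdateSupervisor` implements looks roughly like the following self-contained sketch, with a hypothetical `Spec` interface standing in for `SupervisorSpec` (an assumption for illustration): an identical serialized spec is a no-op, and a differing spec must pass the running spec's validation before it is accepted.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Self-contained sketch of the update-gating pattern; the Spec interface
// is a hypothetical stand-in for SupervisorSpec.
public class SpecUpdateGate
{
  interface Spec
  {
    String id();
    byte[] serialized();
    // Throws an exception if transitioning to the proposed spec is illegal.
    void validateUpdateTo(Spec proposed);
  }

  private final Map<String, Spec> running = new ConcurrentHashMap<>();

  boolean shouldUpdate(Spec proposed)
  {
    Spec existing = running.get(proposed.id());
    if (existing == null) {
      return true; // nothing running yet, so any valid spec is acceptable
    }
    if (Arrays.equals(existing.serialized(), proposed.serialized())) {
      return false; // identical spec, nothing to do
    }
    // Different bytes: let the running spec veto illegal transitions,
    // such as changing the input source stream.
    existing.validateUpdateTo(proposed);
    return true;
  }
}
```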