Restrict input stream modification in existing seekable stream supervisors #17955

Merged
6 changes: 6 additions & 0 deletions docs/ingestion/kafka-ingestion.md
@@ -134,6 +134,12 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o
28.0.0 will cause the ingestion for that datasource to fail.
:::

:::info
Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported, nor is changing the `topicPattern` of an existing supervisor to a different regex pattern.
@kfaraz (Contributor) commented on Apr 29, 2025:
Thanks for adding this.

Maybe you can put it in a list format for better readability like

The following updates to a supervisor spec are not supported:
- item 1
- item 2
- item 3

Follow these steps instead: 
- Step 1
- Step 2

Contributor Author replied:

Updated with list

Contributor replied:

Thanks!

You can force the migration by terminating the existing supervisor and creating a new one with the new `topicPattern`. You must then hard-reset the offsets of the re-submitted supervisor to the earliest or latest offsets, depending on the `useEarliestOffset` setting in your spec.
Note that resetting the offsets can lead to data duplication or data loss depending on the offset reset policy.
:::
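The terminate/recreate/reset flow described above maps onto the Overlord's supervisor API. A sketch of the calls, assuming an Overlord at `localhost:8081` and a hypothetical supervisor id `metrics-kafka` (adjust host, id, and spec file to your deployment):

```shell
# 1. Terminate the existing supervisor.
curl -X POST "http://localhost:8081/druid/indexer/v1/supervisor/metrics-kafka/terminate"

# 2. Submit the new spec (with the new topicPattern), ideally in suspended state.
curl -X POST -H "Content-Type: application/json" \
  -d @new-supervisor-spec.json \
  "http://localhost:8081/druid/indexer/v1/supervisor"

# 3. Hard-reset the offsets of the re-submitted supervisor.
curl -X POST "http://localhost:8081/druid/indexer/v1/supervisor/metrics-kafka/reset"

# 4. Resume the new supervisor.
curl -X POST "http://localhost:8081/druid/indexer/v1/supervisor/metrics-kafka/resume"
```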

You can ingest data from one or multiple topics.
When ingesting data from multiple topics, Druid assigns partitions based on the hashcode of the topic name and the ID of the partition within that topic, so the partition assignment might not be uniform across all the tasks. Druid assumes that partitions across individual topics have similar load. If you want to ingest from both high-load and low-load topics in the same supervisor, it is recommended to use a higher number of partitions for the high-load topic and a lower number for the low-load topic.
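The hash-based assignment above can be illustrated with a small sketch (not the exact Druid implementation; the combining formula and class name are assumptions for illustration):

```java
// Sketch: assign a (topic, partition) pair to one of taskCount tasks by
// combining the topic's hashcode with the partition id and bucketing by
// task count. Illustrative only, not Druid's actual code.
public class PartitionAssignmentSketch
{
  static int taskForPartition(String topic, int partitionId, int taskCount)
  {
    // Combine topic hash and partition id, then bucket by task count.
    int hash = 31 * topic.hashCode() + partitionId;
    return Math.abs(hash % taskCount);
  }

  public static void main(String[] args)
  {
    int taskCount = 3;
    // Partitions of two topics may land unevenly across the tasks.
    for (String topic : new String[]{"metrics", "events"}) {
      for (int p = 0; p < 4; p++) {
        System.out.println(topic + "-" + p + " -> task " + taskForPartition(topic, p, taskCount));
      }
    }
  }
}
```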

@@ -340,6 +340,12 @@ public String getSource()
return getBaseDataSource();
}

@Override
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
{
// No validation logic for materialized view supervisor spec as of now
}

@Override
public String getId()
{
@@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
);
}

@Override
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
Contributor Author: I probably want to add a javadoc here now that I notice I didn't.

{
if (!(that instanceof KafkaSupervisorSpec)) {
throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType());
}
KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
if (this.getSource() == null || other.getSource() == null) {
// I don't think this is possible, but covering just in case.
Contributor suggested change:
- // I don't think this is possible, but covering just in case.
+ // Not likely to happen, but covering just in case.

throw new IllegalArgumentException(
"Cannot consider KafkaSupervisorSpec evolution when one or both of the specs have not provided either a "
+ "topic OR topicPattern");
}

// Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the
// old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata}
// implementation details that aren't set up to handle evolution of metadata in this way.
if (!this.getSource().equals(other.getSource())) {
Contributor: Should this impl be present for all SeekableStreamSupervisorSpecs and not just Kafka?

Contributor Author: Good question. I wasn't certain whether we wanted to modify the behavior of other streaming ingestion types in this PR, but I can look into it if you think it makes sense. For each existing implementation (Kinesis, etc.) we'd have to check whether this source change is currently supported and, if so, add an override to their spec class to allow it.

Contributor Author: I looked into it, and it turns out Rabbit and Kinesis also share the metadata management logic that wouldn't support updating streams, so I made the update for all and put this in SeekableStreamSupervisorSpec. I still needed an override in Kafka because the multi-topic/single-topic distinction creates an edge case: moving between single and multi with the exact same string for topic/topicPattern. I'm not sure my Kafka override is the proper style; it handles the edge case and then calls into the super implementation for the core logic. I thought that would avoid duplicated code, but wasn't sure it's the right approach from a Java perspective.

Contributor: Thanks! I have left an alternative suggestion for this.

throw new IllegalArgumentException(
"Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern "
+ "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you "
+ "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. "
+ "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to "
+ "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can "
+ "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new "
+ "one.");
@kfaraz (Contributor), Apr 29, 2025: Suggestion for more concise language and a formatted message (replacing the message above with):
"Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor."
+ "%nTo perform the update safely, follow these steps:"
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "%n(2) Create a new supervisor with the new topic or topicPattern."
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.");

I have updated the steps, please fix it up if it doesn't seem correct.

Contributor Author:

Thanks, yes your step logic works and is more clear than mine!

}
}
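The override-then-super pattern the author describes (edge-case check in the subclass, shared check in the base class) can be sketched roughly as follows. Class and field names here are hypothetical, not the actual PR code:

```java
// Base class holds the core rule shared by all stream types.
class BaseSpec
{
  final String source;

  BaseSpec(String source)
  {
    this.source = source;
  }

  void validateEvolution(BaseSpec proposed)
  {
    // Core rule: the source (topic/stream) must not change.
    if (!this.source.equals(proposed.source)) {
      throw new IllegalArgumentException("Source change not supported");
    }
  }
}

// Kafka-like subclass handles its edge case, then delegates to super.
class KafkaLikeSpec extends BaseSpec
{
  final boolean multiTopic;

  KafkaLikeSpec(String source, boolean multiTopic)
  {
    super(source);
    this.multiTopic = multiTopic;
  }

  @Override
  void validateEvolution(BaseSpec proposed)
  {
    // Edge case: same source string, but switching between topic and topicPattern.
    if (proposed instanceof KafkaLikeSpec && ((KafkaLikeSpec) proposed).multiTopic != this.multiTopic) {
      throw new IllegalArgumentException("Switching between topic and topicPattern is not supported");
    }
    super.validateEvolution(proposed); // shared core logic, no duplication
  }
}
```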

@Override
public String toString()
{
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -40,6 +41,8 @@

import java.io.IOException;

import static org.junit.Assert.assertThrows;

public class KafkaSupervisorSpecTest
{
private final ObjectMapper mapper;
@@ -578,4 +581,254 @@ public void testSuspendResume() throws IOException

Assert.assertFalse(runningSpec.isSuspended());
}

@Test
public void testValidateProposedSpecEvolution() throws JsonProcessingException
{
String sourceSpecJson = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topic\": \"metrics\",\n"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson, KafkaSupervisorSpec.class);

// Change from topic to topicPattern is not allowed
String invalidDestSpecJson = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topicPattern\": \"metrics-.*\",\n"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec invalidDestSpec = mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class);

assertThrows(
IllegalArgumentException.class,
() -> sourceSpec.validateProposedSpecEvolution(invalidDestSpec)
);

// Changing topic name is not allowed
String invalidDestSpecTwoJson = "{\n"
Contributor: Do we need to write the specs as json String, or can we use objects and then just validate them, same as the other tests? If using the json form is essential to the test, consider using the utility TestUtils.singleQuoteToStandardJson() to make the json easier to read.

+ " \"type\": \"kafka\",\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topic\": \"metrics-new\",\n"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec invalidDestSpecTwo = mapper.readValue(invalidDestSpecTwoJson, KafkaSupervisorSpec.class);

assertThrows(
IllegalArgumentException.class,
() -> sourceSpec.validateProposedSpecEvolution(invalidDestSpecTwo)
);
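The helper the reviewer mentions rewrites single-quoted JSON into standard JSON so large spec literals stay readable. A minimal sketch in the spirit of Druid's TestUtils.singleQuoteToStandardJson() (assuming values contain no literal quote characters that need escaping):

```java
// Minimal single-quote-to-standard-JSON helper, illustrative only.
public class JsonQuoteHelper
{
  static String singleQuoteToStandardJson(String singleQuoted)
  {
    // Replace every single quote with a double quote.
    return singleQuoted.replace('\'', '"');
  }

  public static void main(String[] args)
  {
    String spec = "{'ioConfig': {'topic': 'metrics', 'taskCount': 1}}";
    System.out.println(singleQuoteToStandardJson(spec));
  }
}
```

This keeps the test's JSON literals short and free of backslash-escaped quotes.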

// Changing non-source related field is allowed. We change taskCount to 3
String validDestSpecJson = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topic\": \"metrics\",\n"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 3\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec validDestSpec = mapper.readValue(validDestSpecJson, KafkaSupervisorSpec.class);
sourceSpec.validateProposedSpecEvolution(validDestSpec);
}
}
@@ -120,6 +120,12 @@ public String getSource()
return "";
}

@Override
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
{
// No validation logic for compaction spec as of now
Contributor: Seems like this should have been the default impl in the interface itself.

}
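The reviewer's suggestion of a default implementation on the interface could look like this sketch (interface name is hypothetical, standing in for SupervisorSpec), which would let specs without evolution constraints skip the no-op override entirely:

```java
// Sketch: a no-op default on the interface, so only specs that restrict
// evolution need to override it.
interface SupervisorSpecLike
{
  default void validateProposedSpecEvolution(SupervisorSpecLike that) throws IllegalArgumentException
  {
    // No constraints by default; implementations override to restrict evolution.
  }
}
```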

@Override
public boolean equals(Object o)
{
@@ -174,6 +174,12 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
if (shouldUpdateSpec) {
SupervisorSpec existingSpec = getSpec(spec.getId());
if (existingSpec != null) {
existingSpec.validateProposedSpecEvolution(spec);
}
Contributor: This logic should probably live in shouldUpdateSupervisor method itself.

Contributor Author: Made the move of this logic.

}
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
return shouldUpdateSpec;
@@ -529,4 +535,13 @@ private StreamSupervisor requireStreamSupervisor(final String supervisorId, fina
);
}
}

@Nullable
private SupervisorSpec getSpec(String id)
{
synchronized (lock) {
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
return supervisor == null ? null : supervisor.rhs;
}
}
}
@@ -189,6 +189,12 @@ public String getSource()
return "";
}

@Override
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
{
// No validation logic for scheduled batch spec as of now.
}

public CronSchedulerConfig getSchedulerConfig()
{
return schedulerConfig;