Restrict input stream modification in existing seekable stream supervisors #17955
@@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
    );
  }

  @Override
  public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
  {
    if (!(that instanceof KafkaSupervisorSpec)) {
      throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType());
    }
    KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
    if (this.getSource() == null || other.getSource() == null) {
      // I don't think this is possible, but covering just in case.
      throw new IllegalArgumentException(
          "Cannot consider KafkaSupervisorSpec evolution when one or both of the specs have not provided either a "
          + "topic OR topicPattern");
    }

Review comment (on the method declaration):
  Author: I probably want to add a Javadoc here, now that I notice I didn't.
    // Future enhancements could allow topicPattern to be changed in a way where the new source is additive to the
    // old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata}
    // implementation details that aren't set up to handle evolution of metadata in this way.
    if (!this.getSource().equals(other.getSource())) {
      throw new IllegalArgumentException(
          "Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern "
          + "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you "
          + "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. "
          + "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to "
          + "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can "
          + "cause duplicate events or lost events if any topics that were in the previous supervisor remain in the new "
          + "one.");
    }
  }

Review thread (on the source-equality check):
  Reviewer: Should this implementation be present for all
  Author: Good question. I was not certain whether we wanted to modify the behavior of other streaming ingestion types with this PR, but I can certainly look into it if you think it would make sense. I guess for each existing implementation (Kinesis, etc.) we'd have to see if this source change is currently supported and, if yes, add an override to its spec class to allow it.
  Author: I looked into it, and it turns out Rabbit and Kinesis also share the metadata management logic that wouldn't support updating streams, so I made the update for all of them. I therefore put this in SeekableStreamSupervisorSpec. I still ended up needing an override in Kafka because the multi-topic/single-topic information creates an edge case of moving between single and multi with the same exact string for topic/topicPattern. I'm not sure my Kafka override is the proper style: I do the edge-case logic and then call into the super implementation for the core logic. I thought that would save duplicate code, but wasn't sure it is necessarily the right way from a Java perspective.
  Reviewer: Thanks! I have left an alternative suggestion for this.

Review thread (on the error message):
  Reviewer: Suggestion for more concise language and a formatted message. I have updated the steps; please fix them up if they don't seem correct.
  Author: Thanks, yes, your step logic works and is clearer than mine!

  @Override
  public String toString()
  {
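The review thread above settles on putting the core source check in SeekableStreamSupervisorSpec, with a Kafka override that handles the topic/topicPattern edge case and then delegates to super. A minimal, self-contained sketch of that override-then-delegate pattern (hypothetical class names, not Druid's actual types) might look like this:

```java
// Hypothetical mini-model of the pattern discussed in the review thread above.
public class SpecEvolutionSketch {

    // Stands in for SeekableStreamSupervisorSpec: shared source-change check.
    static class BaseStreamSpec {
        private final String source;

        BaseStreamSpec(String source) {
            this.source = source;
        }

        String getSource() {
            return source;
        }

        void validateProposedSpecEvolution(BaseStreamSpec that) {
            if (!this.getSource().equals(that.getSource())) {
                throw new IllegalArgumentException(
                    "Cannot change source from " + this.getSource() + " to " + that.getSource());
            }
        }
    }

    // Stands in for KafkaSupervisorSpec: handles the topic vs. topicPattern
    // edge case first, then calls super for the common check.
    static class KafkaLikeSpec extends BaseStreamSpec {
        private final boolean multiTopic; // true when the source is a topicPattern

        KafkaLikeSpec(String source, boolean multiTopic) {
            super(source);
            this.multiTopic = multiTopic;
        }

        @Override
        void validateProposedSpecEvolution(BaseStreamSpec that) {
            if (that instanceof KafkaLikeSpec && ((KafkaLikeSpec) that).multiTopic != this.multiTopic) {
                // The same string used as a topic vs. a topicPattern would pass
                // the plain string comparison in super, so reject it explicitly.
                throw new IllegalArgumentException("Cannot switch between topic and topicPattern");
            }
            super.validateProposedSpecEvolution(that);
        }
    }

    public static void main(String[] args) {
        KafkaLikeSpec current = new KafkaLikeSpec("metrics", false);
        try {
            // "metrics" as a pattern instead of a topic: rejected by the override.
            current.validateProposedSpecEvolution(new KafkaLikeSpec("metrics", true));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // Unchanged topic: passes both the override and the super check.
        current.validateProposedSpecEvolution(new KafkaLikeSpec("metrics", false));
    }
}
```

Calling super after the subclass-specific guard keeps the shared check in one place, which is the code-reuse goal the author describes.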
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;

@@ -40,6 +41,8 @@

import java.io.IOException;

import static org.junit.Assert.assertThrows;

public class KafkaSupervisorSpecTest
{
  private final ObjectMapper mapper;

@@ -578,4 +581,254 @@ public void testSuspendResume() throws IOException

    Assert.assertFalse(runningSpec.isSuspended());
  }
  @Test
  public void testValidateProposedSpecEvolution() throws JsonProcessingException
  {
    String sourceSpecJson = "{\n"
        + "  \"type\": \"kafka\",\n"
        + "  \"dataSchema\": {\n"
        + "    \"dataSource\": \"metrics-kafka\",\n"
        + "    \"parser\": {\n"
        + "      \"type\": \"string\",\n"
        + "      \"parseSpec\": {\n"
        + "        \"format\": \"json\",\n"
        + "        \"timestampSpec\": {\n"
        + "          \"column\": \"timestamp\",\n"
        + "          \"format\": \"auto\"\n"
        + "        },\n"
        + "        \"dimensionsSpec\": {\n"
        + "          \"dimensions\": [],\n"
        + "          \"dimensionExclusions\": [\n"
        + "            \"timestamp\",\n"
        + "            \"value\"\n"
        + "          ]\n"
        + "        }\n"
        + "      }\n"
        + "    },\n"
        + "    \"metricsSpec\": [\n"
        + "      {\n"
        + "        \"name\": \"count\",\n"
        + "        \"type\": \"count\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_sum\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleSum\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_min\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMin\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_max\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMax\"\n"
        + "      }\n"
        + "    ],\n"
        + "    \"granularitySpec\": {\n"
        + "      \"type\": \"uniform\",\n"
        + "      \"segmentGranularity\": \"HOUR\",\n"
        + "      \"queryGranularity\": \"NONE\"\n"
        + "    }\n"
        + "  },\n"
        + "  \"ioConfig\": {\n"
        + "    \"topic\": \"metrics\",\n"
        + "    \"consumerProperties\": {\n"
        + "      \"bootstrap.servers\": \"localhost:9092\"\n"
        + "    },\n"
        + "    \"taskCount\": 1\n"
        + "  }\n"
        + "}";
    KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson, KafkaSupervisorSpec.class);

    // Change from topic to topicPattern is not allowed
    String invalidDestSpecJson = "{\n"
        + "  \"type\": \"kafka\",\n"
        + "  \"dataSchema\": {\n"
        + "    \"dataSource\": \"metrics-kafka\",\n"
        + "    \"parser\": {\n"
        + "      \"type\": \"string\",\n"
        + "      \"parseSpec\": {\n"
        + "        \"format\": \"json\",\n"
        + "        \"timestampSpec\": {\n"
        + "          \"column\": \"timestamp\",\n"
        + "          \"format\": \"auto\"\n"
        + "        },\n"
        + "        \"dimensionsSpec\": {\n"
        + "          \"dimensions\": [],\n"
        + "          \"dimensionExclusions\": [\n"
        + "            \"timestamp\",\n"
        + "            \"value\"\n"
        + "          ]\n"
        + "        }\n"
        + "      }\n"
        + "    },\n"
        + "    \"metricsSpec\": [\n"
        + "      {\n"
        + "        \"name\": \"count\",\n"
        + "        \"type\": \"count\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_sum\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleSum\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_min\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMin\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_max\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMax\"\n"
        + "      }\n"
        + "    ],\n"
        + "    \"granularitySpec\": {\n"
        + "      \"type\": \"uniform\",\n"
        + "      \"segmentGranularity\": \"HOUR\",\n"
        + "      \"queryGranularity\": \"NONE\"\n"
        + "    }\n"
        + "  },\n"
        + "  \"ioConfig\": {\n"
        + "    \"topicPattern\": \"metrics-.*\",\n"
        + "    \"consumerProperties\": {\n"
        + "      \"bootstrap.servers\": \"localhost:9092\"\n"
        + "    },\n"
        + "    \"taskCount\": 1\n"
        + "  }\n"
        + "}";
    KafkaSupervisorSpec invalidDestSpec = mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class);

    assertThrows(
        IllegalArgumentException.class,
        () -> sourceSpec.validateProposedSpecEvolution(invalidDestSpec)
    );

Review comment (on the JSON spec strings):
  Reviewer: Do we need to write the specs as JSON Strings, or can we use objects and then just validate them, same as the other tests? If using the JSON form is essential to the test, you can consider using the utility

    // Changing topic name is not allowed
    String invalidDestSpecTwoJson = "{\n"
        + "  \"type\": \"kafka\",\n"
        + "  \"dataSchema\": {\n"
        + "    \"dataSource\": \"metrics-kafka\",\n"
        + "    \"parser\": {\n"
        + "      \"type\": \"string\",\n"
        + "      \"parseSpec\": {\n"
        + "        \"format\": \"json\",\n"
        + "        \"timestampSpec\": {\n"
        + "          \"column\": \"timestamp\",\n"
        + "          \"format\": \"auto\"\n"
        + "        },\n"
        + "        \"dimensionsSpec\": {\n"
        + "          \"dimensions\": [],\n"
        + "          \"dimensionExclusions\": [\n"
        + "            \"timestamp\",\n"
        + "            \"value\"\n"
        + "          ]\n"
        + "        }\n"
        + "      }\n"
        + "    },\n"
        + "    \"metricsSpec\": [\n"
        + "      {\n"
        + "        \"name\": \"count\",\n"
        + "        \"type\": \"count\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_sum\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleSum\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_min\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMin\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_max\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMax\"\n"
        + "      }\n"
        + "    ],\n"
        + "    \"granularitySpec\": {\n"
        + "      \"type\": \"uniform\",\n"
        + "      \"segmentGranularity\": \"HOUR\",\n"
        + "      \"queryGranularity\": \"NONE\"\n"
        + "    }\n"
        + "  },\n"
        + "  \"ioConfig\": {\n"
        + "    \"topic\": \"metrics-new\",\n"
        + "    \"consumerProperties\": {\n"
        + "      \"bootstrap.servers\": \"localhost:9092\"\n"
        + "    },\n"
        + "    \"taskCount\": 1\n"
        + "  }\n"
        + "}";
    KafkaSupervisorSpec invalidDestSpecTwo = mapper.readValue(invalidDestSpecTwoJson, KafkaSupervisorSpec.class);

    assertThrows(
        IllegalArgumentException.class,
        () -> sourceSpec.validateProposedSpecEvolution(invalidDestSpecTwo)
    );
    // Changing a non-source-related field is allowed. We change taskCount to 3
    String validDestSpecJson = "{\n"
        + "  \"type\": \"kafka\",\n"
        + "  \"dataSchema\": {\n"
        + "    \"dataSource\": \"metrics-kafka\",\n"
        + "    \"parser\": {\n"
        + "      \"type\": \"string\",\n"
        + "      \"parseSpec\": {\n"
        + "        \"format\": \"json\",\n"
        + "        \"timestampSpec\": {\n"
        + "          \"column\": \"timestamp\",\n"
        + "          \"format\": \"auto\"\n"
        + "        },\n"
        + "        \"dimensionsSpec\": {\n"
        + "          \"dimensions\": [],\n"
        + "          \"dimensionExclusions\": [\n"
        + "            \"timestamp\",\n"
        + "            \"value\"\n"
        + "          ]\n"
        + "        }\n"
        + "      }\n"
        + "    },\n"
        + "    \"metricsSpec\": [\n"
        + "      {\n"
        + "        \"name\": \"count\",\n"
        + "        \"type\": \"count\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_sum\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleSum\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_min\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMin\"\n"
        + "      },\n"
        + "      {\n"
        + "        \"name\": \"value_max\",\n"
        + "        \"fieldName\": \"value\",\n"
        + "        \"type\": \"doubleMax\"\n"
        + "      }\n"
        + "    ],\n"
        + "    \"granularitySpec\": {\n"
        + "      \"type\": \"uniform\",\n"
        + "      \"segmentGranularity\": \"HOUR\",\n"
        + "      \"queryGranularity\": \"NONE\"\n"
        + "    }\n"
        + "  },\n"
        + "  \"ioConfig\": {\n"
        + "    \"topic\": \"metrics\",\n"
        + "    \"consumerProperties\": {\n"
        + "      \"bootstrap.servers\": \"localhost:9092\"\n"
        + "    },\n"
        + "    \"taskCount\": 3\n"
        + "  }\n"
        + "}";
    KafkaSupervisorSpec validDestSpec = mapper.readValue(validDestSpecJson, KafkaSupervisorSpec.class);
    sourceSpec.validateProposedSpecEvolution(validDestSpec);
  }
}
@@ -120,6 +120,12 @@ public String getSource()
    return "";
  }

  @Override
  public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
  {
    // No validation logic for the compaction spec as of now
  }

Review comment (on the empty override):
  Reviewer: Seems like this should have been the default implementation in the interface itself.

  @Override
  public boolean equals(Object o)
  {
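The reviewer's suggestion above is the standard Java pattern of a permissive default method on the interface, so non-streaming specs need no empty override. A small sketch under assumed names (this is not Druid's actual SupervisorSpec interface):

```java
// Hypothetical model of the "default impl in the interface" suggestion.
public class DefaultValidationSketch {

    interface Spec {
        String getType();

        // Default: any evolution is allowed. Stream specs override to restrict it.
        default void validateProposedSpecEvolution(Spec that) {
            // no-op
        }
    }

    // Stands in for the compaction spec: inherits the default, no boilerplate.
    static class CompactionLikeSpec implements Spec {
        @Override
        public String getType() {
            return "autocompact";
        }
    }

    public static void main(String[] args) {
        Spec spec = new CompactionLikeSpec();
        spec.validateProposedSpecEvolution(spec); // never throws
        System.out.println(spec.getType());
    }
}
```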
@@ -174,6 +174,12 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
    synchronized (lock) {
      Preconditions.checkState(started, "SupervisorManager not started");
      final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
      if (shouldUpdateSpec) {
        SupervisorSpec existingSpec = getSpec(spec.getId());
        if (existingSpec != null) {
          existingSpec.validateProposedSpecEvolution(spec);
        }
      }
      possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
      createAndStartSupervisorInternal(spec, shouldUpdateSpec);
      return shouldUpdateSpec;

Review thread (on the validation call):
  Reviewer: This logic should probably live in
  Author: Made the move of this logic.

@@ -529,4 +535,13 @@ private StreamSupervisor requireStreamSupervisor(final String supervisorId, fina
      );
    }
  }

  @Nullable
  private SupervisorSpec getSpec(String id)
  {
    synchronized (lock) {
      Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
      return supervisor == null ? null : supervisor.rhs;
    }
  }
}
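The SupervisorManager changes above look up the existing spec under the lock and let it veto the proposal before the old supervisor is stopped. A self-contained sketch of that control flow, with supervisors reduced to an id-to-source map (hypothetical names, not the real SupervisorManager):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mini-model of the validate-before-update path shown above.
public class ManagerFlowSketch {
    private final Object lock = new Object();
    private final Map<String, String> supervisors = new HashMap<>(); // id -> source

    // Mirrors createOrUpdateAndStartSupervisor: returns true when an existing
    // supervisor was updated, false when a new one was created.
    public boolean createOrUpdateAndStart(String id, String proposedSource) {
        synchronized (lock) {
            String existingSource = supervisors.get(id); // plays the role of getSpec(id)
            if (existingSource != null && !existingSource.equals(proposedSource)) {
                // Corresponds to existingSpec.validateProposedSpecEvolution(spec):
                // throwing here leaves the running supervisor untouched, since
                // nothing has been stopped or removed yet.
                throw new IllegalArgumentException(
                    "Cannot change source from " + existingSource + " to " + proposedSource);
            }
            boolean updated = existingSource != null;
            supervisors.put(id, proposedSource); // stop/remove + create/start elided
            return updated;
        }
    }
}
```

The key ordering choice, as in the PR, is that validation runs before any teardown, so a rejected spec cannot leave the datasource without a running supervisor.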
Review thread:
  Reviewer: Thanks for adding this. Maybe you can put it in a list format for better readability, like
  Author: Updated with a list.
  Reviewer: Thanks!