-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Restrict input stream modification in existing seekable stream supervisors #17955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restrict input stream modification in existing seekable stream supervisors #17955
Conversation
@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) | |||
); | |||
} | |||
|
|||
@Override | |||
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I probably want to add a javadoc here now that I notice I didn't
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a useful validation, @capistrant !
I have left some suggestions.
* @param that the proposed supervisor spec | ||
* @throws IllegalArgumentException if the evolution is not allowed | ||
*/ | ||
void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use a simpler method name like validateSpecUpdateTo
, and the arg can be called proposedSpec
.
We should throw a DruidException (of type not supported or invalid input) instead.
Also, please add a default impl to avoid having to override in all impls where this is not needed right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for that default tip, cleaned up a ton :) also fixed the Exception type to use DruidException and made an attempt at rename
SupervisorSpec existingSpec = getSpec(spec.getId()); | ||
if (existingSpec != null) { | ||
existingSpec.validateProposedSpecEvolution(spec); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic should probably live in shouldUpdateSupervisor
method itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the move of this logic
@@ -134,6 +134,12 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o | |||
28.0.0 will cause the ingestion for that datasource to fail. | |||
::: | |||
|
|||
:::info | |||
Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. It is also not supported to change the `topicPattern` of an existing supervisor to a different regex pattern. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this.
Maybe you can put it in a list format for better readability like
The following updates to a supervisor spec are not supported:
- item 1
- item 2
- item 3
Follow these steps instead:
- Step 1
- Step 2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated with list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
@Override | ||
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException | ||
{ | ||
// No validation logic for compaction spec as of now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this should have been the default impl in the interface itself.
// Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the | ||
// old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata} | ||
// implementation details that aren't set up to handle evolution of metadata in this way. | ||
if (!this.getSource().equals(other.getSource())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this impl be present for all SeekableStreamSupervisorSpec
s and not just Kafka?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I was not certain on if we wanted to modify behavior of other streaming ingestion types with this PR, but I can certainly look into it if you think it would make sense. I guess for each existing implementation (kinesis, etc.) we'd have to see if this source change is currently supported and if yes, add an override to their spec class to allow it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked into it and it turns out rabbit and kinesis also share the metadata management logic that wouldn't support updating streams so I made the update for all. I therefore put this in SeekableStreamSupervisorSpec. I still ended up having to have an override in Kafka cuz the multi topic single topic information created an edge case of moving between single/multi with the same exact string for topic/topicPattern. I'm not sure that my kafka override is the proper style. I do the edge case logic and then call into super implementation for the core logic. I thought that would save duplicate code, but wasn't sure if it is necessarily the right way from a java perspective
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I have left an alternative suggestion for this.
} | ||
KafkaSupervisorSpec other = (KafkaSupervisorSpec) that; | ||
if (this.getSource() == null || other.getSource() == null) { | ||
// I don't think this is possible, but covering just in case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// I don't think this is possible, but covering just in case. | |
// Not likely to happen, but covering just in case. |
"Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern " | ||
+ "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you " | ||
+ "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. " | ||
+ "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to " | ||
+ "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can " | ||
+ "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new " | ||
+ "one."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion for more concise language and formatted message.
"Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern " | |
+ "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you " | |
+ "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. " | |
+ "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to " | |
+ "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can " | |
+ "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new " | |
+ "one."); | |
"Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor." | |
+ "%nTo perform the update safely, follow these steps:" | |
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " | |
+ "%n(2) Create a new supervisor with the new topic or topicPattern." | |
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."); |
I have updated the steps, please fix it up if it doesn't seem correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, yes your step logic works and is more clear than mine!
@kfaraz Thanks for the review! All of the comments make sense to me, I will start working through them soon |
…e for Kafka Since all existing implementations of the SeekableStreamSupervisor do not support stream modification for existing supervisors, it makes sense to centralize a default implementation of spec update validation. Kafka requries some override because the idea of single and multi topic ingest within kafka is unique to kafka
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for incorporating the changes, @capistrant . Left some final suggestions.
We should be good to merge after these.
/** | ||
* Checks if a spec can be replaced with a proposed spec (proposesSpec). | ||
* <p> | ||
* By default, this method does no validation checks. Implementations of this method can choose to define rules |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* By default, this method does no validation checks. Implementations of this method can choose to define rules | |
* By default, this method does no validation checks. Implementations of this method can choose to define rules |
* </p> | ||
* | ||
* @param proposedSpec the proposed supervisor spec | ||
* @throws IllegalArgumentException if the spec update is not allowed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the code throws DruidException
s of type invalid input now.
@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue() | |||
Assert.assertEquals("value", spec.getContextValue("key")); | |||
} | |||
|
|||
@Test | |||
public void testValidateSpecUpdateToShortCircuits() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
My latest personal preference on test naming convention is something like this:
test_validateSpecUpdateTo_shortCircuits_whenSomethingHappens()
This is only slightly different from the convention you have already followed here,
in that it uses underscores. The underscores help in clearly spelling out which method
is being tested and under what conditions.
That said, you may stick to the current convention if you find it more appealing since
there is no established Druid naming convention yet as long as the test names make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this, it is easier on the eyes IMO. 👍
@@ -1412,4 +1545,38 @@ private static Map<String, Object> getScaleInProperties() | |||
return autoScalerConfig; | |||
} | |||
|
|||
private static class OtherSupervisorSpec implements SupervisorSpec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already a TestSupervisorSpec
class that you may be able to use.
{ | ||
if (!(proposedSpec instanceof KafkaSupervisorSpec)) { | ||
throw InvalidInput.exception( | ||
StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) | |
"Cannot change spec from type[%s] to type[%s]", getClass().getName(), proposedSpec.getClass().getName() |
); | ||
|
||
OtherSupervisorSpec otherSpec = new OtherSupervisorSpec(); | ||
assertThrows( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use DruidExceptionMatcher
to verify error message too.
} else { | ||
if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { | ||
// The spec bytes are different, so we need to check if the replacement is allowed | ||
currentSupervisor.rhs.validateSpecUpdateTo(spec); | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaner with the nesting reduced and positive condition first:
} else { | |
if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { | |
// The spec bytes are different, so we need to check if the replacement is allowed | |
currentSupervisor.rhs.validateSpecUpdateTo(spec); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
} else if (Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { | |
return false; | |
} else { | |
// The spec bytes are different, so we need to check if the update is allowed | |
currentSupervisor.rhs.validateSpecUpdateTo(spec); | |
return true; | |
} |
* @param proposedSource The proposed input source stream | ||
* @return A formatted error message | ||
*/ | ||
protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method shouldn't be a part of the contract of this interface.
I would suggest just making the string a constant and formatting it when needed.
} | ||
|
||
@Override | ||
protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't need to override this method.
We can either use the constant defined in SeekableStreamSupervisorSpec
as is or a different String altogether.
A cleaner way to include the type name in that String would be to have a placeholder for type and fill it with getType()
while formatting.
); | ||
|
||
// Changing topic name is not allowed | ||
String invalidDestSpecTwoJson = "{\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to write the specs as json String or can we use objects and then just validate them, same as the other tests?
If using the json form is essential to the test, you can consider using the utility TestUtils.singleQuoteToStandardJson()
to make the json easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 🚀
@capistrant , does this PR need the |
Yeah even I feel it should not require a design review. |
Description
Modify the supervisor API to add restrictions on modifications to existing Kafka Supervisor Specs. Prevent the changing of the "stream" for an existing spec. This effectively means, that you cannot submit a spec update that makes a change to
topic
, a migration betweentopic
andtopicPattern
or a change to atopicPattern
. The reasoning for this is that the system is not designed to gracefully handle such migrations. In the best case, tasks will fail. And in the worst case, tasks will succeed but the metadata will not be being persisted correctly, leading to eventual data integrity issues.SupervisorSpec Interface Modification
The core of the change lies here with a new interface method:
void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException;
This method is intended to determine if a proposed evolution of the existing spec to
that
proposed spec is allowed. The way it is spec'd out in this PR that an illegal proposed evolution results in the throwing of anIllegalArgumentException
.The only implementation that actually has logic is KafkaSupervisorSpec which prevents the changing of the topic/topicPattern for the existing supervisor. All other spec evolution is allowed.
Alternatives
Support changes in topic/topicPattern
An alternative approach to this would be modifying the system to properly handle change in the topic/topicPattern. I think that is still a good long term plan. But it would require defining exactly what evolution is allowed to occur. You could technically allow any kind of change in the topic/topicPattern, but it is debatable if you want to. For instance, allowing users unbounded ability to change the topic/topicPattern could lead to mistakes that behave how designed, but result in data issues for the user because they did something they didn't intend to, like remove a topic from their supervisor when they actually only meant to add a topic to the set of topics supplying data for the supervisor.
Allow the spec change but prevent the start of tasks if the underlying topic set doesn't match metadata store
Another approach I considered but did not pursue, so I don't know the true viability. would be to accept the spec submission, but not start up new tasks if the topic set in metadata didn't match what the new supervisor was actually seeing from Kafka. I think this would have allowed the change to stay confined to the kafka extension, with the tradeoff being that the feedback to the user wasn't as immediate as my implementation.
Other thoughts
Perhaps, an ideal world would be to identify a way to achieve this immediate negative feedback to the user, while still not modifying code outside of the kafka extension. I am open to hearing these ideas, so I labeled with design review.
Release note
Explicitly prevent Seekable Stream Supervisors (Kafka, Kinesis, and Rabbit) from updating the underlying "input stream" (i.e. topic for kafka) that is persisted for it. This action, while previously allowed by the API, is not fully supported by the underlying system. Going forward, a request to make such a change will result in a
400
error from the Supervisor API with details on the reason why it is not allowed. The docs and the message in the response describe a work-a-round for users who are adamant that they want to make such a change.Key changed/added classes in this PR
SupervisorSpec
KafkaSupervisorSpec
This PR has: