Restrict input stream modification in existing seekable stream supervisors #17955

Merged

Contributor

@capistrant capistrant commented Apr 29, 2025

Description

Modify the supervisor API to restrict modifications to existing Kafka supervisor specs by preventing changes to the "stream" of an existing spec. In effect, you cannot submit a spec update that changes the topic, migrates between topic and topicPattern, or changes the topicPattern. The reasoning is that the system is not designed to handle such migrations gracefully. In the best case, tasks will fail. In the worst case, tasks will succeed but the metadata will not be persisted correctly, leading to eventual data integrity issues.

SupervisorSpec Interface Modification

The core of the change lies here with a new interface method: void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException;

This method determines whether a proposed evolution of the existing spec into the proposed spec (that) is allowed. As specified in this PR, an illegal proposed evolution results in an IllegalArgumentException being thrown.

The only implementation that actually has logic is KafkaSupervisorSpec which prevents the changing of the topic/topicPattern for the existing supervisor. All other spec evolution is allowed.
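
A minimal sketch of the contract described above, not the code from this PR. `SketchSupervisorSpec` and `SketchKafkaSpec` are hypothetical stand-ins, and `source` is assumed to hold the topic or topicPattern string.

```java
import java.util.Objects;

// Hypothetical stand-in for the SupervisorSpec interface described above.
interface SketchSupervisorSpec
{
  String getId();

  // Throws IllegalArgumentException if replacing this spec with `that` is not allowed.
  void validateProposedSpecEvolution(SketchSupervisorSpec that) throws IllegalArgumentException;
}

// Hypothetical Kafka-like spec; `source` stands for the topic or topicPattern.
final class SketchKafkaSpec implements SketchSupervisorSpec
{
  private final String id;
  private final String source;

  SketchKafkaSpec(String id, String source)
  {
    this.id = id;
    this.source = source;
  }

  @Override
  public String getId()
  {
    return id;
  }

  String getSource()
  {
    return source;
  }

  @Override
  public void validateProposedSpecEvolution(SketchSupervisorSpec that)
  {
    if (!(that instanceof SketchKafkaSpec)) {
      throw new IllegalArgumentException("Proposed spec is not a Kafka spec");
    }
    SketchKafkaSpec other = (SketchKafkaSpec) that;
    // Any other field may change; only the input stream is pinned.
    if (!Objects.equals(source, other.getSource())) {
      throw new IllegalArgumentException(
          "Cannot change topic/topicPattern from [" + source + "] to [" + other.getSource() + "]"
      );
    }
  }
}
```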

Alternatives

Support changes in topic/topicPattern

An alternative approach would be to modify the system to properly handle changes to the topic/topicPattern. I think that is still a good long-term plan, but it would require defining exactly which evolutions are allowed. You could technically allow any kind of change to the topic/topicPattern, but it is debatable whether you want to. For instance, giving users unbounded ability to change the topic/topicPattern could lead to mistakes that behave as designed but cause data issues because the user did something they didn't intend, such as removing a topic from their supervisor when they only meant to add a topic to the set supplying data for the supervisor.

Allow the spec change but prevent the start of tasks if the underlying topic set doesn't match metadata store

Another approach, which I considered but did not pursue (so I don't know its true viability), would be to accept the spec submission but not start new tasks if the topic set in metadata didn't match what the new supervisor was actually seeing from Kafka. I think this would have kept the change confined to the kafka extension, with the tradeoff that feedback to the user would not be as immediate as with my implementation.

Other thoughts

Ideally, we would find a way to give the user this immediate negative feedback without modifying code outside of the kafka extension. I am open to hearing ideas, so I added the design review label.

Release note

Explicitly prevent Seekable Stream Supervisors (Kafka, Kinesis, and Rabbit) from updating the underlying "input stream" (i.e., the topic for Kafka) that is persisted for them. This action, while previously allowed by the API, is not fully supported by the underlying system. Going forward, a request to make such a change will result in a 400 error from the Supervisor API with details on why it is not allowed. The docs and the message in the response describe a workaround for users who are adamant that they want to make such a change.


Key changed/added classes in this PR
  • SupervisorSpec
  • KafkaSupervisorSpec

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
);
}

@Override
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
Contributor Author

I probably want to add a javadoc here now that I notice I didn't

Contributor

@kfaraz kfaraz left a comment

This is a useful validation, @capistrant !
I have left some suggestions.

* @param that the proposed supervisor spec
* @throws IllegalArgumentException if the evolution is not allowed
*/
void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException;
Contributor

Maybe use a simpler method name like validateSpecUpdateTo, and the arg can be called proposedSpec.

We should throw a DruidException (of type not supported or invalid input) instead.

Also, please add a default impl to avoid having to override in all impls where this is not needed right now.
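
The default-implementation suggestion can be sketched roughly as follows. All type names here are hypothetical, and `SketchInvalidInputException` merely stands in for a DruidException of the invalid-input kind; only the method name `validateSpecUpdateTo` and the argument name `proposedSpec` come from the review comment.

```java
// Hypothetical stand-in for a DruidException of the "invalid input" kind.
class SketchInvalidInputException extends RuntimeException
{
  SketchInvalidInputException(String message)
  {
    super(message);
  }
}

interface SketchSpec
{
  String getType();

  // Default: accept every update, so only specs with real rules need to override.
  default void validateSpecUpdateTo(SketchSpec proposedSpec)
  {
    // No validation by default.
  }
}

// A spec type with no rules simply inherits the no-op.
final class SketchCompactionSpec implements SketchSpec
{
  @Override
  public String getType()
  {
    return "compaction";
  }
}

// A spec type with a rule overrides the default.
final class SketchStreamSpec implements SketchSpec
{
  private final String stream;

  SketchStreamSpec(String stream)
  {
    this.stream = stream;
  }

  @Override
  public String getType()
  {
    return "stream";
  }

  @Override
  public void validateSpecUpdateTo(SketchSpec proposedSpec)
  {
    if (!(proposedSpec instanceof SketchStreamSpec)
        || !stream.equals(((SketchStreamSpec) proposedSpec).stream)) {
      throw new SketchInvalidInputException("Input stream cannot be changed");
    }
  }
}
```

With a default method, implementations that have nothing to validate need no boilerplate override at all.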

Contributor Author

Thanks for that default tip, cleaned up a ton :) also fixed the Exception type to use DruidException and made an attempt at rename

Comment on lines 178 to 181
SupervisorSpec existingSpec = getSpec(spec.getId());
if (existingSpec != null) {
  existingSpec.validateProposedSpecEvolution(spec);
}
Contributor

This logic should probably live in shouldUpdateSupervisor method itself.

Contributor Author

Made the move of this logic

@@ -134,6 +134,12 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o
28.0.0 will cause the ingestion for that datasource to fail.
:::

:::info
Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. It is also not supported to change the `topicPattern` of an existing supervisor to a different regex pattern.
Contributor

@kfaraz kfaraz Apr 29, 2025

Thanks for adding this.

Maybe you can put it in a list format for better readability like

The following updates to a supervisor spec are not supported:
- item 1
- item 2
- item 3

Follow these steps instead: 
- Step 1
- Step 2

Contributor Author

Updated with list

Contributor

Thanks!

@Override
public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException
{
// No validation logic for compaction spec as of now
Contributor

Seems like this should have been the default impl in the interface itself.

// Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the
// old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata}
// implementation details that aren't set up to handle evolution of metadata in this way.
if (!this.getSource().equals(other.getSource())) {
Contributor

Should this impl be present for all SeekableStreamSupervisorSpecs and not just Kafka?

Contributor Author

Good question. I was not certain on if we wanted to modify behavior of other streaming ingestion types with this PR, but I can certainly look into it if you think it would make sense. I guess for each existing implementation (kinesis, etc.) we'd have to see if this source change is currently supported and if yes, add an override to their spec class to allow it

Contributor Author

I looked into it and it turns out rabbit and kinesis also share the metadata management logic that wouldn't support updating streams so I made the update for all. I therefore put this in SeekableStreamSupervisorSpec. I still ended up having to have an override in Kafka cuz the multi topic single topic information created an edge case of moving between single/multi with the same exact string for topic/topicPattern. I'm not sure that my kafka override is the proper style. I do the edge case logic and then call into super implementation for the core logic. I thought that would save duplicate code, but wasn't sure if it is necessarily the right way from a java perspective
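
The "edge case first, then call super" shape described above might look like the sketch below. It is an illustration under assumptions, not the merged code: `BaseStreamSpecSketch` stands in for SeekableStreamSupervisorSpec, `KafkaSpecSketch` for KafkaSupervisorSpec, and the `multiTopic` flag is a hypothetical way to model topic versus topicPattern.

```java
// Hypothetical base class standing in for SeekableStreamSupervisorSpec.
abstract class BaseStreamSpecSketch
{
  // The input stream name: a topic, a topicPattern, a Kinesis stream, and so on.
  abstract String getSource();

  // Core rule shared by all stream types: the spec type and the source must not change.
  void validateSpecUpdateTo(BaseStreamSpecSketch proposed)
  {
    if (proposed.getClass() != getClass()) {
      throw new IllegalArgumentException("Cannot change the spec type");
    }
    if (!getSource().equals(proposed.getSource())) {
      throw new IllegalArgumentException(
          String.format("Cannot change input stream from [%s] to [%s]", getSource(), proposed.getSource())
      );
    }
  }
}

// Hypothetical Kafka spec; `multiTopic` is true when `source` is a topicPattern.
final class KafkaSpecSketch extends BaseStreamSpecSketch
{
  private final String source;
  private final boolean multiTopic;

  KafkaSpecSketch(String source, boolean multiTopic)
  {
    this.source = source;
    this.multiTopic = multiTopic;
  }

  @Override
  String getSource()
  {
    return source;
  }

  @Override
  void validateSpecUpdateTo(BaseStreamSpecSketch proposed)
  {
    // Kafka-only edge case: the string is identical but single-topic vs. multi-topic differs.
    if (proposed instanceof KafkaSpecSketch && ((KafkaSpecSketch) proposed).multiTopic != multiTopic) {
      throw new IllegalArgumentException("Cannot switch between topic and topicPattern");
    }
    // Everything else is the shared rule.
    super.validateSpecUpdateTo(proposed);
  }
}
```

Delegating to `super` after the subclass-specific check is a common Java pattern and keeps the shared comparison in one place.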

Contributor

Thanks! I have left an alternative suggestion for this.

}
KafkaSupervisorSpec other = (KafkaSupervisorSpec) that;
if (this.getSource() == null || other.getSource() == null) {
// I don't think this is possible, but covering just in case.
Contributor

Suggested change
// I don't think this is possible, but covering just in case.
// Not likely to happen, but covering just in case.

Comment on lines 199 to 205
"Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern "
+ "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you "
+ "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. "
+ "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to "
+ "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can "
+ "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new "
+ "one.");
Contributor

@kfaraz kfaraz Apr 29, 2025

Suggestion for more concise language and formatted message.

Suggested change
"Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern "
+ "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you "
+ "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. "
+ "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to "
+ "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can "
+ "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new "
+ "one.");
"Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor."
+ "%nTo perform the update safely, follow these steps:"
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "%n(2) Create a new supervisor with the new topic or topicPattern."
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.");

I have updated the steps, please fix it up if it doesn't seem correct.

Contributor Author

Thanks, yes your step logic works and is more clear than mine!
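
One detail worth noting about the suggested message: `%s` and `%n` are only expanded when the string passes through a formatter. A small sketch using plain `String.format` shows this; the class name is hypothetical and the template is abbreviated from the suggestion above.

```java
import java.util.Locale;

final class TopicChangeMessageSketch
{
  // Abbreviated from the suggested message; %n becomes the platform line separator.
  static final String TEMPLATE =
      "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor."
      + "%nTo perform the update safely, follow these steps:"
      + "%n(1) Suspend this supervisor, reset its offsets and then terminate it."
      + "%n(2) Create a new supervisor with the new topic or topicPattern.";

  static String format(String existingSource, String proposedSource)
  {
    return String.format(Locale.ENGLISH, TEMPLATE, existingSource, proposedSource);
  }
}
```

If the template were passed to an exception constructor unformatted, the reader would see literal `%s` and `%n` in the response.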

@capistrant
Contributor Author

@kfaraz Thanks for the review! All of the comments make sense to me, I will start working through them soon

capistrant added 4 commits May 2, 2025 11:14
…e for Kafka

Since all existing implementations of the SeekableStreamSupervisor do not support stream modification for existing supervisors, it makes sense to centralize a default implementation of spec update validation. Kafka requires an override because the notion of single-topic and multi-topic ingestion is unique to Kafka
@capistrant capistrant changed the title Restrict kafka topic modification in existing kafka supervisors Restrict input stream modification in existing seekable stream supervisors May 7, 2025
@capistrant capistrant requested a review from kfaraz May 8, 2025 14:35
Contributor

@kfaraz kfaraz left a comment

Thanks for incorporating the changes, @capistrant . Left some final suggestions.
We should be good to merge after these.

/**
* Checks if a spec can be replaced with a proposed spec (proposedSpec).
* <p>
* By default, this method does no validation checks. Implementations of this method can choose to define rules
Contributor

Suggested change
* By default, this method does no validation checks. Implementations of this method can choose to define rules
* By default, this method does no validation checks. Implementations of this method can choose to define rules

* </p>
*
* @param proposedSpec the proposed supervisor spec
* @throws IllegalArgumentException if the spec update is not allowed
Contributor

I think the code throws DruidExceptions of type invalid input now.

@@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue()
Assert.assertEquals("value", spec.getContextValue("key"));
}

@Test
public void testValidateSpecUpdateToShortCircuits()
Contributor

Suggestion:

My latest personal preference on test naming convention is something like this:

test_validateSpecUpdateTo_shortCircuits_whenSomethingHappens()

This is only slightly different from the convention you have already followed here,
in that it uses underscores. The underscores help in clearly spelling out which method
is being tested and under what conditions.

That said, you may stick to the current convention if you find it more appealing since
there is no established Druid naming convention yet as long as the test names make sense.

Contributor Author

I like this, it is easier on the eyes IMO. 👍

@@ -1412,4 +1545,38 @@ private static Map<String, Object> getScaleInProperties()
return autoScalerConfig;
}

private static class OtherSupervisorSpec implements SupervisorSpec
Contributor

There is already a TestSupervisorSpec class that you may be able to use.

{
if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
throw InvalidInput.exception(
StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName())
Contributor

Suggested change
StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName())
"Cannot change spec from type[%s] to type[%s]", getClass().getName(), proposedSpec.getClass().getName()

);

OtherSupervisorSpec otherSpec = new OtherSupervisorSpec();
assertThrows(
Contributor

Maybe use DruidExceptionMatcher to verify error message too.

Comment on lines 202 to 210
} else {
  if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
    // The spec bytes are different, so we need to check if the replacement is allowed
    currentSupervisor.rhs.validateSpecUpdateTo(spec);
    return true;
  } else {
    return false;
  }
}
Contributor

Cleaner with the nesting reduced and positive condition first:

Suggested change
} else {
  if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
    // The spec bytes are different, so we need to check if the replacement is allowed
    currentSupervisor.rhs.validateSpecUpdateTo(spec);
    return true;
  } else {
    return false;
  }
}
} else if (Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) {
  return false;
} else {
  // The spec bytes are different, so we need to check if the update is allowed
  currentSupervisor.rhs.validateSpecUpdateTo(spec);
  return true;
}
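
For readers without the surrounding method in view, the flattened branch structure can be sketched in a self-contained form. Everything here is hypothetical: the nested `Spec` class, its `json` field standing in for the Jackson-serialized bytes, and the assumption that a missing current supervisor means the spec should be accepted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class UpdateDecisionSketch
{
  // Hypothetical spec: `json` stands in for the serialized form, `stream` for the input stream.
  static final class Spec
  {
    final String stream;
    final String json;

    Spec(String stream, String json)
    {
      this.stream = stream;
      this.json = json;
    }

    byte[] toBytes()
    {
      return json.getBytes(StandardCharsets.UTF_8);
    }

    void validateSpecUpdateTo(Spec proposed)
    {
      if (!stream.equals(proposed.stream)) {
        throw new IllegalArgumentException("Input stream cannot be changed");
      }
    }
  }

  // Returns true if `proposed` should replace `current`; throws if the update is illegal.
  static boolean shouldUpdate(Spec current, Spec proposed)
  {
    if (current == null) {
      return true; // assumed behavior: nothing to compare against for a new supervisor
    } else if (Arrays.equals(proposed.toBytes(), current.toBytes())) {
      return false; // identical spec, nothing to do
    } else {
      // The spec bytes differ, so check whether the update is allowed
      current.validateSpecUpdateTo(proposed);
      return true;
    }
  }
}
```

Putting the cheap equality check first means validation only runs when the spec has actually changed.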

* @param proposedSource The proposed input source stream
* @return A formatted error message
*/
protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource)
Contributor

This method shouldn't be a part of the contract of this interface.
I would suggest just making the string a constant and formatting it when needed.

}

@Override
protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource)
Contributor

We shouldn't need to override this method.
We can either use the constant defined in SeekableStreamSupervisorSpec as is or a different String altogether.
A cleaner way to include the type name in that String would be to have a placeholder for type and fill it with getType() while formatting.
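
A rough sketch of the constant-with-placeholder idea from this comment. The class names and the message wording are hypothetical; only the idea of filling a type placeholder from `getType()` comes from the review.

```java
import java.util.Locale;

// Hypothetical base class; one shared template replaces the overridable message method.
abstract class TypedStreamSpecSketch
{
  static final String ILLEGAL_STREAM_UPDATE_FORMAT =
      "Update of the input source stream from [%s] to [%s] is not supported for a running %s supervisor.";

  abstract String getType();

  abstract String getSource();

  void validateSpecUpdateTo(TypedStreamSpecSketch proposed)
  {
    if (!getSource().equals(proposed.getSource())) {
      // The subclass contributes only its type name; the wording lives in one place.
      throw new IllegalArgumentException(
          String.format(Locale.ENGLISH, ILLEGAL_STREAM_UPDATE_FORMAT, getSource(), proposed.getSource(), getType())
      );
    }
  }
}

// Hypothetical subclass: no message override needed.
final class KinesisLikeSpecSketch extends TypedStreamSpecSketch
{
  private final String stream;

  KinesisLikeSpecSketch(String stream)
  {
    this.stream = stream;
  }

  @Override
  String getType()
  {
    return "kinesis";
  }

  @Override
  String getSource()
  {
    return stream;
  }
}
```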

);

// Changing topic name is not allowed
String invalidDestSpecTwoJson = "{\n"
Contributor

Do we need to write the specs as json String or can we use objects and then just validate them, same as the other tests?

If using the json form is essential to the test, you can consider using the utility TestUtils.singleQuoteToStandardJson() to make the json easier to read.
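
To illustrate why such a helper improves readability, here is a sketch of what a single-quote-to-JSON utility could do. This is an assumption about its behavior, not Druid's implementation, and `JsonQuoteSketch` is a hypothetical name.

```java
final class JsonQuoteSketch
{
  // Assumed behavior: swap every single quote for a double quote so test specs
  // can be written without backslash-escaped quotes.
  static String singleQuoteToStandardJson(String singleQuoted)
  {
    return singleQuoted.replace('\'', '"');
  }
}
```

With it, a test spec can be written as `"{'type': 'kafka'}"` rather than `"{\"type\": \"kafka\"}"`. A naive replacement like this would break on values that legitimately contain a single quote.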

Contributor

@kfaraz kfaraz left a comment

LGTM 🚀

@kfaraz
Contributor

kfaraz commented May 9, 2025

@capistrant , does this PR need the Design Review label?
If yes, then it will need another approval before being merged.

@cryptoe
Contributor

cryptoe commented May 9, 2025

Yeah even I feel it should not require a design review.

@cryptoe cryptoe merged commit 1e87ec7 into apache:master May 9, 2025
75 checks passed