From 155aa3ba58d75e86b7815b3f37787452f6e8e645 Mon Sep 17 00:00:00 2001 From: capistrant Date: Mon, 28 Apr 2025 15:27:01 -0500 Subject: [PATCH 1/6] Restrict kafka topic evolution in existing supervisors --- docs/ingestion/kafka-ingestion.md | 6 + .../MaterializedViewSupervisorSpec.java | 6 + .../kafka/supervisor/KafkaSupervisorSpec.java | 30 +++ .../supervisor/KafkaSupervisorSpecTest.java | 253 ++++++++++++++++++ .../compact/CompactionSupervisorSpec.java | 6 + .../supervisor/SupervisorManager.java | 15 ++ .../ScheduledBatchSupervisorSpec.java | 6 + .../SeekableStreamSupervisorSpec.java | 7 + .../OverlordSecurityResourceFilterTest.java | 6 + .../supervisor/SupervisorManagerTest.java | 42 +++ .../supervisor/SupervisorResourceTest.java | 6 + .../supervisor/NoopSupervisorSpec.java | 6 + .../overlord/supervisor/SupervisorSpec.java | 12 + .../supervisor/SupervisorSpecTest.java | 6 + .../SQLMetadataSupervisorManagerTest.java | 6 + .../druid/metadata/TestSupervisorSpec.java | 6 + 16 files changed, 419 insertions(+) diff --git a/docs/ingestion/kafka-ingestion.md b/docs/ingestion/kafka-ingestion.md index b7eccbf01512..03605f87cb34 100644 --- a/docs/ingestion/kafka-ingestion.md +++ b/docs/ingestion/kafka-ingestion.md @@ -134,6 +134,12 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o 28.0.0 will cause the ingestion for that datasource to fail. ::: +:::info +Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. It is also not supported to change the `topicPattern` of an existing supervisor to a different regex pattern. +You can force the migration by terminating the existing supervisor and creating a new one with the new `topicPattern`. You will have to forcefully reset the offsets of the re-submitted supervisor to the earliest or latest offsets, depending on your spec. +Note that resetting the offsets can lead to data duplication or data loss depending on the offset reset policy. +::: + You can ingest data from one or multiple topics. When ingesting data from multiple topics, Druid assigns partitions based on the hashcode of the topic name and the ID of the partition within that topic. The partition assignment might not be uniform across all the tasks. Druid assumes that partitions across individual topics have similar load. If you want to ingest from both high and low load topics in the same supervisor, it is recommended that you have a higher number of partitions for a high load topic and a lower number of partitions for a low load topic. 
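The migration note added above describes terminating the old supervisor, submitting a new spec, and then hard-resetting its offsets. As a rough sketch (not part of this patch), those steps map onto the Supervisor API as follows; the Router address, supervisor id, and spec file name are placeholder assumptions.

```java
// Hypothetical sketch only: host, supervisor id, and spec file are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class ForceTopicPatternMigration
{
  public static void main(String[] args) throws Exception
  {
    final String router = "http://localhost:8888";   // assumed Router/Overlord address
    final String supervisorId = "metrics-kafka";     // assumed supervisor id (usually the datasource)
    final HttpClient http = HttpClient.newHttpClient();

    // 1. Terminate the existing supervisor that was created with `topic`.
    http.send(
        HttpRequest.newBuilder(URI.create(router + "/druid/indexer/v1/supervisor/" + supervisorId + "/terminate"))
                   .POST(HttpRequest.BodyPublishers.noBody())
                   .build(),
        HttpResponse.BodyHandlers.ofString()
    );

    // 2. Submit a new spec whose ioConfig uses `topicPattern` instead of `topic`.
    http.send(
        HttpRequest.newBuilder(URI.create(router + "/druid/indexer/v1/supervisor"))
                   .header("Content-Type", "application/json")
                   .POST(HttpRequest.BodyPublishers.ofFile(Path.of("supervisor-with-topic-pattern.json")))
                   .build(),
        HttpResponse.BodyHandlers.ofString()
    );

    // 3. Hard-reset the re-submitted supervisor so it starts from the earliest/latest
    //    offsets configured in its spec. This is the step that can duplicate or drop data.
    http.send(
        HttpRequest.newBuilder(URI.create(router + "/druid/indexer/v1/supervisor/" + supervisorId + "/reset"))
                   .POST(HttpRequest.BodyPublishers.noBody())
                   .build(),
        HttpResponse.BodyHandlers.ofString()
    );
  }
}
```

The reset in step 3 is what makes the duplication or loss described in the note possible.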
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index b7b3249a8b91..573bc10fb133 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -340,6 +340,12 @@ public String getSource() return getBaseDataSource(); } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for materialized view supervisor spec as of now + } + @Override public String getId() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 5d84eeed1f83..321726123e3c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -176,6 +177,35 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) ); } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + if (!(that instanceof KafkaSupervisorSpec)) { + throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType()); + } + KafkaSupervisorSpec other = (KafkaSupervisorSpec) that; + if (this.getSource() == null || other.getSource() == null) { + // I don't think this is possible, but covering just in case. + throw new IllegalArgumentException( + "Cannot consider KafkaSupervisorSpec evolution when one or both of the specs have not provided either a " + + "topic OR topicPattern"); + } + + // Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the + // old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata} + // implementation details that aren't set up to handle evolution of metadata in this way. + if (!this.getSource().equals(other.getSource())) { + throw new IllegalArgumentException( + "Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern " + + "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you " + + "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. " + + "Then create a new one in suspended state with the new topic or topicPattern. 
Lastly, you will have to " + + "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can " + + "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new " + + "one."); + } + } + @Override public String toString() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index f8feeae9be7b..ba4ec5c81746 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; @@ -40,6 +41,8 @@ import java.io.IOException; +import static org.junit.Assert.assertThrows; + public class KafkaSupervisorSpecTest { private final ObjectMapper mapper; @@ -578,4 +581,254 @@ public void testSuspendResume() throws IOException Assert.assertFalse(runningSpec.isSuspended()); } + + @Test + public void testValidateProposedSpecEvolution() throws JsonProcessingException + { + String sourceSpecJson = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson, KafkaSupervisorSpec.class); + + // Change from topic to topicPattern is not allowed + String invalidDestSpecJson = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": 
[\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topicPattern\": \"metrics-.*\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec invalidDestSpec = mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class); + + assertThrows( + IllegalArgumentException.class, + () -> sourceSpec.validateProposedSpecEvolution(invalidDestSpec) + ); + + // Changing topic name is not allowed + String invalidDestSpecTwoJson = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics-new\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec invalidDestSpecTwo = mapper.readValue(invalidDestSpecTwoJson, KafkaSupervisorSpec.class); + + assertThrows( + IllegalArgumentException.class, + () -> sourceSpec.validateProposedSpecEvolution(invalidDestSpecTwo) + ); + + // Changing non-source related field is allowed. 
We change taskCount to 3 + String validDestSpecJson = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 3\n" + + " }\n" + + "}"; + KafkaSupervisorSpec validDestSpec = mapper.readValue(validDestSpecJson, KafkaSupervisorSpec.class); + sourceSpec.validateProposedSpecEvolution(validDestSpec); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java index 71864c333222..c8654bd5d9b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java @@ -120,6 +120,12 @@ public String getSource() return ""; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for compaction spec as of now + } + @Override public boolean equals(Object o) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 85b6d392550e..8b8cfd5ee68e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -174,6 +174,12 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); + if (shouldUpdateSpec) { + SupervisorSpec existingSpec = getSpec(spec.getId()); + if (existingSpec != null) { + existingSpec.validateProposedSpecEvolution(spec); + } + } possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); createAndStartSupervisorInternal(spec, shouldUpdateSpec); return shouldUpdateSpec; @@ -529,4 +535,13 @@ private StreamSupervisor requireStreamSupervisor(final String supervisorId, fina ); } } + + @Nullable + private SupervisorSpec getSpec(String id) + { + synchronized (lock) { + Pair 
supervisor = supervisors.get(id); + return supervisor == null ? null : supervisor.rhs; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java index a5cc072aa7a8..0b73b0fd6b1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java @@ -189,6 +189,12 @@ public String getSource() return ""; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for scheduled batch spec as of now. + } + public CronSchedulerConfig getSchedulerConfig() { return schedulerConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 7b5f46195e7b..a358fb4bc474 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -202,6 +202,13 @@ public boolean isSuspended() return suspended; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No vaildation logic in abstract class implementation. If concrete classes need to do evolution validation, + // they should override this method. + } + protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index a25c6aec6215..2ad4e9bbe9f5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -169,6 +169,12 @@ public String getSource() { return null; } + + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for test spec + } }; EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString())) .andReturn(Optional.of(supervisorSpec)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index e7276c6aeadc..e5a6cf18a8cd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -145,6 +145,42 @@ public void testCreateUpdateAndRemoveSupervisor() Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testCreateOrUpdateAndStartSupervisorIllegalEvolution() + { + SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1) + { + @Override + public void 
validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + throw new IllegalArgumentException("Invalid spec evolution"); + } + }; + SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2); + + Assert.assertTrue(manager.getSupervisorIds().isEmpty()); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of()); + metadataSupervisorManager.insert("id1", spec); + supervisor1.start(); + replayAll(); + + manager.start(); + Assert.assertEquals(0, manager.getSupervisorIds().size()); + + manager.createOrUpdateAndStartSupervisor(spec); + Assert.assertEquals(1, manager.getSupervisorIds().size()); + Assert.assertEquals(spec, manager.getSupervisorSpec("id1").get()); + verifyAll(); + + resetAll(); + exception.expect(IllegalArgumentException.class); + replayAll(); + + manager.createOrUpdateAndStartSupervisor(spec2); + verifyAll(); + } + @Test public void testCreateOrUpdateAndStartSupervisorNotStarted() { @@ -761,6 +797,12 @@ public String getSource() return null; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // no-op + } + @Override public List getDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index febc959b0d10..caee0604f1de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1348,6 +1348,12 @@ public String getSource() return "dummy"; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // no validation in test spec + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 0414c24dcb4a..483538d7af4a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -130,6 +130,12 @@ public String getSource() return source; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // NoopSupervisorSpec does not check any evolution proposal + } + @Override public Supervisor createSupervisor() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 70e5fbd534ed..4d34c775478d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -100,4 +100,16 @@ default Set getInputSourceResources() throws UnsupportedOperatio * @return source like stream or topic name */ String getSource(); + + /** + * Checks if a proposed evolution of the supervisor spec is allowed. + *
+ * SupervisorSpec `that` is proposed to replace the current supervisor spec. Implementations of this method determine + * if the system should allow this evolution. + *
+ * + * @param that the proposed supervisor spec + * @throws IllegalArgumentException if the evolution is not allowed + */ + void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException; } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java index 4bbc9dbd9ae6..a16e3f3d13c0 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java @@ -58,6 +58,12 @@ public String getSource() { return null; } + + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for test spec + } }; @Test diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index ae9316974769..ab2a550066c0 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -358,5 +358,11 @@ public String getSource() return null; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for test spec + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java index ffacfa26b8ba..fd517165bbda 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java +++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java @@ -77,6 +77,12 @@ public String getSource() return null; } + @Override + public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + { + // No validation logic for test spec + } + @JsonProperty public Object getData() { From 349e3fab7be0e3b78c67d294e085189e5ad02df7 Mon Sep 17 00:00:00 2001 From: capistrant Date: Fri, 2 May 2025 11:14:40 -0500 Subject: [PATCH 2/6] Refactor stream validation code based on review --- .../MaterializedViewSupervisorSpec.java | 6 --- .../kafka/supervisor/KafkaSupervisorSpec.java | 34 +++++++++-------- .../supervisor/KafkaSupervisorSpecTest.java | 13 ++++--- .../compact/CompactionSupervisorSpec.java | 6 --- .../supervisor/SupervisorManager.java | 18 +++++---- .../ScheduledBatchSupervisorSpec.java | 6 --- .../SeekableStreamSupervisorSpec.java | 7 ---- .../OverlordSecurityResourceFilterTest.java | 5 --- .../supervisor/SupervisorManagerTest.java | 37 ++++++++++++++----- .../supervisor/SupervisorResourceTest.java | 6 --- .../supervisor/NoopSupervisorSpec.java | 6 --- .../overlord/supervisor/SupervisorSpec.java | 16 +++++--- .../supervisor/SupervisorSpecTest.java | 6 --- .../SQLMetadataSupervisorManagerTest.java | 6 --- .../druid/metadata/TestSupervisorSpec.java | 6 --- 15 files changed, 73 insertions(+), 105 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 573bc10fb133..b7b3249a8b91 100644 --- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -340,12 +340,6 @@ public String getSource() return getBaseDataSource(); } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for materialized view supervisor spec as of now - } - @Override public String getId() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 321726123e3c..217a9e034677 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -33,6 +35,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; @@ -178,31 +181,30 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) } @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException { - if (!(that instanceof KafkaSupervisorSpec)) { - throw new IllegalArgumentException("Cannot evolve to " + that.getType() + " from " + getType()); + if (!(proposedSpec instanceof KafkaSupervisorSpec)) { + throw InvalidInput.exception("Cannot evolve to " + proposedSpec.getType() + " from " + getType()); } - KafkaSupervisorSpec other = (KafkaSupervisorSpec) that; + KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec; if (this.getSource() == null || other.getSource() == null) { - // I don't think this is possible, but covering just in case. - throw new IllegalArgumentException( - "Cannot consider KafkaSupervisorSpec evolution when one or both of the specs have not provided either a " - + "topic OR topicPattern"); + // Not likely to happen, but covering just in case. + throw InvalidInput.exception("Cannot consider KafkaSupervisorSpec evolution when one or both of the specs " + + "have not provided either a topic OR topicPattern"); } // Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the // old source. 
If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata} // implementation details that aren't set up to handle evolution of metadata in this way. if (!this.getSource().equals(other.getSource())) { - throw new IllegalArgumentException( - "Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern " - + "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you " - + "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. " - + "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to " - + "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can " - + "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new " - + "one."); + throw InvalidInput.exception( + StringUtils.format( + "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor." + + "%nTo perform the update safely, follow these steps:" + + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "%n(2) Create a new supervisor with the new topic or topicPattern." + + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.", + this.getSource(), other.getSource())); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index ba4ec5c81746..d9cb0cec8796 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -583,7 +584,7 @@ public void testSuspendResume() throws IOException } @Test - public void testValidateProposedSpecEvolution() throws JsonProcessingException + public void testValidateSpecUpdateTo() throws JsonProcessingException { String sourceSpecJson = "{\n" + " \"type\": \"kafka\",\n" @@ -703,8 +704,8 @@ public void testValidateProposedSpecEvolution() throws JsonProcessingException KafkaSupervisorSpec invalidDestSpec = mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class); assertThrows( - IllegalArgumentException.class, - () -> sourceSpec.validateProposedSpecEvolution(invalidDestSpec) + DruidException.class, + () -> sourceSpec.validateSpecUpdateTo(invalidDestSpec) ); // Changing topic name is not allowed @@ -767,8 +768,8 @@ public void testValidateProposedSpecEvolution() throws JsonProcessingException KafkaSupervisorSpec invalidDestSpecTwo = mapper.readValue(invalidDestSpecTwoJson, KafkaSupervisorSpec.class); assertThrows( - IllegalArgumentException.class, - () -> sourceSpec.validateProposedSpecEvolution(invalidDestSpecTwo) + 
DruidException.class, + () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecTwo) ); // Changing non-source related field is allowed. We change taskCount to 3 @@ -829,6 +830,6 @@ public void testValidateProposedSpecEvolution() throws JsonProcessingException + " }\n" + "}"; KafkaSupervisorSpec validDestSpec = mapper.readValue(validDestSpecJson, KafkaSupervisorSpec.class); - sourceSpec.validateProposedSpecEvolution(validDestSpec); + sourceSpec.validateSpecUpdateTo(validDestSpec); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java index c8654bd5d9b8..71864c333222 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java @@ -120,12 +120,6 @@ public String getSource() return ""; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for compaction spec as of now - } - @Override public boolean equals(Object o) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 8b8cfd5ee68e..ea8320c30280 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -174,12 +174,6 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); - if (shouldUpdateSpec) { - SupervisorSpec existingSpec = getSpec(spec.getId()); - if (existingSpec != null) { - existingSpec.validateProposedSpecEvolution(spec); - } - } possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); createAndStartSupervisorInternal(spec, shouldUpdateSpec); return shouldUpdateSpec; @@ -203,8 +197,16 @@ public boolean shouldUpdateSupervisor(SupervisorSpec spec) try { byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec); Pair currentSupervisor = supervisors.get(spec.getId()); - return currentSupervisor == null - || !Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs)); + if (currentSupervisor == null || currentSupervisor.rhs == null) { + return true; + } else { + if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { + currentSupervisor.rhs.validateSpecUpdateTo(spec); + return true; + } else { + return false; + } + } } catch (JsonProcessingException ex) { log.warn("Failed to write spec as bytes for spec_id[%s]", spec.getId()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java index 0b73b0fd6b1d..a5cc072aa7a8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/scheduledbatch/ScheduledBatchSupervisorSpec.java @@ -189,12 +189,6 @@ public String getSource() return ""; } - @Override - public void 
validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for scheduled batch spec as of now. - } - public CronSchedulerConfig getSchedulerConfig() { return schedulerConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index a358fb4bc474..7b5f46195e7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -202,13 +202,6 @@ public boolean isSuspended() return suspended; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No vaildation logic in abstract class implementation. If concrete classes need to do evolution validation, - // they should override this method. - } - protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index 2ad4e9bbe9f5..2912e1da02e0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -170,11 +170,6 @@ public String getSource() return null; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for test spec - } }; EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString())) .andReturn(Optional.of(supervisorSpec)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index e5a6cf18a8cd..d9afd136654b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -151,9 +152,9 @@ public void testCreateOrUpdateAndStartSupervisorIllegalEvolution() SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1) { @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException + public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException { - throw new IllegalArgumentException("Invalid spec evolution"); + throw InvalidInput.exception("Illegal spec update proposed"); } }; SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2); @@ -174,7 +175,7 @@ public void 
validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArg verifyAll(); resetAll(); - exception.expect(IllegalArgumentException.class); + exception.expect(DruidException.class); replayAll(); manager.createOrUpdateAndStartSupervisor(spec2); @@ -231,6 +232,30 @@ public void testShouldUpdateSupervisor() Assert.assertTrue(manager.shouldUpdateSupervisor(new NoopSupervisorSpec("id1", null))); } + @Test + public void testShouldUpdateSupervisorIllegalEvolution() + { + SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1) + { + @Override + public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException + { + throw InvalidInput.exception("Illegal spec update proposed"); + } + }; + SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2); + Map existingSpecs = ImmutableMap.of( + "id1", spec + ); + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + supervisor1.start(); + exception.expect(DruidException.class); + replayAll(); + manager.start(); + manager.shouldUpdateSupervisor(spec2); + verifyAll(); + } + @Test public void testStopAndRemoveSupervisorNotStarted() { @@ -797,12 +822,6 @@ public String getSource() return null; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // no-op - } - @Override public List getDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index caee0604f1de..febc959b0d10 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1348,12 +1348,6 @@ public String getSource() return "dummy"; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // no validation in test spec - } - @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 483538d7af4a..0414c24dcb4a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -130,12 +130,6 @@ public String getSource() return source; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // NoopSupervisorSpec does not check any evolution proposal - } - @Override public Supervisor createSupervisor() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 4d34c775478d..3297823a451a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.error.DruidException; import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -102,14 +103,17 @@ default Set<ResourceAction> getInputSourceResources() throws UnsupportedOperatio String getSource(); /** - * Checks if a proposed evolution of the supervisor spec is allowed. + * Checks if a spec can be replaced with a proposed spec (proposedSpec). *
- * SupervisorSpec `that` is proposed to replace the current supervisor spec. Implementations of this method determine - * if the system should allow this evolution. + * By default, this method does no validation checks. Implementations of this method can choose to define rules + * for spec updates and throw an exception if the update is not allowed. *
* - * @param that the proposed supervisor spec - * @throws IllegalArgumentException if the evolution is not allowed + * @param proposedSpec the proposed supervisor spec + * @throws IllegalArgumentException if the spec update is not allowed */ - void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException; + default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException + { + // The default implementation does not do any validation checks. + } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java index a16e3f3d13c0..4bbc9dbd9ae6 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java @@ -58,12 +58,6 @@ public String getSource() { return null; } - - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for test spec - } }; @Test diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index ab2a550066c0..ae9316974769 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -358,11 +358,5 @@ public String getSource() return null; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for test spec - } - } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java index fd517165bbda..ffacfa26b8ba 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java +++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java @@ -77,12 +77,6 @@ public String getSource() return null; } - @Override - public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException - { - // No validation logic for test spec - } - @JsonProperty public Object getData() { From 162da4929fa06591fae2c1796b6b9223875cb0dd Mon Sep 17 00:00:00 2001 From: capistrant Date: Wed, 7 May 2025 13:45:22 -0500 Subject: [PATCH 3/6] Provide default impl of validateSpecUpdateTo with an enhanced override for Kafka Since all existing implementations of the SeekableStreamSupervisor do not support stream modification for existing supervisors, it makes sense to centralize a default implementation of spec update validation. 
Kafka requries some override because the idea of single and multi topic ingest within kafka is unique to kafka --- .../kafka/supervisor/KafkaSupervisorSpec.java | 45 ++++++++----- .../supervisor/KafkaSupervisorSpecTest.java | 63 +++++++++++++++++++ .../supervisor/SupervisorManager.java | 1 + .../SeekableStreamSupervisorSpec.java | 51 +++++++++++++++ 4 files changed, 143 insertions(+), 17 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 217a9e034677..eab7193480e7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -180,6 +180,15 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) ); } + /** + * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic. + *
+ * getSource() returns the same string (exampleTopic) for "topicPattern=exampleTopic" and "topic=exampleTopic". + * This override prevents this case from being considered a valid update. + *
+ * @param proposedSpec the proposed supervisor spec + * @throws DruidException if the proposed spec is not a Kafka spec or if the proposed spec changes from multi-topic to single-topic or vice versa + */ @Override public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException { @@ -187,25 +196,27 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept throw InvalidInput.exception("Cannot evolve to " + proposedSpec.getType() + " from " + getType()); } KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec; - if (this.getSource() == null || other.getSource() == null) { - // Not likely to happen, but covering just in case. - throw InvalidInput.exception("Cannot consider KafkaSupervisorSpec evolution when one or both of the specs " - + "have not provided either a topic OR topicPattern"); + if (this.getSpec().getIOConfig().isMultiTopic() != other.getSpec().getIOConfig().isMultiTopic()) { + throw InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage("(%s) %s", "(%s) %s"), + this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", + this.getSource(), + other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", + other.getSource()); } - // Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the - // old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata} - // implementation details that aren't set up to handle evolution of metadata in this way. - if (!this.getSource().equals(other.getSource())) { - throw InvalidInput.exception( - StringUtils.format( - "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor." - + "%nTo perform the update safely, follow these steps:" - + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " - + "%n(2) Create a new supervisor with the new topic or topicPattern." - + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.", - this.getSource(), other.getSource())); - } + super.validateSpecUpdateTo(proposedSpec); + } + + @Override + protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) + { + return StringUtils.format( + "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor." + + "%nTo perform the update safely, follow these steps:" + + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "%n(2) Create a new supervisor with the new topic or topicPattern." 
+ + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.", + existingSource, proposedSource); } @Override diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index d9cb0cec8796..c1abd751edaa 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -708,6 +708,69 @@ public void testValidateSpecUpdateTo() throws JsonProcessingException () -> sourceSpec.validateSpecUpdateTo(invalidDestSpec) ); + // Change from topic to topicPattern is not allowed + String invalidDestSpecThreeJson = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topicPattern\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec invalidDestSpecThree = mapper.readValue(invalidDestSpecThreeJson, KafkaSupervisorSpec.class); + + assertThrows( + DruidException.class, + () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecThree)); + // Changing topic name is not allowed String invalidDestSpecTwoJson = "{\n" + " \"type\": \"kafka\",\n" diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index ea8320c30280..b33a78ab233d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -201,6 +201,7 @@ public boolean shouldUpdateSupervisor(SupervisorSpec spec) return true; } else { if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { + // The spec bytes are different, so we need to check if the replacement is allowed currentSupervisor.rhs.validateSpecUpdateTo(spec); return true; } else { diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 7b5f46195e7b..39284b76386f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; @@ -34,6 +36,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; @@ -202,6 +205,54 @@ public boolean isSuspended() return suspended; } + /** + * Default implementation that prevents unsupported evolution of the supervisor spec + *
+ * <ul>
+ *   <li>You cannot migrate between types of supervisors.</li>
+ *   <li>You cannot change the input source stream of a running supervisor.</li>
+ * </ul>
+ *
+ * @param proposedSpec the proposed supervisor spec + * @throws DruidException if the proposed spec update is not allowed + */ + @Override + public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException + { + if (!(proposedSpec instanceof SeekableStreamSupervisorSpec) || + !proposedSpec.getType().equals(getType())) { + throw InvalidInput.exception("Cannot evolve to " + proposedSpec.getType() + " from " + getType()); + } + SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec; + if (this.getSource() == null || other.getSource() == null) { + // Not likely to happen, but covering just in case. + throw InvalidInput.exception("Cannot consider SeekableStreamSupervisorSpec evolution when one or both of " + + "the specs have not provided an input source stream in the IOConfig."); + } + + if (!this.getSource().equals(other.getSource())) { + throw InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage(this.getSource(), other.getSource())); + } + } + protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); + /** + * Returns an error message for illegal update of the input source stream. + *
<p>
+ * This is a reasonable default message, but subclasses may override it to provide more domain specific terminology. + *
<p>
+ * @param existingSource The existing input source stream + * @param proposedSource The proposed input source stream + * @return A formatted error message + */ + protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) + { + return StringUtils.format( + "Update of the input source stream from [%s] to [%s] is not supported for a running supervisror." + + "%nTo perform the update safely, follow these steps:" + + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "%n(2) Create a new supervisor with the new input source stream." + + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.", + existingSource, proposedSource); + } + } From ea56bdf153c23e5647f7fb4903d5634d268ef53f Mon Sep 17 00:00:00 2001 From: capistrant Date: Wed, 7 May 2025 15:26:13 -0500 Subject: [PATCH 4/6] Improve docs --- docs/ingestion/kafka-ingestion.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/ingestion/kafka-ingestion.md b/docs/ingestion/kafka-ingestion.md index 03605f87cb34..67b1a96a3d79 100644 --- a/docs/ingestion/kafka-ingestion.md +++ b/docs/ingestion/kafka-ingestion.md @@ -120,7 +120,7 @@ For configuration properties shared across all streaming ingestion methods, refe |Property|Type|Description|Required|Default| |--------|----|-----------|--------|-------| -|`topic`|String|The Kafka topic to read from. To ingest data from multiple topic, use `topicPattern`. |Yes if `topicPattern` isn't set.|| +|`topic`|String|The Kafka topic to read from. Note that once this value is established for a supervisor, updating it is not supported. To ingest data from multiple topic, use `topicPattern`. |Yes if `topicPattern` isn't set.|| |`topicPattern`|String|Multiple Kafka topics to read from, passed as a regex pattern. See [Ingest from multiple topics](#ingest-from-multiple-topics) for more information.|Yes if `topic` isn't set.|| |`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. See [Consumer properties](#consumer-properties) for details.|Yes. At the minimum, you must set the `bootstrap.servers` property to establish the initial connection to the Kafka cluster.|| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| @@ -136,8 +136,10 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o :::info Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. It is also not supported to change the `topicPattern` of an existing supervisor to a different regex pattern. -You can force the migration by terminating the existing supervisor and creating a new one with the new `topicPattern`. You will have to forcefully reset the offsets of the re-submitted supervisor to the earliest or latest offsets, depending on your spec. -Note that resetting the offsets can lead to data duplication or data loss depending on the offset reset policy. +You can force the migration by doing the following: +1. Suspend the supervisor. +2. Reset the offsets. +3. Submit updated supervisor. ::: You can ingest data from one or multiple topics. @@ -462,4 +464,4 @@ See the following topics for more information: * [Supervisor API](../api-reference/supervisor-api.md) for how to manage and monitor supervisors using the API. * [Supervisor](../ingestion/supervisor.md) for supervisor status and capacity planning. 
* [Loading from Apache Kafka](../tutorials/tutorial-kafka.md) for a tutorial on streaming data from Apache Kafka. -* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the `kafka` input format. \ No newline at end of file +* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the `kafka` input format. From a9095cb1175364f0f8eafa9754c661ddc8916b51 Mon Sep 17 00:00:00 2001 From: capistrant Date: Wed, 7 May 2025 15:31:48 -0500 Subject: [PATCH 5/6] Improve test coverage --- .../kafka/supervisor/KafkaSupervisorSpec.java | 4 +- .../supervisor/KafkaSupervisorSpecTest.java | 174 +++++++++++------- .../SeekableStreamSupervisorSpec.java | 7 +- .../SeekableStreamSupervisorSpecTest.java | 167 +++++++++++++++++ 4 files changed, 285 insertions(+), 67 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index eab7193480e7..e329323e1d79 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -193,7 +193,9 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException { if (!(proposedSpec instanceof KafkaSupervisorSpec)) { - throw InvalidInput.exception("Cannot evolve to " + proposedSpec.getType() + " from " + getType()); + throw InvalidInput.exception( + StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) + ); } KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec; if (this.getSpec().getIOConfig().isMultiTopic() != other.getSpec().getIOConfig().isMultiTopic()) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index c1abd751edaa..4fd5dfec5065 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -29,6 +29,8 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -41,6 +43,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.List; import static org.junit.Assert.assertThrows; @@ -644,6 +647,13 @@ public void testValidateSpecUpdateTo() throws JsonProcessingException + "}"; KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson, KafkaSupervisorSpec.class); + // Proposed spec being non-kafka is not allowed + OtherSupervisorSpec otherSpec = new OtherSupervisorSpec(); + assertThrows( 
+ DruidException.class, + () -> sourceSpec.validateSpecUpdateTo(otherSpec) + ); + // Change from topic to topicPattern is not allowed String invalidDestSpecJson = "{\n" + " \"type\": \"kafka\",\n" @@ -708,69 +718,6 @@ public void testValidateSpecUpdateTo() throws JsonProcessingException () -> sourceSpec.validateSpecUpdateTo(invalidDestSpec) ); - // Change from topic to topicPattern is not allowed - String invalidDestSpecThreeJson = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topicPattern\": \"metrics\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + "}"; - KafkaSupervisorSpec invalidDestSpecThree = mapper.readValue(invalidDestSpecThreeJson, KafkaSupervisorSpec.class); - - assertThrows( - DruidException.class, - () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecThree)); - // Changing topic name is not allowed String invalidDestSpecTwoJson = "{\n" + " \"type\": \"kafka\",\n" @@ -835,6 +782,73 @@ public void testValidateSpecUpdateTo() throws JsonProcessingException () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecTwo) ); + // Change from topic to topicPattern is not allowed + String invalidDestSpecThreeJson = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" 
+ + " },\n" + + " \"ioConfig\": {\n" + + " \"topicPattern\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec invalidDestSpecThree = mapper.readValue(invalidDestSpecThreeJson, KafkaSupervisorSpec.class); + + assertThrows( + DruidException.class, + () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecThree)); + // test the inverse as well + assertThrows( + DruidException.class, + () -> invalidDestSpecThree.validateSpecUpdateTo(sourceSpec)); + // Changing non-source related field is allowed. We change taskCount to 3 String validDestSpecJson = "{\n" + " \"type\": \"kafka\",\n" @@ -895,4 +909,38 @@ public void testValidateSpecUpdateTo() throws JsonProcessingException KafkaSupervisorSpec validDestSpec = mapper.readValue(validDestSpecJson, KafkaSupervisorSpec.class); sourceSpec.validateSpecUpdateTo(validDestSpec); } + + private static class OtherSupervisorSpec implements SupervisorSpec + { + + @Override + public String getId() + { + return ""; + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public List getDataSources() + { + return List.of(); + } + + @Override + public String getType() + { + return ""; + } + + @Override + public String getSource() + { + return ""; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 39284b76386f..17e0e0a48ff7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -217,9 +217,10 @@ public boolean isSuspended() @Override public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException { - if (!(proposedSpec instanceof SeekableStreamSupervisorSpec) || - !proposedSpec.getType().equals(getType())) { - throw InvalidInput.exception("Cannot evolve to " + proposedSpec.getType() + " from " + getType()); + if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) { + throw InvalidInput.exception( + StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) + ); } SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec; if (this.getSource() == null || other.getSource() == null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 1a704729dafd..e31627337bd7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ 
-35,6 +36,7 @@ import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; @@ -84,6 +86,8 @@ import java.util.TreeMap; import java.util.concurrent.ScheduledExecutorService; +import static org.junit.Assert.assertThrows; + public class SeekableStreamSupervisorSpecTest extends EasyMockSupport { private SeekableStreamSupervisorIngestionSpec ingestionSchema; @@ -1298,6 +1302,135 @@ public void testGetContextVauleForKeyShouldReturnValue() Assert.assertEquals("value", spec.getContextValue("key")); } + @Test + public void testValidateSpecUpdateToShortCircuits() + { + mockIngestionSchema(); + TestSeekableStreamSupervisorSpec originalSpec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + TestSeekableStreamSupervisorSpec proposedSpec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + assertThrows( + DruidException.class, + () -> originalSpec.validateSpecUpdateTo(proposedSpec) + ); + + OtherSupervisorSpec otherSpec = new OtherSupervisorSpec(); + assertThrows( + DruidException.class, + () -> originalSpec.validateSpecUpdateTo(otherSpec) + ); + } + + @Test + public void testValidateSpecUpdateToSourceComparisons() + { + mockIngestionSchema(); + TestSeekableStreamSupervisorSpec originalSpec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ) + { + @Override + public String getSource() + { + return "source1"; + } + }; + TestSeekableStreamSupervisorSpec proposedSpecDiffSource = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ) + { + @Override + public String getSource() + { + return "source2"; + } + }; + TestSeekableStreamSupervisorSpec proposedSpecSameSource = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ) + { + @Override + public String getSource() + { + return "source1"; + } + }; + + assertThrows( + DruidException.class, 
+ () -> originalSpec.validateSpecUpdateTo(proposedSpecDiffSource) + ); + originalSpec.validateSpecUpdateTo(proposedSpecSameSource); + } + private void mockIngestionSchema() { EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); @@ -1412,4 +1545,38 @@ private static Map getScaleInProperties() return autoScalerConfig; } + private static class OtherSupervisorSpec implements SupervisorSpec + { + + @Override + public String getId() + { + return ""; + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public List getDataSources() + { + return List.of(); + } + + @Override + public String getType() + { + return ""; + } + + @Override + public String getSource() + { + return ""; + } + } + } From 7a9bcd583e6d9b05594d14df5ade526a71585fdb Mon Sep 17 00:00:00 2001 From: capistrant Date: Thu, 8 May 2025 15:34:18 -0500 Subject: [PATCH 6/6] Test improvments and general cleanup following review round --- .../kafka-indexing-service/pom.xml | 5 + .../kafka/supervisor/KafkaSupervisorSpec.java | 24 +- .../supervisor/KafkaSupervisorSpecTest.java | 508 ++++++------------ .../supervisor/SupervisorManager.java | 12 +- .../SeekableStreamSupervisorSpec.java | 34 +- .../SeekableStreamSupervisorSpecTest.java | 93 ++-- .../overlord/supervisor/SupervisorSpec.java | 4 +- 7 files changed, 235 insertions(+), 445 deletions(-) diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index 761d7be1ccac..0dc45401a981 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -113,6 +113,11 @@ hamcrest-core test + + org.hamcrest + hamcrest-all + test + com.fasterxml.jackson.core jackson-core diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index e329323e1d79..13b3972f2ec8 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -194,33 +194,21 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept { if (!(proposedSpec instanceof KafkaSupervisorSpec)) { throw InvalidInput.exception( - StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) + "Cannot change spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName() ); } KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec; if (this.getSpec().getIOConfig().isMultiTopic() != other.getSpec().getIOConfig().isMultiTopic()) { - throw InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage("(%s) %s", "(%s) %s"), - this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", - this.getSource(), - other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", - other.getSource()); + throw InvalidInput.exception( + SeekableStreamSupervisorSpec.ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, + StringUtils.format("(%s) %s", this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic", this.getSource()), + StringUtils.format("(%s) %s", other.getSpec().getIOConfig().isMultiTopic() ? 
"multi-topic" : "single-topic", other.getSource()) + ); } super.validateSpecUpdateTo(proposedSpec); } - @Override - protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) - { - return StringUtils.format( - "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor." - + "%nTo perform the update safely, follow these steps:" - + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " - + "%n(2) Create a new supervisor with the new topic or topicPattern." - + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.", - existingSource, proposedSource); - } - @Override public String toString() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 4fd5dfec5065..03c5820be0bb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -19,31 +19,38 @@ package org.apache.druid.indexing.kafka.supervisor; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.supervisor.Supervisor; -import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.metadata.TestSupervisorSpec; +import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.List; +import java.util.Map; import static org.junit.Assert.assertThrows; @@ -587,360 +594,173 @@ public void testSuspendResume() throws IOException } @Test - public void testValidateSpecUpdateTo() throws 
JsonProcessingException + public void test_validateSpecUpdateTo() { - String sourceSpecJson = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topic\": \"metrics\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + "}"; - KafkaSupervisorSpec sourceSpec = mapper.readValue(sourceSpecJson, KafkaSupervisorSpec.class); + KafkaSupervisorSpec sourceSpec = getSpec("metrics", null); // Proposed spec being non-kafka is not allowed - OtherSupervisorSpec otherSpec = new OtherSupervisorSpec(); - assertThrows( - DruidException.class, - () -> sourceSpec.validateSpecUpdateTo(otherSpec) + TestSupervisorSpec otherSpec = new TestSupervisorSpec("test", new Object()); + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(otherSpec)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + StringUtils.format("Cannot change spec from type[%s] to type[%s]", sourceSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName()) + ) ); - // Change from topic to topicPattern is not allowed - String invalidDestSpecJson = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" 
- + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topicPattern\": \"metrics-.*\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + "}"; - KafkaSupervisorSpec invalidDestSpec = mapper.readValue(invalidDestSpecJson, KafkaSupervisorSpec.class); + KafkaSupervisorSpec multiTopicProposedSpec = getSpec(null, "metrics-.*"); + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(multiTopicProposedSpec)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "Update of the input source stream from [(single-topic) metrics] to [(multi-topic) metrics-.*] is not supported for a running supervisor." + + "\nTo perform the update safely, follow these steps:" + + "\n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "\n(2) Create a new supervisor with the new input source stream." + + "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too." + ) + ); - assertThrows( - DruidException.class, - () -> sourceSpec.validateSpecUpdateTo(invalidDestSpec) + KafkaSupervisorSpec singleTopicNewStreamProposedSpec = getSpec("metricsNew", null); + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(singleTopicNewStreamProposedSpec)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "Update of the input source stream from [metrics] to [metricsNew] is not supported for a running supervisor." + + "\nTo perform the update safely, follow these steps:" + + "\n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "\n(2) Create a new supervisor with the new input source stream." + + "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too." 
+ ) ); - // Changing topic name is not allowed - String invalidDestSpecTwoJson = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topic\": \"metrics-new\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + "}"; - KafkaSupervisorSpec invalidDestSpecTwo = mapper.readValue(invalidDestSpecTwoJson, KafkaSupervisorSpec.class); - - assertThrows( - DruidException.class, - () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecTwo) + KafkaSupervisorSpec multiTopicMatchingSourceString = getSpec(null, "metrics"); + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> sourceSpec.validateSpecUpdateTo(multiTopicMatchingSourceString)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "Update of the input source stream from [(single-topic) metrics] to [(multi-topic) metrics] is not supported for a running supervisor." + + "\nTo perform the update safely, follow these steps:" + + "\n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "\n(2) Create a new supervisor with the new input source stream." + + "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too." 
+ ) ); - // Change from topic to topicPattern is not allowed - String invalidDestSpecThreeJson = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topicPattern\": \"metrics\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 1\n" - + " }\n" - + "}"; - KafkaSupervisorSpec invalidDestSpecThree = mapper.readValue(invalidDestSpecThreeJson, KafkaSupervisorSpec.class); - - assertThrows( - DruidException.class, - () -> sourceSpec.validateSpecUpdateTo(invalidDestSpecThree)); // test the inverse as well - assertThrows( - DruidException.class, - () -> invalidDestSpecThree.validateSpecUpdateTo(sourceSpec)); - - // Changing non-source related field is allowed. 
We change taskCount to 3 - String validDestSpecJson = "{\n" - + " \"type\": \"kafka\",\n" - + " \"dataSchema\": {\n" - + " \"dataSource\": \"metrics-kafka\",\n" - + " \"parser\": {\n" - + " \"type\": \"string\",\n" - + " \"parseSpec\": {\n" - + " \"format\": \"json\",\n" - + " \"timestampSpec\": {\n" - + " \"column\": \"timestamp\",\n" - + " \"format\": \"auto\"\n" - + " },\n" - + " \"dimensionsSpec\": {\n" - + " \"dimensions\": [],\n" - + " \"dimensionExclusions\": [\n" - + " \"timestamp\",\n" - + " \"value\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " },\n" - + " \"metricsSpec\": [\n" - + " {\n" - + " \"name\": \"count\",\n" - + " \"type\": \"count\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_sum\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleSum\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_min\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMin\"\n" - + " },\n" - + " {\n" - + " \"name\": \"value_max\",\n" - + " \"fieldName\": \"value\",\n" - + " \"type\": \"doubleMax\"\n" - + " }\n" - + " ],\n" - + " \"granularitySpec\": {\n" - + " \"type\": \"uniform\",\n" - + " \"segmentGranularity\": \"HOUR\",\n" - + " \"queryGranularity\": \"NONE\"\n" - + " }\n" - + " },\n" - + " \"ioConfig\": {\n" - + " \"topic\": \"metrics\",\n" - + " \"consumerProperties\": {\n" - + " \"bootstrap.servers\": \"localhost:9092\"\n" - + " },\n" - + " \"taskCount\": 3\n" - + " }\n" - + "}"; - KafkaSupervisorSpec validDestSpec = mapper.readValue(validDestSpecJson, KafkaSupervisorSpec.class); + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> multiTopicMatchingSourceString.validateSpecUpdateTo(sourceSpec)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "Update of the input source stream from [(multi-topic) metrics] to [(single-topic) metrics] is not supported for a running supervisor." + + "\nTo perform the update safely, follow these steps:" + + "\n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "\n(2) Create a new supervisor with the new input source stream." + + "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too." + ) + ); + + // Test valid spec update. 
This spec changes context vs the sourceSpec + KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpec( + null, + DataSchema.builder().withDataSource("testDs").withAggregators(new CountAggregatorFactory("rows")).withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(), + null, + new KafkaSupervisorIOConfig( + "metrics", + null, + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), + null, + null, + null, + Map.of("bootstrap.servers", "localhost:9092"), + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null, + null, + false + ), + Map.of( + "key1", + "value1", + "key2", + "value2" + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); sourceSpec.validateSpecUpdateTo(validDestSpec); } - private static class OtherSupervisorSpec implements SupervisorSpec + private KafkaSupervisorSpec getSpec(String topic, String topicPattern) { - - @Override - public String getId() - { - return ""; - } - - @Override - public Supervisor createSupervisor() - { - return null; - } - - @Override - public List getDataSources() - { - return List.of(); - } - - @Override - public String getType() - { - return ""; - } - - @Override - public String getSource() - { - return ""; - } + return new KafkaSupervisorSpec( + null, + DataSchema.builder().withDataSource("testDs").withAggregators(new CountAggregatorFactory("rows")).withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(), + null, + new KafkaSupervisorIOConfig( + topic, + topicPattern, + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), + null, + null, + null, + Map.of("bootstrap.servers", "localhost:9092"), + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null, + null, + false + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index b33a78ab233d..0ada4679a637 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -199,14 +199,12 @@ public boolean shouldUpdateSupervisor(SupervisorSpec spec) Pair currentSupervisor = supervisors.get(spec.getId()); if (currentSupervisor == null || currentSupervisor.rhs == null) { return true; + } else if (Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { + return false; } else { - if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { - // The spec bytes are different, so we need to check if the replacement is allowed - currentSupervisor.rhs.validateSpecUpdateTo(spec); - return true; - } else { - return false; - } + // The spec bytes are different, so we need to check if the update is allowed + currentSupervisor.rhs.validateSpecUpdateTo(spec); + return true; } } catch (JsonProcessingException ex) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 17e0e0a48ff7..ba0012b29c93 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -36,7 +36,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; @@ -48,6 +47,11 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { + protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." + + "%nTo perform the update safely, follow these steps:" + + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "%n(2) Create a new supervisor with the new input source stream." + + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."; private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema( SeekableStreamSupervisorIngestionSpec ingestionSchema @@ -219,41 +223,21 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept { if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) { throw InvalidInput.exception( - StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) + "Cannot update supervisor spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName() ); } SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec; if (this.getSource() == null || other.getSource() == null) { // Not likely to happen, but covering just in case. - throw InvalidInput.exception("Cannot consider SeekableStreamSupervisorSpec evolution when one or both of " - + "the specs have not provided an input source stream in the IOConfig."); + throw InvalidInput.exception("Cannot update supervisor spec since one or both of " + + "the specs have not provided an input source stream in the 'ioConfig'."); } if (!this.getSource().equals(other.getSource())) { - throw InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage(this.getSource(), other.getSource())); + throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, this.getSource(), other.getSource()); } } protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); - /** - * Returns an error message for illegal update of the input source stream. - *
<p>
- * This is a reasonable default message, but subclasses may override it to provide more domain specific terminology. - *
<p>
- * @param existingSource The existing input source stream - * @param proposedSource The proposed input source stream - * @return A formatted error message - */ - protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) - { - return StringUtils.format( - "Update of the input source stream from [%s] to [%s] is not supported for a running supervisror." - + "%nTo perform the update safely, follow these steps:" - + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " - + "%n(2) Create a new supervisor with the new input source stream." - + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.", - existingSource, proposedSource); - } - } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index e31627337bd7..e37324d2be62 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -36,7 +37,6 @@ import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; -import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; @@ -55,11 +55,13 @@ import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.TestSupervisorSpec; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; @@ -67,6 +69,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.hamcrest.MatcherAssert; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -1303,12 +1306,12 @@ public void testGetContextVauleForKeyShouldReturnValue() } @Test - public void testValidateSpecUpdateToShortCircuits() + public void test_validateSpecUpdateTo_ShortCircuits() { mockIngestionSchema(); 
TestSeekableStreamSupervisorSpec originalSpec = new TestSeekableStreamSupervisorSpec( ingestionSchema, - ImmutableMap.of("key", "value"), + Map.of("key", "value"), false, taskStorage, taskMaster, @@ -1324,7 +1327,7 @@ public void testValidateSpecUpdateToShortCircuits() ); TestSeekableStreamSupervisorSpec proposedSpec = new TestSeekableStreamSupervisorSpec( ingestionSchema, - ImmutableMap.of("key", "value"), + Map.of("key", "value"), false, taskStorage, taskMaster, @@ -1338,20 +1341,33 @@ public void testValidateSpecUpdateToShortCircuits() supervisor4, "id1" ); - assertThrows( - DruidException.class, - () -> originalSpec.validateSpecUpdateTo(proposedSpec) + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> originalSpec.validateSpecUpdateTo(proposedSpec)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "Cannot update supervisor spec since one or both of the specs have not provided an input source stream in the 'ioConfig'." + ) ); - OtherSupervisorSpec otherSpec = new OtherSupervisorSpec(); - assertThrows( - DruidException.class, - () -> originalSpec.validateSpecUpdateTo(otherSpec) + + TestSupervisorSpec otherSpec = new TestSupervisorSpec("fake", new Object()); + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> originalSpec.validateSpecUpdateTo(otherSpec)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + StringUtils.format("Cannot update supervisor spec from type[%s] to type[%s]", proposedSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName()) + ) ); } @Test - public void testValidateSpecUpdateToSourceComparisons() + public void test_validateSpecUpdateTo_SourceStringComparisons() { mockIngestionSchema(); TestSeekableStreamSupervisorSpec originalSpec = new TestSeekableStreamSupervisorSpec( @@ -1424,10 +1440,23 @@ public String getSource() } }; - assertThrows( - DruidException.class, - () -> originalSpec.validateSpecUpdateTo(proposedSpecDiffSource) + // Mistmatched stream strings test + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> originalSpec.validateSpecUpdateTo(proposedSpecDiffSource)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "Update of the input source stream from [source1] to [source2] is not supported for a running supervisor." + + "\nTo perform the update safely, follow these steps:" + + "\n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "\n(2) Create a new supervisor with the new input source stream." + + "\nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too." 
+ ) ); + + // Happy path test originalSpec.validateSpecUpdateTo(proposedSpecSameSource); } @@ -1545,38 +1574,4 @@ private static Map getScaleInProperties() return autoScalerConfig; } - private static class OtherSupervisorSpec implements SupervisorSpec - { - - @Override - public String getId() - { - return ""; - } - - @Override - public Supervisor createSupervisor() - { - return null; - } - - @Override - public List getDataSources() - { - return List.of(); - } - - @Override - public String getType() - { - return ""; - } - - @Override - public String getSource() - { - return ""; - } - } - } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 3297823a451a..a1a4aaaae625 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -105,12 +105,12 @@ default Set getInputSourceResources() throws UnsupportedOperatio /** * Checks if a spec can be replaced with a proposed spec (proposesSpec). *
<p>
- * By default, this method does no validation checks. Implementations of this method can choose to define rules + * By default, this method does no validation checks. Implementations of this method can choose to define rules * for spec updates and throw an exception if the update is not allowed. *
<p>
* * @param proposedSpec the proposed supervisor spec - * @throws IllegalArgumentException if the spec update is not allowed + * @throws DruidException if the spec update is not allowed */ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException {