diff --git a/docs/api-reference/service-status-api.md b/docs/api-reference/service-status-api.md index 48679658d440..47d2a5a6d3f2 100644 --- a/docs/api-reference/service-status-api.md +++ b/docs/api-reference/service-status-api.md @@ -782,7 +782,7 @@ Host: http://COORDINATOR_IP:COORDINATOR_PORT { "host": "localhost", "port": 8082, - "syncTimeInMs": 1745756337472 + "lastSyncTimestampMillis": 1745756337472 } ] } diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 78aec8f9f286..0a277949fdda 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters |`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| -|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `clonesPreferred`. `excludeClones` means that clone Historicals are not queried by the broker. `clonesPreferred` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.| +|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. 
`preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that the broker queries any Historical regardless of clone status. This parameter only affects native queries. MSQ does not query Historicals directly, and Dart will not respect this context parameter.| ## Parameters by query type diff --git a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java index 5334fff0731b..fd6957bc4e65 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java @@ -166,7 +166,7 @@ private synchronized Set getCurrentServersToIgnore(CloneQueryMode cloneQ // Don't remove either. return Set.of(); default: - throw DruidException.defensive("Unexpected value: [%s]", cloneQueryMode); + throw DruidException.defensive("Unexpected value of cloneQueryMode[%s]", cloneQueryMode); } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java index 939686256eea..70b4e0a950b8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java @@ -105,7 +105,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) { stats.add( Stats.Segments.ASSIGNED_TO_CLONE, - RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()), + RowKey.of(Dimension.SERVER, 
targetServer.getServer().getName()), 1L ); } @@ -116,7 +116,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) { stats.add( Stats.Segments.DROPPED_FROM_CLONE, - RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()), + RowKey.of(Dimension.SERVER, targetServer.getServer().getName()), 1L ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 0571245c9853..f4ae1b5f8a9e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -186,4 +186,14 @@ public static class Balancer public static final CoordinatorStat COMPUTE_THREADS = CoordinatorStat.toDebugOnly("balancerComputeThreads"); public static final CoordinatorStat MAX_TO_MOVE = CoordinatorStat.toDebugOnly("maxToMove"); } + + public static class Configuration + { + public static final CoordinatorStat BROKER_SYNC_TIME + = CoordinatorStat.toDebugAndEmit("brokerSyncTime", "config/brokerSync/time"); + public static final CoordinatorStat TOTAL_SYNC_TIME + = CoordinatorStat.toDebugAndEmit("totalBrokerSyncTime", "config/brokerSync/total/time"); + public static final CoordinatorStat BROKER_SYNC_ERROR + = CoordinatorStat.toDebugAndEmit("configSyncFailure", "config/brokerSync/error"); + } } diff --git a/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java b/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java index 483be1db5bd4..15970a8bf4a0 100644 --- a/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java +++ b/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java @@ -32,25 +32,25 @@ public class BrokerSyncStatus { private final String host; private final int port; - private final long 
syncTimeInMs; + private final long lastSyncTimestampMillis; @JsonCreator public BrokerSyncStatus( @JsonProperty("host") String host, @JsonProperty("port") int port, - @JsonProperty("syncTimeInMs") long syncTimeInMs + @JsonProperty("lastSyncTimestampMillis") long lastSyncTimestampMillis ) { this.host = host; this.port = port; - this.syncTimeInMs = syncTimeInMs; + this.lastSyncTimestampMillis = lastSyncTimestampMillis; } - public BrokerSyncStatus(ServiceLocation broker, long syncTimeInMs) + public BrokerSyncStatus(ServiceLocation broker, long lastSyncTimestampMillis) { this.host = broker.getHost(); this.port = broker.getTlsPort() > 0 ? broker.getTlsPort() : broker.getPlaintextPort(); - this.syncTimeInMs = syncTimeInMs; + this.lastSyncTimestampMillis = lastSyncTimestampMillis; } @JsonProperty @@ -66,9 +66,9 @@ public int getPort() } @JsonProperty - public long getSyncTimeInMs() + public long getLastSyncTimestampMillis() { - return syncTimeInMs; + return lastSyncTimestampMillis; } @Override @@ -96,7 +96,7 @@ public String toString() return "BrokerSyncStatus{" + "host='" + host + '\'' + ", port=" + port + - ", syncTimeInMs=" + syncTimeInMs + + ", lastSyncTimestampMillis=" + lastSyncTimestampMillis + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java index cdf4611a365d..1e286219aeb0 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java @@ -20,6 +20,7 @@ package org.apache.druid.server.http; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.client.broker.BrokerClient; @@ -29,8 +30,12 @@ import 
org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.rpc.FixedServiceLocator; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.ServiceLocation; @@ -38,15 +43,19 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorStat; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; import javax.annotation.Nullable; +import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; /** * Updates all brokers with the latest coordinator dynamic config. 
@@ -61,6 +70,7 @@ public class CoordinatorDynamicConfigSyncer private final ServiceClientFactory clientFactory; private final ScheduledExecutorService exec; + private final ServiceEmitter emitter; private @Nullable Future syncFuture = null; @GuardedBy("this") @@ -72,7 +82,8 @@ public CoordinatorDynamicConfigSyncer( @EscalatedGlobal final ServiceClientFactory clientFactory, final CoordinatorConfigManager configManager, @Json final ObjectMapper jsonMapper, - final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + final ServiceEmitter emitter ) { this.clientFactory = clientFactory; @@ -81,6 +92,7 @@ public CoordinatorDynamicConfigSyncer( this.druidNodeDiscovery = druidNodeDiscoveryProvider; this.exec = Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d"); this.inSyncBrokers = ConcurrentHashMap.newKeySet(); + this.emitter = emitter; } /** @@ -95,12 +107,19 @@ public void queueBroadcastConfigToBrokers() * Push the latest coordinator dynamic config, provided by the configManager to all currently known Brokers. Also * invalidates the set of inSyncBrokers if the config has changed. */ - private void broadcastConfigToBrokers() + @VisibleForTesting + void broadcastConfigToBrokers() { invalidateInSyncBrokersIfNeeded(); - for (ServiceLocation broker : getKnownBrokers()) { + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (DiscoveryDruidNode broker : getKnownBrokers()) { pushConfigToBroker(broker); } + emitStat( + Stats.Configuration.TOTAL_SYNC_TIME, + RowKey.empty(), + stopwatch.millisElapsed() + ); } /** @@ -136,12 +155,20 @@ public void onLeaderStop() } } + @LifecycleStop + public void stop() + { + exec.shutdownNow(); + } + /** * Push the latest coordinator dynamic config, provided by the configManager to the Broker at the brokerLocation * param. 
*/ - private void pushConfigToBroker(ServiceLocation brokerLocation) + private void pushConfigToBroker(DiscoveryDruidNode broker) { + final Stopwatch stopwatch = Stopwatch.createStarted(); + final ServiceLocation brokerLocation = CoordinatorDynamicConfigSyncer.convertDiscoveryNodeToServiceLocation(broker); final BrokerClient brokerClient = new BrokerClientImpl( clientFactory.makeClient( NodeRole.BROKER.getJsonName(), @@ -161,19 +188,26 @@ private void pushConfigToBroker(ServiceLocation brokerLocation) catch (Exception e) { // Catch and ignore the exception, wait for the next sync. log.error(e, "Exception while syncing dynamic configuration to broker[%s]", brokerLocation); + emitStat( + Stats.Configuration.BROKER_SYNC_ERROR, + RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(), + 1 + ); } + emitStat( + Stats.Configuration.BROKER_SYNC_TIME, + RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(), + stopwatch.millisElapsed() + ); } /** - * Returns a list of {@link ServiceLocation} for all brokers currently known to the druidNodeDiscovery. + * Returns a collection of {@link DiscoveryDruidNode} for all brokers currently known to the druidNodeDiscovery. 
*/ - private Set getKnownBrokers() + private Collection getKnownBrokers() { return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER) - .getAllNodes() - .stream() - .map(CoordinatorDynamicConfigSyncer::convertDiscoveryNodeToServiceLocation) - .collect(Collectors.toSet()); + .getAllNodes(); } /** @@ -218,4 +252,13 @@ private static ServiceLocation convertDiscoveryNodeToServiceLocation(DiscoveryDr "" ); } + + private void emitStat(CoordinatorStat stat, RowKey rowKey, long value) + { + ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder(); + rowKey.getValues().forEach( + (dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue) + ); + emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value)); + } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncerTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncerTest.java new file mode 100644 index 000000000000..2764ab0e6f11 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncerTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.google.common.util.concurrent.Futures; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscovery; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; + +import java.util.List; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class CoordinatorDynamicConfigSyncerTest +{ + private CoordinatorDynamicConfigSyncer target; + + private ServiceClient serviceClient; + private CoordinatorConfigManager coordinatorConfigManager; + private DruidNodeDiscovery druidNodeDiscovery; + + @Before + public void setUp() throws Exception + { + serviceClient = mock(ServiceClient.class); + + BytesFullResponseHolder holder = mock(BytesFullResponseHolder.class); + doReturn(HttpResponseStatus.OK).when(holder).getStatus(); + doReturn(Futures.immediateFuture(holder)) + .when(serviceClient).asyncRequest(ArgumentMatchers.any(), ArgumentMatchers.any()); + + coordinatorConfigManager = mock(CoordinatorConfigManager.class); + DruidNodeDiscoveryProvider provider = mock(DruidNodeDiscoveryProvider.class); + druidNodeDiscovery = 
mock(DruidNodeDiscovery.class); + doReturn(druidNodeDiscovery).when(provider).getForNodeRole(NodeRole.BROKER); + + target = new CoordinatorDynamicConfigSyncer( + (serviceName, serviceLocator, retryPolicy) -> serviceClient, + coordinatorConfigManager, + DefaultObjectMapper.INSTANCE, + provider, + mock(ServiceEmitter.class) + ); + } + + @Test + public void testSync() + { + CoordinatorDynamicConfig config = CoordinatorDynamicConfig + .builder() + .withMaxSegmentsToMove(105) + .withReplicantLifetime(500) + .withReplicationThrottleLimit(5) + .build(); + + doReturn(config).when(coordinatorConfigManager).getCurrentDynamicConfig(); + List nodes = List.of( + new DiscoveryDruidNode( + new DruidNode("service", "host", false, 8080, null, true, false), + NodeRole.BROKER, + null, + null + ) + ); + doReturn(nodes).when(druidNodeDiscovery).getAllNodes(); + + target.broadcastConfigToBrokers(); + RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/druid-internal/v1/config/coordinator") + .jsonContent(DefaultObjectMapper.INSTANCE, config); + verify(serviceClient).asyncRequest(eq(requestBuilder), ArgumentMatchers.any()); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 993f3f7bf4c1..cec857b49693 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -226,7 +226,7 @@ public void configure(Binder binder) binder.bind(DruidCoordinatorConfig.class); binder.bind(RedirectFilter.class).in(LazySingleton.class); - binder.bind(CoordinatorDynamicConfigSyncer.class).in(LazySingleton.class); + binder.bind(CoordinatorDynamicConfigSyncer.class).in(ManageLifecycle.class); if (beOverlord) { binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class); } else {