Add metrics for Historical cloning #17956

Merged · 6 commits · May 2, 2025

2 changes: 1 addition & 1 deletion docs/api-reference/service-status-api.md
@@ -782,7 +782,7 @@ Host: http://COORDINATOR_IP:COORDINATOR_PORT
{
"host": "localhost",
"port": 8082,
"syncTimeInMs": 1745756337472
"lastSyncTimestampMillis": 1745756337472
}
]
}
2 changes: 1 addition & 1 deletion docs/querying/query-context.md
@@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters
|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following additional logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `clonesPreferred`. `excludeClones` means that clone Historicals are not queried by the broker. `clonesPreferred` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones`, and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` means that when given a choice between a clone Historical and the original Historical it is cloned from, the broker chooses the clone; Historicals not involved in the cloning process are still queried. `includeClones` means that the broker queries any Historical, regardless of clone status. This parameter only affects native queries. MSQ does not query Historicals directly, and Dart does not respect this context parameter.|
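
For illustration, a minimal usage sketch (not part of this PR): the `cloneQueryMode` key and its values come from the table above, while the surrounding code is hypothetical.

```java
import java.util.Map;

// Hypothetical sketch: a context map that asks the broker to prefer clone
// Historicals over their sources when both can serve a segment. Attach this
// map as the "context" of a native query.
public class CloneQueryModeExample
{
  public static void main(String[] args)
  {
    Map<String, Object> context = Map.of("cloneQueryMode", "preferClones");
    System.out.println(context);
  }
}
```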

## Parameters by query type

@@ -166,7 +166,7 @@ private synchronized Set<String> getCurrentServersToIgnore(CloneQueryMode cloneQueryMode)
// Don't remove either.
return Set.of();
default:
throw DruidException.defensive("Unexpected value: [%s]", cloneQueryMode);
throw DruidException.defensive("Unexpected value of cloneQueryMode[%s]", cloneQueryMode);
}
}
}
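
For context, the hunk above only shows the tail of the switch. Below is a hedged sketch of the full dispatch, with hypothetical `cloneServers` and `sourceServers` sets standing in for the real bookkeeping; the enum constant names are assumed, not taken from the PR, and `IllegalStateException` stands in for `DruidException.defensive`.

```java
import java.util.Set;

// Hypothetical reconstruction: only the include-clones branch and the
// defensive default are visible in the diff above.
public class CloneQueryModeDispatchSketch
{
  enum CloneQueryMode { EXCLUDE_CLONES, INCLUDE_CLONES, PREFER_CLONES }

  private final Set<String> cloneServers = Set.of("clone-host:8083");
  private final Set<String> sourceServers = Set.of("source-host:8083");

  synchronized Set<String> getCurrentServersToIgnore(CloneQueryMode mode)
  {
    switch (mode) {
      case EXCLUDE_CLONES:
        return cloneServers;   // broker should not query clones
      case PREFER_CLONES:
        return sourceServers;  // broker should skip the cloned sources
      case INCLUDE_CLONES:
        return Set.of();       // don't remove either
      default:
        throw new IllegalStateException("Unexpected value of cloneQueryMode[" + mode + "]");
    }
  }
}
```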
@@ -105,7 +105,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
stats.add(
Stats.Segments.ASSIGNED_TO_CLONE,
RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
1L
);
}
@@ -116,7 +116,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) {
stats.add(
Stats.Segments.DROPPED_FROM_CLONE,
RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()),
RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
1L
);
}
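
(Hedged note on the `getHost()` → `getName()` change above: a Druid server's name typically includes the port, so using it as the `server` dimension keeps two Historicals on the same host distinguishable in the emitted stats.)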
@@ -186,4 +186,14 @@ public static class Balancer
public static final CoordinatorStat COMPUTE_THREADS = CoordinatorStat.toDebugOnly("balancerComputeThreads");
public static final CoordinatorStat MAX_TO_MOVE = CoordinatorStat.toDebugOnly("maxToMove");
}

public static class Configuration
{
public static final CoordinatorStat BROKER_SYNC_TIME
= CoordinatorStat.toDebugAndEmit("brokerSyncTime", "config/brokerSync/time");
public static final CoordinatorStat TOTAL_SYNC_TIME
= CoordinatorStat.toDebugAndEmit("totalBrokerSyncTime", "config/brokerSync/total/time");
public static final CoordinatorStat BROKER_SYNC_ERROR
= CoordinatorStat.toDebugAndEmit("configSyncFailure", "config/brokerSync/error");
}
}
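
Taken together, the new `Stats.Configuration` block defines three coordinator metrics for dynamic-config sync: `config/brokerSync/time` (time to push the config to a single broker, emitted with a `server` dimension), `config/brokerSync/total/time` (time to push the config to all known brokers), and `config/brokerSync/error` (a count of 1 per failed broker sync, also per `server`).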
@@ -32,25 +32,25 @@ public class BrokerSyncStatus
{
private final String host;
private final int port;
private final long syncTimeInMs;
private final long lastSyncTimestampMillis;

@JsonCreator
public BrokerSyncStatus(
@JsonProperty("host") String host,
@JsonProperty("port") int port,
@JsonProperty("syncTimeInMs") long syncTimeInMs
@JsonProperty("lastSyncTimestampMillis") long lastSyncTimestampMillis
)
{
this.host = host;
this.port = port;
this.syncTimeInMs = syncTimeInMs;
this.lastSyncTimestampMillis = lastSyncTimestampMillis;
}

public BrokerSyncStatus(ServiceLocation broker, long syncTimeInMs)
public BrokerSyncStatus(ServiceLocation broker, long lastSyncTimestampMillis)
{
this.host = broker.getHost();
this.port = broker.getTlsPort() > 0 ? broker.getTlsPort() : broker.getPlaintextPort();
this.syncTimeInMs = syncTimeInMs;
this.lastSyncTimestampMillis = lastSyncTimestampMillis;
}

@JsonProperty
@@ -66,9 +66,9 @@ public int getPort()
}

@JsonProperty
public long getSyncTimeInMs()
public long getLastSyncTimestampMillis()
{
return syncTimeInMs;
return lastSyncTimestampMillis;
}

@Override
@@ -96,7 +96,7 @@ public String toString()
return "BrokerSyncStatus{" +
"host='" + host + '\'' +
", port=" + port +
", syncTimeInMs=" + syncTimeInMs +
", lastSyncTimestampMillis=" + lastSyncTimestampMillis +
'}';
}
}
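
As a quick illustration (not in the PR), the renamed property round-trips through Jackson under the new name, matching the `service-status-api.md` change above. A minimal sketch, assuming only the constructor and getters shown in this diff plus Jackson on the classpath:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch: serialize a BrokerSyncStatus and observe the renamed JSON field.
public class BrokerSyncStatusJsonExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    BrokerSyncStatus status = new BrokerSyncStatus("localhost", 8082, 1745756337472L);
    // Expected: {"host":"localhost","port":8082,"lastSyncTimestampMillis":1745756337472}
    System.out.println(mapper.writeValueAsString(status));
  }
}
```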
@@ -20,6 +20,7 @@
package org.apache.druid.server.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.client.broker.BrokerClient;
@@ -29,24 +30,32 @@
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* Updates all brokers with the latest coordinator dynamic config.
@@ -61,6 +70,7 @@ public class CoordinatorDynamicConfigSyncer

private final ServiceClientFactory clientFactory;
private final ScheduledExecutorService exec;
private final ServiceEmitter emitter;
private @Nullable Future<?> syncFuture = null;

@GuardedBy("this")
@@ -72,7 +82,8 @@ public CoordinatorDynamicConfigSyncer(
@EscalatedGlobal final ServiceClientFactory clientFactory,
final CoordinatorConfigManager configManager,
@Json final ObjectMapper jsonMapper,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final ServiceEmitter emitter
)
{
this.clientFactory = clientFactory;
@@ -81,6 +92,7 @@
this.druidNodeDiscovery = druidNodeDiscoveryProvider;
this.exec = Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d");
this.inSyncBrokers = ConcurrentHashMap.newKeySet();
this.emitter = emitter;
}

/**
@@ -95,12 +107,19 @@ public void queueBroadcastConfigToBrokers()
* Push the latest coordinator dynamic config, provided by the configManager, to all currently known Brokers. Also
* invalidates the set of inSyncBrokers if the config has changed.
*/
private void broadcastConfigToBrokers()
@VisibleForTesting
void broadcastConfigToBrokers()
{
invalidateInSyncBrokersIfNeeded();
for (ServiceLocation broker : getKnownBrokers()) {
final Stopwatch stopwatch = Stopwatch.createStarted();
for (DiscoveryDruidNode broker : getKnownBrokers()) {
pushConfigToBroker(broker);
}
emitStat(
Stats.Configuration.TOTAL_SYNC_TIME,
RowKey.empty(),
stopwatch.millisElapsed()
);
}

/**
@@ -136,12 +155,20 @@ public void onLeaderStop()
}
}

@LifecycleStop
public void stop()
{
exec.shutdownNow();
}

/**
* Push the latest coordinator dynamic config, provided by the configManager, to the Broker described by the
* given node.
*/
private void pushConfigToBroker(ServiceLocation brokerLocation)
private void pushConfigToBroker(DiscoveryDruidNode broker)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
final ServiceLocation brokerLocation = CoordinatorDynamicConfigSyncer.convertDiscoveryNodeToServiceLocation(broker);
final BrokerClient brokerClient = new BrokerClientImpl(
clientFactory.makeClient(
NodeRole.BROKER.getJsonName(),
@@ -161,19 +188,26 @@ private void pushConfigToBroker(ServiceLocation brokerLocation)
catch (Exception e) {
// Catch and ignore the exception, wait for the next sync.
log.error(e, "Exception while syncing dynamic configuration to broker[%s]", brokerLocation);
emitStat(
Stats.Configuration.BROKER_SYNC_ERROR,
RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(),
1
);
}
emitStat(
Stats.Configuration.BROKER_SYNC_TIME,
RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(),
stopwatch.millisElapsed()
);
}

/**
* Returns a list of {@link ServiceLocation} for all brokers currently known to the druidNodeDiscovery.
* Returns a collection of {@link DiscoveryDruidNode} for all brokers currently known to the druidNodeDiscovery.
*/
private Set<ServiceLocation> getKnownBrokers()
private Collection<DiscoveryDruidNode> getKnownBrokers()
{
return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER)
.getAllNodes()
.stream()
.map(CoordinatorDynamicConfigSyncer::convertDiscoveryNodeToServiceLocation)
.collect(Collectors.toSet());
.getAllNodes();
}

/**
@@ -218,4 +252,13 @@ private static ServiceLocation convertDiscoveryNodeToServiceLocation(DiscoveryDruidNode
""
);
}

private void emitStat(CoordinatorStat stat, RowKey rowKey, long value)
{
ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
rowKey.getValues().forEach(
(dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue)
);
emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value));
}
}
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.http;

import com.google.common.util.concurrent.Futures;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;

import java.util.List;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class CoordinatorDynamicConfigSyncerTest
{
private CoordinatorDynamicConfigSyncer target;

private ServiceClient serviceClient;
private CoordinatorConfigManager coordinatorConfigManager;
private DruidNodeDiscovery druidNodeDiscovery;

@Before
public void setUp() throws Exception
{
serviceClient = mock(ServiceClient.class);

BytesFullResponseHolder holder = mock(BytesFullResponseHolder.class);
doReturn(HttpResponseStatus.OK).when(holder).getStatus();
doReturn(Futures.immediateFuture(holder))
.when(serviceClient).asyncRequest(ArgumentMatchers.any(), ArgumentMatchers.any());

coordinatorConfigManager = mock(CoordinatorConfigManager.class);
DruidNodeDiscoveryProvider provider = mock(DruidNodeDiscoveryProvider.class);
druidNodeDiscovery = mock(DruidNodeDiscovery.class);
doReturn(druidNodeDiscovery).when(provider).getForNodeRole(NodeRole.BROKER);

target = new CoordinatorDynamicConfigSyncer(
(serviceName, serviceLocator, retryPolicy) -> serviceClient,
coordinatorConfigManager,
DefaultObjectMapper.INSTANCE,
provider,
mock(ServiceEmitter.class)
);
}

@Test
public void testSync()
{
CoordinatorDynamicConfig config = CoordinatorDynamicConfig
.builder()
.withMaxSegmentsToMove(105)
.withReplicantLifetime(500)
.withReplicationThrottleLimit(5)
.build();

doReturn(config).when(coordinatorConfigManager).getCurrentDynamicConfig();
List<DiscoveryDruidNode> nodes = List.of(
new DiscoveryDruidNode(
new DruidNode("service", "host", false, 8080, null, true, false),
NodeRole.BROKER,
null,
null
)
);
doReturn(nodes).when(druidNodeDiscovery).getAllNodes();

target.broadcastConfigToBrokers();
RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/druid-internal/v1/config/coordinator")
.jsonContent(DefaultObjectMapper.INSTANCE, config);
verify(serviceClient).asyncRequest(eq(requestBuilder), ArgumentMatchers.any());
}
}
@@ -226,7 +226,7 @@ public void configure(Binder binder)
binder.bind(DruidCoordinatorConfig.class);

binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(CoordinatorDynamicConfigSyncer.class).in(LazySingleton.class);
binder.bind(CoordinatorDynamicConfigSyncer.class).in(ManageLifecycle.class);
if (beOverlord) {
binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class);
} else {
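
The scope change above matters because of the `@LifecycleStop stop()` method added to `CoordinatorDynamicConfigSyncer`: a `LazySingleton` binding would construct the syncer but never invoke its stop hook, whereas `ManageLifecycle` registers it with Druid's lifecycle so its executor is shut down on exit. A minimal sketch of the pattern, using a hypothetical class that mirrors the shape of the real one:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical lifecycle-managed component. In Druid, binding it with
// .in(ManageLifecycle.class) causes the lifecycle to call the method
// annotated with @LifecycleStop during shutdown; a plain LazySingleton
// binding would skip it.
public class ExampleSyncer
{
  private final ScheduledExecutorService exec =
      Executors.newSingleThreadScheduledExecutor();

  // In the real class this carries the @LifecycleStop annotation.
  public void stop()
  {
    exec.shutdownNow();
  }
}
```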