Skip to content

Commit 228304e

Browse files
GWphua and asdf2014 authored
Fix Huge Number of Watches in ZooKeeper (#17482)
--------- Co-authored-by: asdf2014 <[email protected]>
1 parent d8cfac8 commit 228304e

File tree

22 files changed

+1125
-151
lines changed

22 files changed

+1125
-151
lines changed

docs/api-reference/tasks-api.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1059,9 +1059,9 @@ Host: http://ROUTER_IP:ROUTER_PORT
10591059
2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
10601060
2023-07-03T22:11:17,933 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
10611061
2023-07-03T22:11:17,935 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-kafka-supervisor-dcanhmig-1 unregistered
1062-
2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.Announcer - Unannouncing [/druid/internal-discovery/PEON/localhost:8100]
1062+
2023-07-03T22:11:17,936 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing [/druid/internal-discovery/PEON/localhost:8100]
10631063
2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self [{"druidNode":{"service":"druid/middleManager","host":"localhost","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","serverType":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}}}].
1064-
2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.Announcer - Unannouncing [/druid/announcements/localhost:8100]
1064+
2023-07-03T22:11:17,972 INFO [task-runner-0-priority-0] org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing [/druid/announcements/localhost:8100]
10651065
2023-07-03T22:11:17,996 INFO [task-runner-0-priority-0] org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
10661066
"id" : "index_kafka_social_media_0e905aa31037879_nommnaeg",
10671067
"status" : "SUCCESS",

docs/configuration/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ We recommend just setting the base ZK path and the ZK service host, but all ZK p
148148
|`druid.zk.service.connectionTimeoutMs`|ZooKeeper connection timeout, in milliseconds.|`15000`|
149149
|`druid.zk.service.compress`|Boolean flag for whether or not created Znodes should be compressed.|`true`|
150150
|`druid.zk.service.acl`|Boolean flag for whether or not to enable ACL security for ZooKeeper. If ACL is enabled, zNode creators will have all permissions.|`false`|
151+
|`druid.zk.service.pathChildrenCacheStrategy`|Dictates the underlying caching strategy for service announcements. Set to `true` to have announcers use Apache Curator's PathChildrenCache strategy; otherwise the NodeCache strategy is used. Consider using the NodeCache strategy when your cluster has a huge number of ZooKeeper watches.|`true`|
151152

152153
#### Path configuration
153154

indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
import com.google.inject.Inject;
2626
import org.apache.curator.framework.CuratorFramework;
2727
import org.apache.druid.curator.CuratorUtils;
28-
import org.apache.druid.curator.announcement.Announcer;
28+
import org.apache.druid.curator.announcement.ServiceAnnouncer;
29+
import org.apache.druid.guice.annotations.DirectExecutorAnnouncer;
2930
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
3031
import org.apache.druid.java.util.common.DateTimes;
3132
import org.apache.druid.java.util.common.ISE;
32-
import org.apache.druid.java.util.common.concurrent.Execs;
3333
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
3434
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
3535
import org.apache.druid.java.util.common.logger.Logger;
@@ -54,7 +54,7 @@ public class WorkerCuratorCoordinator
5454
private final ObjectMapper jsonMapper;
5555
private final RemoteTaskRunnerConfig config;
5656
private final CuratorFramework curatorFramework;
57-
private final Announcer announcer;
57+
private final ServiceAnnouncer announcer;
5858

5959
private final String baseAnnouncementsPath;
6060
private final String baseTaskPath;
@@ -69,15 +69,15 @@ public WorkerCuratorCoordinator(
6969
IndexerZkConfig indexerZkConfig,
7070
RemoteTaskRunnerConfig config,
7171
CuratorFramework curatorFramework,
72+
@DirectExecutorAnnouncer ServiceAnnouncer announcer,
7273
Worker worker
7374
)
7475
{
7576
this.jsonMapper = jsonMapper;
7677
this.config = config;
7778
this.curatorFramework = curatorFramework;
7879
this.worker = worker;
79-
80-
this.announcer = new Announcer(curatorFramework, Execs.directExecutor());
80+
this.announcer = announcer;
8181

8282
this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
8383
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));
@@ -87,7 +87,7 @@ public WorkerCuratorCoordinator(
8787
@LifecycleStart
8888
public void start() throws Exception
8989
{
90-
log.info("WorkerCuratorCoordinator good to go sir. Server[%s]", worker.getHost());
90+
log.info("WorkerCuratorCoordinator good to go. Server[%s]", worker.getHost());
9191
synchronized (lock) {
9292
if (started) {
9393
return;

indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.curator.test.TestingCluster;
2929
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
3030
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
31+
import org.apache.druid.curator.announcement.NodeAnnouncer;
3132
import org.apache.druid.indexer.TaskState;
3233
import org.apache.druid.indexing.common.IndexingServiceCondition;
3334
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -47,6 +48,7 @@
4748
import org.apache.druid.indexing.worker.config.WorkerConfig;
4849
import org.apache.druid.java.util.common.FileUtils;
4950
import org.apache.druid.java.util.common.StringUtils;
51+
import org.apache.druid.java.util.common.concurrent.Execs;
5052
import org.apache.druid.query.policy.NoopPolicyEnforcer;
5153
import org.apache.druid.rpc.indexing.NoopOverlordClient;
5254
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -142,6 +144,7 @@ public String getBase()
142144
),
143145
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
144146
cf,
147+
new NodeAnnouncer(cf, Execs.directExecutor()),
145148
worker
146149
);
147150
workerCuratorCoordinator.start();

indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
import org.apache.curator.test.TestingCluster;
2727
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
2828
import org.apache.druid.curator.ZkEnablementConfig;
29+
import org.apache.druid.curator.announcement.NodeAnnouncer;
2930
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
3031
import org.apache.druid.indexing.worker.Worker;
3132
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
3233
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
3334
import org.apache.druid.indexing.worker.config.WorkerConfig;
3435
import org.apache.druid.jackson.DefaultObjectMapper;
3536
import org.apache.druid.java.util.common.StringUtils;
37+
import org.apache.druid.java.util.common.concurrent.Execs;
3638
import org.apache.druid.server.initialization.IndexerZkConfig;
3739
import org.apache.druid.server.initialization.ZkPathsConfig;
3840
import org.easymock.EasyMock;
@@ -95,6 +97,7 @@ public String getBase()
9597
}, null, null, null, null),
9698
new RemoteTaskRunnerConfig(),
9799
cf,
100+
new NodeAnnouncer(cf, Execs.directExecutor()),
98101
worker
99102
);
100103
curatorCoordinator.start();

server/src/main/java/org/apache/druid/curator/CuratorConfig.java

+8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public class CuratorConfig
6363
@JsonProperty("maxZkRetries")
6464
private int maxZkRetries = 29;
6565

66+
@JsonProperty("pathChildrenCacheStrategy")
67+
private boolean pathChildrenCacheStrategy = true;
68+
6669
public static CuratorConfig create(String hosts)
6770
{
6871
CuratorConfig config = new CuratorConfig();
@@ -141,4 +144,9 @@ public int getMaxZkRetries()
141144
{
142145
return maxZkRetries;
143146
}
147+
148+
/**
* Returns the value of the {@code pathChildrenCacheStrategy} JSON property, which is
* initialized to {@code true} in this class.
*
* @return the configured {@code pathChildrenCacheStrategy} flag
*/
public boolean getPathChildrenCacheStrategy()
149+
{
150+
return pathChildrenCacheStrategy;
151+
}
144152
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.curator.announcement;
21+
22+
/**
23+
* The {@link Announceable} is a representation of an announcement to be made in ZooKeeper.
* It is a package-private holder of three final fields: the target path, the payload bytes,
* and the {@code removeParentsIfCreated} flag. It contains no behavior of its own.
24+
*/
25+
class Announceable
26+
{
27+
/**
28+
* Represents the path in ZooKeeper where the announcement will be made.
29+
*/
30+
final String path;
31+
32+
/**
33+
* Holds the actual data to be announced.
* The array handed to the constructor is stored by reference; no defensive copy is made.
34+
*/
35+
final byte[] bytes;
36+
37+
/**
38+
* Indicates whether parent nodes should be removed if the announcement is created successfully.
39+
* This can be useful for cleaning up unused paths in ZooKeeper.
* NOTE(review): this class only stores the flag; when and how parent nodes are removed is
* decided by the code that consumes it. Confirm the exact semantics there.
40+
*/
41+
final boolean removeParentsIfCreated;
42+
43+
/**
* Creates an announceable by storing the three arguments as given, without validation.
*
* @param path value assigned to {@link #path}
* @param bytes value assigned to {@link #bytes}
* @param removeParentsIfCreated value assigned to {@link #removeParentsIfCreated}
*/
public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated)
44+
{
45+
this.path = path;
46+
this.bytes = bytes;
47+
this.removeParentsIfCreated = removeParentsIfCreated;
48+
}
49+
50+
// This should be used for updates only, where removeParentsIfCreated is not relevant.
// Delegates to the three-argument constructor.
51+
public Announceable(String path, byte[] bytes)
52+
{
53+
// removeParentsIfCreated is irrelevant, so we can use dummy value "false".
54+
this(path, bytes, false);
55+
}
56+
}

0 commit comments

Comments
 (0)