Skip to content

Commit c303742

Browse files
lhotarinikhil-ctds
authored andcommitted
[fix][broker] Fix broker shutdown delay by resolving hanging health checks (apache#24210)
(cherry picked from commit 12961ca) (cherry picked from commit 606365a)
1 parent d5dabc0 commit c303742

File tree

9 files changed

+540
-327
lines changed

9 files changed

+540
-327
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 171 additions & 148 deletions
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java

Lines changed: 8 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,11 @@
2626
import java.lang.management.ManagementFactory;
2727
import java.lang.management.ThreadInfo;
2828
import java.lang.management.ThreadMXBean;
29-
import java.time.Duration;
30-
import java.util.ArrayList;
3129
import java.util.Arrays;
3230
import java.util.Collections;
3331
import java.util.List;
3432
import java.util.Map;
35-
import java.util.Objects;
36-
import java.util.UUID;
3733
import java.util.concurrent.CompletableFuture;
38-
import java.util.concurrent.TimeoutException;
3934
import java.util.stream.Collectors;
4035
import javax.ws.rs.DELETE;
4136
import javax.ws.rs.DefaultValue;
@@ -50,23 +45,12 @@
5045
import javax.ws.rs.core.Response.Status;
5146
import org.apache.commons.lang.StringUtils;
5247
import org.apache.pulsar.PulsarVersion;
53-
import org.apache.pulsar.broker.PulsarServerException;
54-
import org.apache.pulsar.broker.PulsarService;
5548
import org.apache.pulsar.broker.PulsarService.State;
5649
import org.apache.pulsar.broker.ServiceConfiguration;
5750
import org.apache.pulsar.broker.admin.AdminResource;
5851
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
59-
import org.apache.pulsar.broker.namespace.NamespaceService;
60-
import org.apache.pulsar.broker.service.Subscription;
61-
import org.apache.pulsar.broker.service.Topic;
6252
import org.apache.pulsar.broker.web.RestException;
63-
import org.apache.pulsar.client.api.MessageId;
64-
import org.apache.pulsar.client.api.Producer;
65-
import org.apache.pulsar.client.api.PulsarClient;
66-
import org.apache.pulsar.client.api.Reader;
67-
import org.apache.pulsar.client.api.Schema;
6853
import org.apache.pulsar.common.conf.InternalConfigurationData;
69-
import org.apache.pulsar.common.naming.NamespaceName;
7054
import org.apache.pulsar.common.naming.TopicVersion;
7155
import org.apache.pulsar.common.policies.data.BrokerInfo;
7256
import org.apache.pulsar.common.policies.data.BrokerOperation;
@@ -81,16 +65,9 @@
8165
*/
8266
public class BrokersBase extends AdminResource {
8367
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
84-
public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
8568
// log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
8669
// to prevent excessive logging
8770
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
88-
// there is a timeout of 60 seconds default in the client(readTimeoutMs), so we need to set the timeout
89-
// a bit shorter than 60 seconds to avoid the client timeout exception thrown before the server timeout exception.
90-
// or we can't propagate the server timeout exception to the client.
91-
private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58);
92-
private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
93-
FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)");
9471
private static volatile long threadDumpLoggedTimestamp;
9572

9673
@GET
@@ -385,16 +362,21 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
385362
@ApiResponse(code = 307, message = "Current broker is not the target broker"),
386363
@ApiResponse(code = 403, message = "Don't have admin permission"),
387364
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
388-
@ApiResponse(code = 500, message = "Internal server error")})
365+
@ApiResponse(code = 500, message = "Internal server error"),
366+
@ApiResponse(code = 503, message = "Service unavailable")})
389367
public void healthCheck(@Suspended AsyncResponse asyncResponse,
390368
@ApiParam(value = "Topic Version")
391369
@QueryParam("topicVersion") TopicVersion topicVersion,
392370
@QueryParam("brokerId") String brokerId) {
371+
if (pulsar().getState() == State.Closed || pulsar().getState() == State.Closing) {
372+
asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE).build());
373+
return;
374+
}
393375
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), StringUtils.isBlank(brokerId)
394376
? pulsar().getBrokerId() : brokerId, BrokerOperation.HEALTH_CHECK)
395-
.thenAccept(__ -> checkDeadlockedThreads())
396377
.thenCompose(__ -> maybeRedirectToBroker(
397378
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
379+
.thenAccept(__ -> checkDeadlockedThreads())
398380
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
399381
.thenAccept(__ -> {
400382
LOG.info("[{}] Successfully run health check.", clientAppId());
@@ -432,143 +414,8 @@ private void checkDeadlockedThreads() {
432414
}
433415
}
434416

435-
public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) {
436-
NamespaceName namespaceName = isV2
437-
? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration)
438-
: NamespaceService.getHeartbeatNamespace(brokerId, configuration);
439-
return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
440-
}
441-
442417
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
443-
return internalRunHealthCheck(topicVersion, pulsar(), clientAppId());
444-
}
445-
446-
447-
public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion, PulsarService pulsar,
448-
String clientAppId) {
449-
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
450-
? NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
451-
: NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration());
452-
String brokerId = pulsar.getBrokerId();
453-
final String topicName =
454-
getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), (topicVersion == TopicVersion.V2));
455-
LOG.info("[{}] Running healthCheck with topic={}", clientAppId, topicName);
456-
final String messageStr = UUID.randomUUID().toString();
457-
final String subscriptionName = "healthCheck-" + messageStr;
458-
// create non-partitioned topic manually and close the previous reader if present.
459-
return pulsar.getBrokerService().getTopic(topicName, true)
460-
.thenCompose(topicOptional -> {
461-
if (!topicOptional.isPresent()) {
462-
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
463-
clientAppId, topicName);
464-
throw new RestException(Status.NOT_FOUND,
465-
String.format("Topic [%s] not found after create.", topicName));
466-
}
467-
PulsarClient client;
468-
try {
469-
client = pulsar.getClient();
470-
} catch (PulsarServerException e) {
471-
LOG.error("[{}] Fail to run health check while get client.", clientAppId);
472-
throw new RestException(e);
473-
}
474-
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
475-
client.newProducer(Schema.STRING).topic(topicName).createAsync()
476-
.thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
477-
.subscriptionName(subscriptionName)
478-
.startMessageId(MessageId.latest)
479-
.createAsync().exceptionally(createException -> {
480-
producer.closeAsync().exceptionally(ex -> {
481-
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
482-
return null;
483-
});
484-
throw FutureUtil.wrapToCompletionException(createException);
485-
}).thenCompose(reader -> producer.sendAsync(messageStr)
486-
.thenCompose(__ -> FutureUtil.addTimeoutHandling(
487-
healthCheckRecursiveReadNext(reader, messageStr),
488-
HEALTH_CHECK_READ_TIMEOUT, pulsar.getBrokerService().executor(),
489-
() -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
490-
.whenComplete((__, ex) -> {
491-
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName,
492-
clientAppId)
493-
.whenComplete((unused, innerEx) -> {
494-
if (ex != null) {
495-
resultFuture.completeExceptionally(ex);
496-
} else {
497-
resultFuture.complete(null);
498-
}
499-
});
500-
}
501-
))
502-
).exceptionally(ex -> {
503-
resultFuture.completeExceptionally(ex);
504-
return null;
505-
});
506-
return resultFuture;
507-
});
508-
}
509-
510-
private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
511-
Topic topic, String subscriptionName) {
512-
return closeAndReCheck(producer, reader, topic, subscriptionName, clientAppId());
513-
}
514-
515-
/**
516-
* Close producer and reader and then to re-check if this operation is success.
517-
*
518-
* Re-check
519-
* - Producer: If close fails we will print error log to notify user.
520-
* - Consumer: If close fails we will force delete subscription.
521-
*
522-
* @param producer Producer
523-
* @param reader Reader
524-
* @param topic Topic
525-
* @param subscriptionName Subscription name
526-
*/
527-
private static CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
528-
Topic topic, String subscriptionName, String clientAppId) {
529-
// no matter exception or success, we still need to
530-
// close producer/reader
531-
CompletableFuture<Void> producerFuture = producer.closeAsync();
532-
CompletableFuture<Void> readerFuture = reader.closeAsync();
533-
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
534-
futures.add(producerFuture);
535-
futures.add(readerFuture);
536-
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
537-
.exceptionally(closeException -> {
538-
if (readerFuture.isCompletedExceptionally()) {
539-
LOG.error("[{}] Close reader fail while heath check.", clientAppId);
540-
Subscription subscription =
541-
topic.getSubscription(subscriptionName);
542-
// re-check subscription after reader close
543-
if (subscription != null) {
544-
LOG.warn("[{}] Force delete subscription {} "
545-
+ "when it still exists after the"
546-
+ " reader is closed.",
547-
clientAppId, subscription);
548-
subscription.deleteForcefully()
549-
.exceptionally(ex -> {
550-
LOG.error("[{}] Force delete subscription fail"
551-
+ " while health check",
552-
clientAppId, ex);
553-
return null;
554-
});
555-
}
556-
} else {
557-
// producer future fail.
558-
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
559-
}
560-
return null;
561-
});
562-
}
563-
564-
private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
565-
return reader.readNextAsync()
566-
.thenCompose(msg -> {
567-
if (!Objects.equals(content, msg.getValue())) {
568-
return healthCheckRecursiveReadNext(reader, content);
569-
}
570-
return CompletableFuture.completedFuture(null);
571-
});
418+
return pulsar().runHealthCheck(topicVersion, clientAppId());
572419
}
573420

574421
private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
2525
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
2626
import static org.apache.commons.lang3.StringUtils.isNotBlank;
27-
import static org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck;
2827
import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
2928
import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
3029
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -679,7 +678,10 @@ protected void initializeHealthChecker() {
679678
}
680679

681680
public CompletableFuture<Void> checkHealth() {
682-
return internalRunHealthCheck(TopicVersion.V2, pulsar(), null).thenAccept(__ -> {
681+
if (!pulsar().isRunning()) {
682+
return CompletableFuture.completedFuture(null);
683+
}
684+
return pulsar().runHealthCheck(TopicVersion.V2, null).thenAccept(__ -> {
683685
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess();
684686
}).exceptionally(ex -> {
685687
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail();

0 commit comments

Comments
 (0)