Skip to content

[fix][broker] Fix broker shutdown delay by resolving hanging health checks #24210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 171 additions & 148 deletions pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,11 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand All @@ -50,23 +45,12 @@
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.BrokerOperation;
Expand All @@ -81,16 +65,9 @@
*/
public class BrokersBase extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
// log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
// to prevent excessive logging
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
// there is a timeout of 60 seconds default in the client(readTimeoutMs), so we need to set the timeout
// a bit shorter than 60 seconds to avoid the client timeout exception thrown before the server timeout exception.
// or we can't propagate the server timeout exception to the client.
private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58);
private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)");
private static volatile long threadDumpLoggedTimestamp;

@GET
Expand Down Expand Up @@ -385,16 +362,21 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
@ApiResponse(code = 307, message = "Current broker is not the target broker"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Service unavailable")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
if (pulsar().getState() == State.Closed || pulsar().getState() == State.Closing) {
asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE).build());
return;
}
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), StringUtils.isBlank(brokerId)
? pulsar().getBrokerId() : brokerId, BrokerOperation.HEALTH_CHECK)
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
Expand Down Expand Up @@ -432,143 +414,8 @@ private void checkDeadlockedThreads() {
}
}

public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) {
NamespaceName namespaceName = isV2
? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration)
: NamespaceService.getHeartbeatNamespace(brokerId, configuration);
return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
}

private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
return internalRunHealthCheck(topicVersion, pulsar(), clientAppId());
}


public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion, PulsarService pulsar,
String clientAppId) {
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
: NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration());
String brokerId = pulsar.getBrokerId();
final String topicName =
getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), (topicVersion == TopicVersion.V2));
LOG.info("[{}] Running healthCheck with topic={}", clientAppId, topicName);
final String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
return pulsar.getBrokerService().getTopic(topicName, true)
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
clientAppId, topicName);
throw new RestException(Status.NOT_FOUND,
String.format("Topic [%s] not found after create.", topicName));
}
PulsarClient client;
try {
client = pulsar.getClient();
} catch (PulsarServerException e) {
LOG.error("[{}] Fail to run health check while get client.", clientAppId);
throw new RestException(e);
}
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
client.newProducer(Schema.STRING).topic(topicName).createAsync()
.thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
.subscriptionName(subscriptionName)
.startMessageId(MessageId.latest)
.createAsync().exceptionally(createException -> {
producer.closeAsync().exceptionally(ex -> {
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
return null;
});
throw FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader -> producer.sendAsync(messageStr)
.thenCompose(__ -> FutureUtil.addTimeoutHandling(
healthCheckRecursiveReadNext(reader, messageStr),
HEALTH_CHECK_READ_TIMEOUT, pulsar.getBrokerService().executor(),
() -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName,
clientAppId)
.whenComplete((unused, innerEx) -> {
if (ex != null) {
resultFuture.completeExceptionally(ex);
} else {
resultFuture.complete(null);
}
});
}
))
).exceptionally(ex -> {
resultFuture.completeExceptionally(ex);
return null;
});
return resultFuture;
});
}

private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
return closeAndReCheck(producer, reader, topic, subscriptionName, clientAppId());
}

/**
* Close producer and reader and then to re-check if this operation is success.
*
* Re-check
* - Producer: If close fails we will print error log to notify user.
* - Consumer: If close fails we will force delete subscription.
*
* @param producer Producer
* @param reader Reader
* @param topic Topic
* @param subscriptionName Subscription name
*/
private static CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName, String clientAppId) {
// no matter exception or success, we still need to
// close producer/reader
CompletableFuture<Void> producerFuture = producer.closeAsync();
CompletableFuture<Void> readerFuture = reader.closeAsync();
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
futures.add(producerFuture);
futures.add(readerFuture);
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
.exceptionally(closeException -> {
if (readerFuture.isCompletedExceptionally()) {
LOG.error("[{}] Close reader fail while heath check.", clientAppId);
Subscription subscription =
topic.getSubscription(subscriptionName);
// re-check subscription after reader close
if (subscription != null) {
LOG.warn("[{}] Force delete subscription {} "
+ "when it still exists after the"
+ " reader is closed.",
clientAppId, subscription);
subscription.deleteForcefully()
.exceptionally(ex -> {
LOG.error("[{}] Force delete subscription fail"
+ " while health check",
clientAppId, ex);
return null;
});
}
} else {
// producer future fail.
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
}
return null;
});
}

private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
return reader.readNextAsync()
.thenCompose(msg -> {
if (!Objects.equals(content, msg.getValue())) {
return healthCheckRecursiveReadNext(reader, content);
}
return CompletableFuture.completedFuture(null);
});
return pulsar().runHealthCheck(topicVersion, clientAppId());
}

private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck;
import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
Expand Down Expand Up @@ -679,7 +678,10 @@ protected void initializeHealthChecker() {
}

public CompletableFuture<Void> checkHealth() {
return internalRunHealthCheck(TopicVersion.V2, pulsar(), null).thenAccept(__ -> {
if (!pulsar().isRunning()) {
return CompletableFuture.completedFuture(null);
}
return pulsar().runHealthCheck(TopicVersion.V2, null).thenAccept(__ -> {
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess();
}).exceptionally(ex -> {
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail();
Expand Down
Loading
Loading