Skip to content

Commit 69fd898

Browse files
lhotariganesh-ctds
authored andcommitted
[fix][broker] Fix broker shutdown delay by resolving hanging health checks (apache#24210)
(cherry picked from commit 12961ca) (cherry picked from commit 6455b02)
1 parent d027449 commit 69fd898

File tree

6 files changed

+559
-329
lines changed

6 files changed

+559
-329
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 168 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import org.apache.pulsar.broker.resources.PulsarResources;
9999
import org.apache.pulsar.broker.rest.Topics;
100100
import org.apache.pulsar.broker.service.BrokerService;
101+
import org.apache.pulsar.broker.service.HealthChecker;
101102
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
102103
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
103104
import org.apache.pulsar.broker.service.Topic;
@@ -139,6 +140,7 @@
139140
import org.apache.pulsar.common.naming.NamespaceBundle;
140141
import org.apache.pulsar.common.naming.NamespaceName;
141142
import org.apache.pulsar.common.naming.TopicName;
143+
import org.apache.pulsar.common.naming.TopicVersion;
142144
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
143145
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
144146
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
@@ -287,6 +289,7 @@ public enum State {
287289
private volatile CompletableFuture<Void> closeFuture;
288290
// key is listener name, value is pulsar address and pulsar ssl address
289291
private Map<String, AdvertisedListener> advertisedListeners;
292+
private volatile HealthChecker healthChecker;
290293

291294
public PulsarService(ServiceConfiguration config) {
292295
this(config, Optional.empty(), (exitCode) -> LOG.info("Process termination requested with code {}. "
@@ -433,6 +436,11 @@ public CompletableFuture<Void> closeAsync() {
433436
}
434437
state = State.Closing;
435438

439+
if (healthChecker != null) {
440+
healthChecker.close();
441+
healthChecker = null;
442+
}
443+
436444
// close the service in reverse order v.s. in which they are started
437445
if (this.resourceUsageTransportManager != null) {
438446
try {
@@ -1575,136 +1583,157 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp
15751583
return this.offloaderScheduler;
15761584
}
15771585

1578-
public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
1586+
public PulsarClientImpl createClientImpl(ClientConfigurationData conf) throws PulsarClientException {
1587+
return createClientImpl(conf, null);
1588+
}
1589+
1590+
public PulsarClientImpl createClientImpl(Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
1591+
throws PulsarClientException {
1592+
return createClientImpl(null, customizer);
1593+
}
1594+
1595+
public PulsarClientImpl createClientImpl(ClientConfigurationData conf,
1596+
Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
15791597
throws PulsarClientException {
1580-
return PulsarClientImpl.builder()
1581-
.conf(clientConf)
1598+
PulsarClientImpl.PulsarClientImplBuilder pulsarClientImplBuilder = PulsarClientImpl.builder()
1599+
.conf(conf != null ? conf : createClientConfigurationData())
15821600
.eventLoopGroup(ioEventLoopGroup)
15831601
.timer(brokerClientSharedTimer)
15841602
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
15851603
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
15861604
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
1587-
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
1588-
.build();
1605+
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider);
1606+
if (customizer != null) {
1607+
customizer.accept(pulsarClientImplBuilder);
1608+
}
1609+
return pulsarClientImplBuilder.build();
15891610
}
15901611

15911612
public synchronized PulsarClient getClient() throws PulsarServerException {
15921613
if (this.client == null) {
15931614
try {
1594-
ClientConfigurationData initialConf = new ClientConfigurationData();
1595-
1596-
// Disable memory limit for broker client and disable stats
1597-
initialConf.setMemoryLimitBytes(0);
1598-
initialConf.setStatsIntervalSeconds(0);
1599-
1600-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1601-
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1602-
// See https://github.com/apache/pulsar/issues/8509 for more information.
1603-
Map<String, Object> overrides = PropertiesUtils
1604-
.filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
1605-
ClientConfigurationData conf =
1606-
ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
1607-
1608-
// Disabled auto release useless connections
1609-
// The automatic release connection feature is not yet perfect for transaction scenarios, so turn it
1610-
// off first.
1611-
conf.setConnectionMaxIdleSeconds(-1);
1612-
1613-
boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
1614-
conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
1615-
1616-
if (tlsEnabled) {
1617-
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
1618-
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
1619-
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
1620-
conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
1621-
if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
1622-
conf.setUseKeyStoreTls(true);
1623-
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
1624-
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
1625-
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
1626-
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
1627-
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
1628-
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
1629-
} else {
1630-
conf.setTlsTrustCertsFilePath(
1631-
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
1632-
? this.getConfiguration().getBrokerClientTrustCertsFilePath()
1633-
: this.getConfiguration().getTlsTrustCertsFilePath());
1634-
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
1635-
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
1636-
}
1637-
}
1638-
1639-
if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
1640-
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
1641-
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
1642-
conf.setAuthParamMap(null);
1643-
conf.setAuthentication(AuthenticationFactory.create(
1644-
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
1645-
this.getConfiguration().getBrokerClientAuthenticationParameters()));
1646-
}
1647-
this.client = createClientImpl(conf);
1615+
this.client = createClientImpl(null, null);
16481616
} catch (Exception e) {
16491617
throw new PulsarServerException(e);
16501618
}
16511619
}
16521620
return this.client;
16531621
}
16541622

1623+
protected ClientConfigurationData createClientConfigurationData()
1624+
throws PulsarClientException.UnsupportedAuthenticationException {
1625+
ClientConfigurationData initialConf = new ClientConfigurationData();
1626+
1627+
// Disable memory limit for broker client and disable stats
1628+
initialConf.setMemoryLimitBytes(0);
1629+
initialConf.setStatsIntervalSeconds(0);
1630+
1631+
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1632+
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1633+
// See https://github.com/apache/pulsar/issues/8509 for more information.
1634+
Map<String, Object> overrides = PropertiesUtils
1635+
.filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
1636+
ClientConfigurationData conf =
1637+
ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
1638+
1639+
// Disabled auto release useless connections
1640+
// The automatic release connection feature is not yet perfect for transaction scenarios, so turn it
1641+
// off first.
1642+
conf.setConnectionMaxIdleSeconds(-1);
1643+
1644+
boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
1645+
conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
1646+
1647+
if (tlsEnabled) {
1648+
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
1649+
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
1650+
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
1651+
conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
1652+
if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
1653+
conf.setUseKeyStoreTls(true);
1654+
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
1655+
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
1656+
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
1657+
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
1658+
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
1659+
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
1660+
} else {
1661+
conf.setTlsTrustCertsFilePath(
1662+
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
1663+
? this.getConfiguration().getBrokerClientTrustCertsFilePath()
1664+
: this.getConfiguration().getTlsTrustCertsFilePath());
1665+
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
1666+
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
1667+
}
1668+
}
1669+
1670+
if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
1671+
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
1672+
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
1673+
conf.setAuthParamMap(null);
1674+
conf.setAuthentication(AuthenticationFactory.create(
1675+
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
1676+
this.getConfiguration().getBrokerClientAuthenticationParameters()));
1677+
}
1678+
return conf;
1679+
}
1680+
16551681
public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
16561682
if (this.adminClient == null) {
16571683
try {
1658-
ServiceConfiguration conf = this.getConfiguration();
1659-
final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress;
1660-
if (adminApiUrl == null) {
1661-
throw new IllegalArgumentException("Web service address was not set properly "
1662-
+ ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled()
1663-
+ ", webServiceAddressTls: " + webServiceAddressTls
1664-
+ ", webServiceAddress: " + webServiceAddress);
1665-
}
1666-
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
1667-
1668-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1669-
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1670-
// See https://github.com/apache/pulsar/issues/8509 for more information.
1671-
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
1684+
this.adminClient = getCreateAdminClientBuilder().build();
1685+
LOG.info("created admin with url {} ", adminClient.getServiceUrl());
1686+
} catch (Exception e) {
1687+
throw new PulsarServerException(e);
1688+
}
1689+
}
1690+
return this.adminClient;
1691+
}
16721692

1673-
builder.authentication(
1674-
conf.getBrokerClientAuthenticationPlugin(),
1675-
conf.getBrokerClientAuthenticationParameters());
1676-
1677-
if (conf.isBrokerClientTlsEnabled()) {
1678-
builder.tlsCiphers(config.getBrokerClientTlsCiphers())
1679-
.tlsProtocols(config.getBrokerClientTlsProtocols());
1680-
if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
1681-
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
1682-
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
1683-
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
1684-
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
1685-
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
1686-
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
1687-
} else {
1688-
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
1689-
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
1690-
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
1691-
}
1692-
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
1693-
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
1694-
}
1693+
protected PulsarAdminBuilder getCreateAdminClientBuilder()
1694+
throws PulsarClientException.UnsupportedAuthenticationException {
1695+
ServiceConfiguration conf = this.getConfiguration();
1696+
final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress;
1697+
if (adminApiUrl == null) {
1698+
throw new IllegalArgumentException("Web service address was not set properly "
1699+
+ ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled()
1700+
+ ", webServiceAddressTls: " + webServiceAddressTls
1701+
+ ", webServiceAddress: " + webServiceAddress);
1702+
}
1703+
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
16951704

1696-
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
1697-
// zk-operation timeout
1698-
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
1705+
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1706+
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1707+
// See https://github.com/apache/pulsar/issues/8509 for more information.
1708+
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
16991709

1700-
this.adminClient = builder.build();
1701-
LOG.info("created admin with url {} ", adminApiUrl);
1702-
} catch (Exception e) {
1703-
throw new PulsarServerException(e);
1710+
builder.authentication(
1711+
conf.getBrokerClientAuthenticationPlugin(),
1712+
conf.getBrokerClientAuthenticationParameters());
1713+
1714+
if (conf.isBrokerClientTlsEnabled()) {
1715+
builder.tlsCiphers(config.getBrokerClientTlsCiphers())
1716+
.tlsProtocols(config.getBrokerClientTlsProtocols());
1717+
if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
1718+
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
1719+
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
1720+
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
1721+
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
1722+
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
1723+
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
1724+
} else {
1725+
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
1726+
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
1727+
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
17041728
}
1729+
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
1730+
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
17051731
}
17061732

1707-
return this.adminClient;
1733+
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
1734+
// zk-operation timeout
1735+
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
1736+
return builder;
17081737
}
17091738

17101739
public MetricsGenerator getMetricsGenerator() {
@@ -2063,4 +2092,40 @@ public CompletableFuture<TopicCompactionService> newTopicCompactionService(Strin
20632092
return CompletableFuture.failedFuture(e);
20642093
}
20652094
}
2095+
2096+
/**
2097+
* Run health check for the broker.
2098+
*
2099+
* @return CompletableFuture
2100+
*/
2101+
public CompletableFuture<Void> runHealthCheck(TopicVersion topicVersion, String clientId) {
2102+
if (!isRunning()) {
2103+
return CompletableFuture.failedFuture(new PulsarServerException("Broker is not running"));
2104+
}
2105+
HealthChecker localHealthChecker = getHealthChecker();
2106+
if (localHealthChecker == null) {
2107+
return CompletableFuture.failedFuture(new PulsarServerException("Broker is not running"));
2108+
}
2109+
return localHealthChecker.checkHealth(topicVersion, clientId);
2110+
}
2111+
2112+
@VisibleForTesting
2113+
public HealthChecker getHealthChecker() {
2114+
if (healthChecker == null) {
2115+
synchronized (this) {
2116+
if (healthChecker == null) {
2117+
if (!isRunning()) {
2118+
return null;
2119+
}
2120+
try {
2121+
healthChecker = new HealthChecker(this);
2122+
} catch (PulsarServerException e) {
2123+
LOG.error("Failed to create health checker", e);
2124+
throw new RuntimeException(e);
2125+
}
2126+
}
2127+
}
2128+
}
2129+
return healthChecker;
2130+
}
20662131
}

0 commit comments

Comments
 (0)