Skip to content

Commit d9e5c03

Browse files
committed
[fix][broker] Fix broker shutdown delay by resolving hanging health checks (#24210)
(cherry picked from commit 12961ca)
1 parent 0c333d7 commit d9e5c03

File tree

6 files changed

+531
-258
lines changed

6 files changed

+531
-258
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 168 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import org.apache.pulsar.broker.resources.PulsarResources;
102102
import org.apache.pulsar.broker.rest.Topics;
103103
import org.apache.pulsar.broker.service.BrokerService;
104+
import org.apache.pulsar.broker.service.HealthChecker;
104105
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
105106
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
106107
import org.apache.pulsar.broker.service.Topic;
@@ -145,6 +146,7 @@
145146
import org.apache.pulsar.common.naming.NamespaceBundle;
146147
import org.apache.pulsar.common.naming.NamespaceName;
147148
import org.apache.pulsar.common.naming.TopicName;
149+
import org.apache.pulsar.common.naming.TopicVersion;
148150
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
149151
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
150152
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
@@ -300,6 +302,7 @@ public enum State {
300302
private volatile CompletableFuture<Void> closeFuture;
301303
// key is listener name, value is pulsar address and pulsar ssl address
302304
private Map<String, AdvertisedListener> advertisedListeners;
305+
private volatile HealthChecker healthChecker;
303306

304307
public PulsarService(ServiceConfiguration config) {
305308
this(config, Optional.empty(), (exitCode) -> LOG.info("Process termination requested with code {}. "
@@ -476,6 +479,11 @@ public CompletableFuture<Void> closeAsync() {
476479
// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
477480
state = State.Closing;
478481

482+
if (healthChecker != null) {
483+
healthChecker.close();
484+
healthChecker = null;
485+
}
486+
479487
// close the service in reverse order v.s. in which they are started
480488
if (this.resourceUsageTransportManager != null) {
481489
try {
@@ -1609,136 +1617,157 @@ protected synchronized OrderedScheduler getOffloaderReadScheduler(OffloadPolicie
16091617
return this.offloaderReadExecutor;
16101618
}
16111619

1612-
public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
1620+
public PulsarClientImpl createClientImpl(ClientConfigurationData conf) throws PulsarClientException {
1621+
return createClientImpl(conf, null);
1622+
}
1623+
1624+
public PulsarClientImpl createClientImpl(Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
1625+
throws PulsarClientException {
1626+
return createClientImpl(null, customizer);
1627+
}
1628+
1629+
public PulsarClientImpl createClientImpl(ClientConfigurationData conf,
1630+
Consumer<PulsarClientImpl.PulsarClientImplBuilder> customizer)
16131631
throws PulsarClientException {
1614-
return PulsarClientImpl.builder()
1615-
.conf(clientConf)
1632+
PulsarClientImpl.PulsarClientImplBuilder pulsarClientImplBuilder = PulsarClientImpl.builder()
1633+
.conf(conf != null ? conf : createClientConfigurationData())
16161634
.eventLoopGroup(ioEventLoopGroup)
16171635
.timer(brokerClientSharedTimer)
16181636
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
16191637
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
16201638
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
1621-
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
1622-
.build();
1639+
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider);
1640+
if (customizer != null) {
1641+
customizer.accept(pulsarClientImplBuilder);
1642+
}
1643+
return pulsarClientImplBuilder.build();
16231644
}
16241645

16251646
public synchronized PulsarClient getClient() throws PulsarServerException {
16261647
if (this.client == null) {
16271648
try {
1628-
ClientConfigurationData initialConf = new ClientConfigurationData();
1629-
1630-
// Disable memory limit for broker client and disable stats
1631-
initialConf.setMemoryLimitBytes(0);
1632-
initialConf.setStatsIntervalSeconds(0);
1633-
1634-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1635-
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1636-
// See https://github.com/apache/pulsar/issues/8509 for more information.
1637-
Map<String, Object> overrides = PropertiesUtils
1638-
.filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
1639-
ClientConfigurationData conf =
1640-
ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
1641-
1642-
// Disabled auto release useless connections
1643-
// The automatic release connection feature is not yet perfect for transaction scenarios, so turn it
1644-
// off first.
1645-
conf.setConnectionMaxIdleSeconds(-1);
1646-
1647-
boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
1648-
conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
1649-
1650-
if (tlsEnabled) {
1651-
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
1652-
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
1653-
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
1654-
conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
1655-
if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
1656-
conf.setUseKeyStoreTls(true);
1657-
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
1658-
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
1659-
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
1660-
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
1661-
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
1662-
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
1663-
} else {
1664-
conf.setTlsTrustCertsFilePath(
1665-
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
1666-
? this.getConfiguration().getBrokerClientTrustCertsFilePath()
1667-
: this.getConfiguration().getTlsTrustCertsFilePath());
1668-
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
1669-
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
1670-
}
1671-
}
1672-
1673-
if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
1674-
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
1675-
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
1676-
conf.setAuthParamMap(null);
1677-
conf.setAuthentication(AuthenticationFactory.create(
1678-
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
1679-
this.getConfiguration().getBrokerClientAuthenticationParameters()));
1680-
}
1681-
this.client = createClientImpl(conf);
1649+
this.client = createClientImpl(null, null);
16821650
} catch (Exception e) {
16831651
throw new PulsarServerException(e);
16841652
}
16851653
}
16861654
return this.client;
16871655
}
16881656

1657+
protected ClientConfigurationData createClientConfigurationData()
1658+
throws PulsarClientException.UnsupportedAuthenticationException {
1659+
ClientConfigurationData initialConf = new ClientConfigurationData();
1660+
1661+
// Disable memory limit for broker client and disable stats
1662+
initialConf.setMemoryLimitBytes(0);
1663+
initialConf.setStatsIntervalSeconds(0);
1664+
1665+
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1666+
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1667+
// See https://github.com/apache/pulsar/issues/8509 for more information.
1668+
Map<String, Object> overrides = PropertiesUtils
1669+
.filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
1670+
ClientConfigurationData conf =
1671+
ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
1672+
1673+
// Disabled auto release useless connections
1674+
// The automatic release connection feature is not yet perfect for transaction scenarios, so turn it
1675+
// off first.
1676+
conf.setConnectionMaxIdleSeconds(-1);
1677+
1678+
boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
1679+
conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
1680+
1681+
if (tlsEnabled) {
1682+
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
1683+
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
1684+
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
1685+
conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
1686+
if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
1687+
conf.setUseKeyStoreTls(true);
1688+
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
1689+
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
1690+
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
1691+
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
1692+
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
1693+
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
1694+
} else {
1695+
conf.setTlsTrustCertsFilePath(
1696+
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
1697+
? this.getConfiguration().getBrokerClientTrustCertsFilePath()
1698+
: this.getConfiguration().getTlsTrustCertsFilePath());
1699+
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
1700+
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
1701+
}
1702+
}
1703+
1704+
if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
1705+
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
1706+
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
1707+
conf.setAuthParamMap(null);
1708+
conf.setAuthentication(AuthenticationFactory.create(
1709+
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
1710+
this.getConfiguration().getBrokerClientAuthenticationParameters()));
1711+
}
1712+
return conf;
1713+
}
1714+
16891715
public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
16901716
if (this.adminClient == null) {
16911717
try {
1692-
ServiceConfiguration conf = this.getConfiguration();
1693-
final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress;
1694-
if (adminApiUrl == null) {
1695-
throw new IllegalArgumentException("Web service address was not set properly "
1696-
+ ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled()
1697-
+ ", webServiceAddressTls: " + webServiceAddressTls
1698-
+ ", webServiceAddress: " + webServiceAddress);
1699-
}
1700-
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
1701-
1702-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1703-
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1704-
// See https://github.com/apache/pulsar/issues/8509 for more information.
1705-
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
1718+
this.adminClient = getCreateAdminClientBuilder().build();
1719+
LOG.info("created admin with url {} ", adminClient.getServiceUrl());
1720+
} catch (Exception e) {
1721+
throw new PulsarServerException(e);
1722+
}
1723+
}
1724+
return this.adminClient;
1725+
}
17061726

1707-
builder.authentication(
1708-
conf.getBrokerClientAuthenticationPlugin(),
1709-
conf.getBrokerClientAuthenticationParameters());
1710-
1711-
if (conf.isBrokerClientTlsEnabled()) {
1712-
builder.tlsCiphers(config.getBrokerClientTlsCiphers())
1713-
.tlsProtocols(config.getBrokerClientTlsProtocols());
1714-
if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
1715-
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
1716-
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
1717-
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
1718-
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
1719-
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
1720-
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
1721-
} else {
1722-
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
1723-
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
1724-
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
1725-
}
1726-
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
1727-
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
1728-
}
1727+
protected PulsarAdminBuilder getCreateAdminClientBuilder()
1728+
throws PulsarClientException.UnsupportedAuthenticationException {
1729+
ServiceConfiguration conf = this.getConfiguration();
1730+
final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress;
1731+
if (adminApiUrl == null) {
1732+
throw new IllegalArgumentException("Web service address was not set properly "
1733+
+ ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled()
1734+
+ ", webServiceAddressTls: " + webServiceAddressTls
1735+
+ ", webServiceAddress: " + webServiceAddress);
1736+
}
1737+
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
17291738

1730-
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
1731-
// zk-operation timeout
1732-
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
1739+
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
1740+
// @Secret on the ClientConfigurationData object because of the way they are serialized.
1741+
// See https://github.com/apache/pulsar/issues/8509 for more information.
1742+
builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
17331743

1734-
this.adminClient = builder.build();
1735-
LOG.info("created admin with url {} ", adminApiUrl);
1736-
} catch (Exception e) {
1737-
throw new PulsarServerException(e);
1744+
builder.authentication(
1745+
conf.getBrokerClientAuthenticationPlugin(),
1746+
conf.getBrokerClientAuthenticationParameters());
1747+
1748+
if (conf.isBrokerClientTlsEnabled()) {
1749+
builder.tlsCiphers(config.getBrokerClientTlsCiphers())
1750+
.tlsProtocols(config.getBrokerClientTlsProtocols());
1751+
if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
1752+
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
1753+
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
1754+
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
1755+
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
1756+
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
1757+
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
1758+
} else {
1759+
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
1760+
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
1761+
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
17381762
}
1763+
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
1764+
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
17391765
}
17401766

1741-
return this.adminClient;
1767+
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
1768+
// zk-operation timeout
1769+
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
1770+
return builder;
17421771
}
17431772

17441773
/**
@@ -2069,4 +2098,40 @@ public void initConfigMetadataSynchronizerIfNeeded() {
20692098
mutex.unlock();
20702099
}
20712100
}
2101+
2102+
/**
2103+
* Run health check for the broker.
2104+
*
2105+
* @return CompletableFuture
2106+
*/
2107+
public CompletableFuture<Void> runHealthCheck(TopicVersion topicVersion, String clientId) {
2108+
if (!isRunning()) {
2109+
return CompletableFuture.failedFuture(new PulsarServerException("Broker is not running"));
2110+
}
2111+
HealthChecker localHealthChecker = getHealthChecker();
2112+
if (localHealthChecker == null) {
2113+
return CompletableFuture.failedFuture(new PulsarServerException("Broker is not running"));
2114+
}
2115+
return localHealthChecker.checkHealth(topicVersion, clientId);
2116+
}
2117+
2118+
@VisibleForTesting
2119+
public HealthChecker getHealthChecker() {
2120+
if (healthChecker == null) {
2121+
synchronized (this) {
2122+
if (healthChecker == null) {
2123+
if (!isRunning()) {
2124+
return null;
2125+
}
2126+
try {
2127+
healthChecker = new HealthChecker(this);
2128+
} catch (PulsarServerException e) {
2129+
LOG.error("Failed to create health checker", e);
2130+
throw new RuntimeException(e);
2131+
}
2132+
}
2133+
}
2134+
}
2135+
return healthChecker;
2136+
}
20722137
}

0 commit comments

Comments
 (0)