Skip to content

Commit cfbf898

Browse files
merlimatwolfstudy
authored andcommitted
Fix topic getting recreated immediately after deletion (#7524)
(cherry picked from commit dba0b65)
1 parent 8b4a921 commit cfbf898

File tree

6 files changed

+130
-75
lines changed

6 files changed

+130
-75
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.commons.lang3.StringUtils;
3939
import org.apache.pulsar.broker.PulsarService;
4040
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
41+
import org.apache.pulsar.broker.namespace.LookupOptions;
4142
import org.apache.pulsar.broker.web.PulsarWebResource;
4243
import org.apache.pulsar.broker.web.RestException;
4344
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -79,7 +80,7 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
7980
}
8081

8182
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
82-
.getBrokerServiceUrlAsync(topicName, authoritative);
83+
.getBrokerServiceUrlAsync(topicName, LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());
8384

8485
lookupFuture.thenAccept(optionalResult -> {
8586
if (optionalResult == null || !optionalResult.isPresent()) {
@@ -251,7 +252,12 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
251252
if (validationFailureResponse != null) {
252253
lookupfuture.complete(validationFailureResponse);
253254
} else {
254-
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, authoritative, advertisedListenerName)
255+
LookupOptions options = LookupOptions.builder()
256+
.authoritative(authoritative)
257+
.advertisedListenerName(advertisedListenerName)
258+
.loadTopicsInBundle(true)
259+
.build();
260+
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
255261
.thenAccept(lookupResult -> {
256262

257263
if (log.isDebugEnabled()) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.namespace;
20+
21+
import lombok.Builder;
22+
import lombok.Data;
23+
24+
import org.apache.commons.lang3.StringUtils;
25+
26+
@Data
27+
@Builder
28+
public class LookupOptions {
29+
/**
30+
* If authoritative, it means the lookup had already been redirected here by a different broker
31+
*/
32+
private final boolean authoritative;
33+
34+
/**
35+
* If read-only, do not attempt to acquire ownership
36+
*/
37+
private final boolean readOnly;
38+
39+
/**
40+
* After acquiring the ownership, load all the topics
41+
*/
42+
private final boolean loadTopicsInBundle;
43+
44+
/**
45+
* The lookup request was made through HTTPs
46+
*/
47+
private final boolean requestHttps;
48+
49+
private final String advertisedListenerName;
50+
51+
public boolean hasAdvertisedListenerName() {
52+
return StringUtils.isNotBlank(advertisedListenerName);
53+
}
54+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -173,15 +173,9 @@ public void initialize() {
173173
}
174174
}
175175

176-
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
177-
boolean authoritative) {
178-
return getBrokerServiceUrlAsync(topic, authoritative, null);
179-
}
180-
181-
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, boolean authoritative,
182-
final String advertisedListenerName) {
176+
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
183177
return getBundleAsync(topic)
184-
.thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */, advertisedListenerName));
178+
.thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
185179
}
186180

187181
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
@@ -211,38 +205,36 @@ private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
211205
*
212206
* If the service unit is not owned, return an empty optional
213207
*/
214-
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps,
215-
boolean readOnly) throws Exception {
208+
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
216209
if (suName instanceof TopicName) {
217210
TopicName name = (TopicName) suName;
218211
if (LOG.isDebugEnabled()) {
219-
LOG.debug("Getting web service URL of topic: {} - auth: {}", name, authoritative);
212+
LOG.debug("Getting web service URL of topic: {} - options: {}", name, options);
220213
}
221-
return this.internalGetWebServiceUrl(getBundle(name), authoritative, isRequestHttps, readOnly)
214+
return this.internalGetWebServiceUrl(getBundle(name), options)
222215
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
223216
}
224217

225218
if (suName instanceof NamespaceName) {
226-
return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, isRequestHttps,
227-
readOnly).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
219+
return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), options)
220+
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
228221
}
229222

230223
if (suName instanceof NamespaceBundle) {
231-
return this.internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, isRequestHttps, readOnly)
224+
return this.internalGetWebServiceUrl((NamespaceBundle) suName, options)
232225
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
233226
}
234227

235228
throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
236229
}
237230

238-
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
239-
boolean isRequestHttps, boolean readOnly) {
231+
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
240232

241-
return findBrokerServiceUrl(bundle, authoritative, readOnly).thenApply(lookupResult -> {
233+
return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> {
242234
if (lookupResult.isPresent()) {
243235
try {
244236
LookupData lookupData = lookupResult.get().getLookupData();
245-
final String redirectUrl = isRequestHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
237+
final String redirectUrl = options.isRequestHttps() ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
246238
return Optional.of(new URL(redirectUrl));
247239
} catch (Exception e) {
248240
// just log the exception, nothing else to do
@@ -329,19 +321,6 @@ public boolean registerNamespace(String namespace, boolean ensureOwned) throws P
329321
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesNotAuthoritative
330322
= new ConcurrentOpenHashMap<>();
331323

332-
/**
333-
* Main internal method to lookup and setup ownership of service unit to a broker.
334-
*
335-
* @param bundle
336-
* @param authoritative
337-
* @param readOnly
338-
* @return
339-
*/
340-
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
341-
boolean readOnly) {
342-
return findBrokerServiceUrl(bundle, authoritative, readOnly, null);
343-
}
344-
345324
/**
346325
* Main internal method to lookup and setup ownership of service unit to a broker
347326
*
@@ -352,14 +331,13 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
352331
* @return
353332
* @throws PulsarServerException
354333
*/
355-
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
356-
boolean readOnly, final String advertisedListenerName) {
334+
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, LookupOptions options) {
357335
if (LOG.isDebugEnabled()) {
358-
LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
336+
LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
359337
}
360338

361339
ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
362-
if (authoritative) {
340+
if (options.isAuthoritative()) {
363341
targetMap = findingBundlesAuthoritative;
364342
} else {
365343
targetMap = findingBundlesNotAuthoritative;
@@ -373,13 +351,13 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
373351
if (!nsData.isPresent()) {
374352
// No one owns this bundle
375353

376-
if (readOnly) {
354+
if (options.isReadOnly()) {
377355
// Do not attempt to acquire ownership
378356
future.complete(Optional.empty());
379357
} else {
380358
// Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
381359
pulsar.getExecutor().execute(() -> {
382-
searchForCandidateBroker(bundle, future, authoritative, advertisedListenerName);
360+
searchForCandidateBroker(bundle, future, options);
383361
});
384362
}
385363
} else if (nsData.get().isDisabled()) {
@@ -390,11 +368,11 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
390368
LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
391369
}
392370
// find the target
393-
if (StringUtils.isNotBlank(advertisedListenerName)) {
394-
AdvertisedListener listener = nsData.get().getAdvertisedListeners().get(advertisedListenerName);
371+
if (options.hasAdvertisedListenerName()) {
372+
AdvertisedListener listener = nsData.get().getAdvertisedListeners().get(options.getAdvertisedListenerName());
395373
if (listener == null) {
396374
future.completeExceptionally(
397-
new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
375+
new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
398376
} else {
399377
URI urlTls = listener.getBrokerServiceUrlTls();
400378
future.complete(Optional.of(new LookupResult(nsData.get(),
@@ -420,13 +398,8 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
420398
}
421399

422400
private void searchForCandidateBroker(NamespaceBundle bundle,
423-
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
424-
searchForCandidateBroker(bundle, lookupFuture, authoritative, null);
425-
}
426-
427-
private void searchForCandidateBroker(NamespaceBundle bundle,
428-
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
429-
final String advertisedListenerName) {
401+
CompletableFuture<Optional<LookupResult>> lookupFuture,
402+
LookupOptions options) {
430403
String candidateBroker = null;
431404
boolean authoritativeRedirect = pulsar.getLeaderElectionService().isLeader();
432405

@@ -442,7 +415,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
442415
}
443416

444417
if (candidateBroker == null) {
445-
if (authoritative) {
418+
if (options.isAuthoritative()) {
446419
// leader broker already assigned the current broker as owner
447420
candidateBroker = pulsar.getSafeWebServiceAddress();
448421
} else if (!this.loadManager.get().isCentralized()
@@ -488,14 +461,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
488461
} else {
489462
// Found owner for the namespace bundle
490463

491-
// Schedule the task to pre-load topics
492-
pulsar.loadNamespaceTopics(bundle);
464+
if (options.isLoadTopicsInBundle()) {
465+
// Schedule the task to pre-load topics
466+
pulsar.loadNamespaceTopics(bundle);
467+
}
493468
// find the target
494-
if (StringUtils.isNotBlank(advertisedListenerName)) {
495-
AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(advertisedListenerName);
469+
if (options.hasAdvertisedListenerName()) {
470+
AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
496471
if (listener == null) {
497472
lookupFuture.completeExceptionally(
498-
new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
473+
new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
499474
return;
500475
} else {
501476
URI urlTls = listener.getBrokerServiceUrlTls();

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
5454
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
5555
import org.apache.pulsar.broker.authorization.AuthorizationService;
56+
import org.apache.pulsar.broker.namespace.LookupOptions;
5657
import org.apache.pulsar.broker.namespace.NamespaceService;
5758
import org.apache.pulsar.common.naming.Constants;
5859
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -488,8 +489,14 @@ protected boolean isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundl
488489
String bundleRange) {
489490
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
490491
NamespaceService nsService = pulsar().getNamespaceService();
492+
493+
LookupOptions options = LookupOptions.builder()
494+
.authoritative(false)
495+
.requestHttps(isRequestHttps())
496+
.readOnly(true)
497+
.loadTopicsInBundle(false).build();
491498
try {
492-
return nsService.getWebServiceUrl(nsBundle, /*authoritative */ false, isRequestHttps(), /* read-only */ true).isPresent();
499+
return nsService.getWebServiceUrl(nsBundle, options).isPresent();
493500
} catch (Exception e) {
494501
log.error("[{}] Failed to check whether namespace bundle is owned {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
495502
throw new RestException(e);
@@ -525,7 +532,12 @@ public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritativ
525532
// - If authoritative is false and this broker is not leader, forward to leader
526533
// - If authoritative is false and this broker is leader, determine owner and forward w/ authoritative=true
527534
// - If authoritative is true, own the namespace and continue
528-
Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, authoritative, isRequestHttps(), readOnly);
535+
LookupOptions options = LookupOptions.builder()
536+
.authoritative(authoritative)
537+
.requestHttps(isRequestHttps())
538+
.readOnly(readOnly)
539+
.loadTopicsInBundle(false).build();
540+
Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
529541
// Ensure we get a url
530542
if (webUrl == null || !webUrl.isPresent()) {
531543
log.warn("Unable to get web service url");
@@ -581,7 +593,12 @@ protected void validateTopicOwnership(TopicName topicName, boolean authoritative
581593

582594
try {
583595
// per function name, this is trying to acquire the whole namespace ownership
584-
Optional<URL> webUrl = nsService.getWebServiceUrl(topicName, authoritative, isRequestHttps(), false);
596+
LookupOptions options = LookupOptions.builder()
597+
.authoritative(authoritative)
598+
.requestHttps(isRequestHttps())
599+
.readOnly(false)
600+
.loadTopicsInBundle(false).build();
601+
Optional<URL> webUrl = nsService.getWebServiceUrl(topicName, options);
585602
// Ensure we get a url
586603
if (webUrl == null || !webUrl.isPresent()) {
587604
log.info("Unable to get web service url");

0 commit comments

Comments
 (0)