Skip to content

Commit f8a697d

Browse files
author
Masahiro Sakamoto
authored
Prevent redirection of lookup requests from looping (#7200)
1 parent 6bdc880 commit f8a697d

File tree

4 files changed

+30
-17
lines changed

4 files changed

+30
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,28 +34,35 @@ public enum Type {
3434

3535
private final Type type;
3636
private final LookupData lookupData;
37+
private final boolean authoritativeRedirect;
3738

3839
public LookupResult(NamespaceEphemeralData namespaceEphemeralData) {
3940
this.type = Type.BrokerUrl;
41+
this.authoritativeRedirect = false;
4042
this.lookupData = new LookupData(namespaceEphemeralData.getNativeUrl(),
4143
namespaceEphemeralData.getNativeUrlTls(), namespaceEphemeralData.getHttpUrl(),
4244
namespaceEphemeralData.getHttpUrlTls());
4345
}
4446

45-
public LookupResult(String httpUrl, String httpUrlTls, String brokerServiceUrl, String brokerServiceUrlTls) {
47+
public LookupResult(String httpUrl, String httpUrlTls, String brokerServiceUrl, String brokerServiceUrlTls,
48+
boolean authoritativeRedirect) {
4649
this.type = Type.RedirectUrl; // type = redirect => as current broker is
4750
// not owner and prepares LookupResult
4851
// with other broker's urls
52+
this.authoritativeRedirect = authoritativeRedirect;
4953
this.lookupData = new LookupData(brokerServiceUrl, brokerServiceUrlTls, httpUrl, httpUrlTls);
5054
}
5155

52-
public LookupResult(String httpUrl, String httpUrlTls, String nativeUrl, String nativeUrlTls, Type type) {
56+
public LookupResult(String httpUrl, String httpUrlTls, String nativeUrl, String nativeUrlTls, Type type,
57+
boolean authoritativeRedirect) {
5358
this.type = type;
59+
this.authoritativeRedirect = authoritativeRedirect;
5460
this.lookupData = new LookupData(nativeUrl, nativeUrlTls, httpUrl, httpUrlTls);
5561
}
5662

5763
public LookupResult(NamespaceEphemeralData namespaceEphemeralData, String nativeUrl, String nativeUrlTls) {
5864
this.type = Type.BrokerUrl;
65+
this.authoritativeRedirect = false;
5966
this.lookupData = new LookupData(nativeUrl, nativeUrlTls, namespaceEphemeralData.getHttpUrl(),
6067
namespaceEphemeralData.getHttpUrlTls());
6168
}
@@ -68,6 +75,10 @@ public boolean isRedirect() {
6875
return type == Type.RedirectUrl;
6976
}
7077

78+
public boolean isAuthoritativeRedirect() {
79+
return authoritativeRedirect;
80+
}
81+
7182
public LookupData getLookupData() {
7283
return lookupData;
7384
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
9292
LookupResult result = optionalResult.get();
9393
// We have found either a broker that owns the topic, or a broker to which we should redirect the client to
9494
if (result.isRedirect()) {
95-
boolean newAuthoritative = this.isLeaderBroker();
95+
boolean newAuthoritative = result.isAuthoritativeRedirect();
9696
URI redirect;
9797
try {
9898
String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
@@ -266,7 +266,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
266266

267267
LookupData lookupData = lookupResult.get().getLookupData();
268268
if (lookupResult.get().isRedirect()) {
269-
boolean newAuthoritative = isLeaderBroker(pulsarService);
269+
boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect();
270270
lookupfuture.complete(
271271
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
272272
newAuthoritative, LookupType.Redirect, requestId, false));

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,8 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
426426
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
427427
final String advertisedListenerName) {
428428
String candidateBroker = null;
429+
boolean authoritativeRedirect = pulsar.getLeaderElectionService().isLeader();
430+
429431
try {
430432
// check if this is Heartbeat or SLAMonitor namespace
431433
candidateBroker = checkHeartbeatNamespace(bundle);
@@ -438,7 +440,10 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
438440
}
439441

440442
if (candidateBroker == null) {
441-
if (!this.loadManager.get().isCentralized()
443+
if (authoritative) {
444+
// leader broker already assigned the current broker as owner
445+
candidateBroker = pulsar.getSafeWebServiceAddress();
446+
} else if (!this.loadManager.get().isCentralized()
442447
|| pulsar.getLeaderElectionService().isLeader()
443448

444449
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
@@ -450,14 +455,10 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
450455
return;
451456
}
452457
candidateBroker = availableBroker.get();
458+
authoritativeRedirect = true;
453459
} else {
454-
if (authoritative) {
455-
// leader broker already assigned the current broker as owner
456-
candidateBroker = pulsar.getSafeWebServiceAddress();
457-
} else {
458-
// forward to leader broker to make assignment
459-
candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
460-
}
460+
// forward to leader broker to make assignment
461+
candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
461462
}
462463
}
463464
} catch (Exception e) {
@@ -519,7 +520,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
519520
}
520521

521522
// Now setting the redirect url
522-
createLookupResult(candidateBroker)
523+
createLookupResult(candidateBroker, authoritativeRedirect)
523524
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
524525
.exceptionally(ex -> {
525526
lookupFuture.completeExceptionally(ex);
@@ -533,7 +534,8 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
533534
}
534535
}
535536

536-
protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
537+
protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect)
538+
throws Exception {
537539

538540
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
539541
try {
@@ -546,7 +548,7 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
546548
ServiceLookupData lookupData = reportData.get();
547549
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
548550
lookupData.getWebServiceUrlTls(), lookupData.getPulsarServiceUrl(),
549-
lookupData.getPulsarServiceUrlTls()));
551+
lookupData.getPulsarServiceUrlTls(), authoritativeRedirect));
550552
} else {
551553
lookupFuture.completeExceptionally(new KeeperException.NoNodeException(path));
552554
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,13 +358,13 @@ public void testLoadReportDeserialize() throws Exception {
358358
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
359359
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
360360
CreateMode.EPHEMERAL);
361-
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1).get();
361+
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1, false).get();
362362

363363
// update to new load manager
364364
LoadManager oldLoadManager = pulsar.getLoadManager()
365365
.getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
366366
oldLoadManager.stop();
367-
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2).get();
367+
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2, false).get();
368368
Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
369369
Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
370370
System.out.println(result2);

0 commit comments

Comments
 (0)