Skip to content

Commit 61c0b04

Browse files
committed
Unload bundle: close topic forcefully and enable bundle on ownership removal failure
1 parent 12f896e commit 61c0b04

File tree

6 files changed

+111
-49
lines changed

6 files changed

+111
-49
lines changed

pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -677,15 +677,15 @@ private boolean isDestinationOwned(DestinationName fqdn) throws Exception {
677677
}
678678

679679
public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
680-
ownershipCache.removeOwnership(getFullBundle(nsName));
680+
ownershipCache.removeOwnership(getFullBundle(nsName)).get();
681681
}
682682

683683
public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
684-
ownershipCache.removeOwnership(nsBundle);
684+
ownershipCache.removeOwnership(nsBundle).get();
685685
}
686686

687687
public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
688-
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData));
688+
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get();
689689
}
690690

691691
public NamespaceBundleFactory getNamespaceBundleFactory() {

pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,26 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {
104104
int unloadedTopics = 0;
105105
try {
106106
LOG.info("Disabling ownership: {}", this.bundle);
107-
pulsar.getNamespaceService().getOwnershipCache().disableOwnership(this.bundle);
108-
109-
// Handle unload of persistent topics
110-
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get();
111-
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
107+
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, false);
108+
109+
// close topics forcefully
110+
try {
111+
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get();
112+
} catch (Exception e) {
113+
// ignore topic-close failure to unload bundle
114+
LOG.error("Failed to close topics under namespace {}", bundle.toString(), e);
115+
}
116+
// delete ownership node on zk
117+
try {
118+
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle).get();
119+
} catch (Exception e) {
120+
// Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
121+
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, true);
122+
throw new RuntimeException(String.format("Failed to delete ownership node %s", bundle.toString()),
123+
e.getCause());
124+
}
112125
} catch (Exception e) {
113-
LOG.error(String.format("failed to unload a namespace. ns=%s", bundle.toString()), e);
126+
LOG.error("Failed to unload a namespace {}", bundle.toString(), e);
114127
throw new RuntimeException(e);
115128
}
116129

pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static com.google.common.base.Preconditions.checkState;
1919

20+
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
@@ -37,6 +38,7 @@
3738
import com.github.benmanes.caffeine.cache.Caffeine;
3839
import com.github.benmanes.caffeine.cache.RemovalCause;
3940
import com.github.benmanes.caffeine.cache.RemovalListener;
41+
import com.google.common.collect.Lists;
4042
import com.google.common.util.concurrent.MoreExecutors;
4143
import com.yahoo.pulsar.broker.PulsarService;
4244
import com.yahoo.pulsar.client.util.FutureUtil;
@@ -141,26 +143,6 @@ public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Exe
141143
}
142144
}
143145

144-
private class OwnedServiceUnitCacheRemovalListener implements RemovalListener<String, OwnedBundle> {
145-
146-
@Override
147-
public void onRemoval(String key, OwnedBundle value, RemovalCause cause) {
148-
LOG.info("Removing ownership for {}", key);
149-
// Under the cache sync lock, removing the ZNode
150-
// If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
151-
152-
localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
153-
if (rc == KeeperException.Code.OK.intValue()) {
154-
LOG.info("Removed zk lock for service unit: {}", key);
155-
} else {
156-
LOG.warn("Failed to delete the namespace ephemeral node. key={}", key,
157-
KeeperException.Code.get(rc));
158-
}
159-
}, null);
160-
ownershipReadOnlyCache.invalidate(key);
161-
}
162-
}
163-
164146
/**
165147
* Constructor of <code>OwnershipCache</code>
166148
*
@@ -179,7 +161,6 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
179161
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
180162
// ownedBundlesCache contains all namespaces that are owned by the local broker
181163
this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.sameThreadExecutor())
182-
.removalListener(new OwnedServiceUnitCacheRemovalListener())
183164
.buildAsync(new OwnedServiceUnitCacheLoader());
184165
}
185166

@@ -268,8 +249,22 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
268249
* Method to remove the ownership of local broker on the <code>NamespaceBundle</code>, if owned
269250
*
270251
*/
271-
public void removeOwnership(NamespaceBundle bundle) {
272-
ownedBundlesCache.synchronous().invalidate(ServiceUnitZkUtils.path(bundle));
252+
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
253+
CompletableFuture<Void> result = new CompletableFuture<>();
254+
String key = ServiceUnitZkUtils.path(bundle);
255+
localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
256+
if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) {
257+
LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc));
258+
ownedBundlesCache.synchronous().invalidate(key);
259+
ownershipReadOnlyCache.invalidate(key);
260+
result.complete(null);
261+
} else {
262+
LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key,
263+
KeeperException.Code.get(rc));
264+
result.completeExceptionally(KeeperException.create(rc));
265+
}
266+
}, null);
267+
return result;
273268
}
274269

275270
/**
@@ -278,22 +273,18 @@ public void removeOwnership(NamespaceBundle bundle) {
278273
* @param bundles
279274
* <code>NamespaceBundles</code> to remove from ownership cache
280275
*/
281-
public void removeOwnership(NamespaceBundles bundles) {
282-
boolean hasError = false;
276+
public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
277+
List<CompletableFuture<Void>> allFutures = Lists.newArrayList();
283278
for (NamespaceBundle bundle : bundles.getBundles()) {
284279
if (getOwnedBundle(bundle) == null) {
285280
// continue
286281
continue;
287282
}
288-
try {
289-
this.removeOwnership(bundle);
290-
} catch (Exception e) {
291-
LOG.warn(String.format("Failed to remove ownership of a service unit: %s", bundle), e);
292-
hasError = true;
293-
}
283+
allFutures.add(this.removeOwnership(bundle));
294284
}
295-
checkState(!hasError, "Not able to remove all owned bundles");
285+
return FutureUtil.waitForAll(allFutures);
296286
}
287+
297288

298289
/**
299290
* Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker
@@ -330,17 +321,32 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
330321
}
331322
}
332323

324+
/**
325+
* Disable bundle in local cache and on zk
326+
*
327+
* @param bundle
328+
* @throws Exception
329+
*/
333330
public void disableOwnership(NamespaceBundle bundle) throws Exception {
334331
String path = ServiceUnitZkUtils.path(bundle);
335-
332+
updateBundleState(bundle, false);
333+
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
334+
ownershipReadOnlyCache.invalidate(path);
335+
}
336+
337+
/**
338+
* Update bundle state in a local cache
339+
*
340+
* @param bundle
341+
* @throws Exception
342+
*/
343+
public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws Exception {
344+
String path = ServiceUnitZkUtils.path(bundle);
336345
// Disable owned instance in local cache
337346
CompletableFuture<OwnedBundle> f = ownedBundlesCache.getIfPresent(path);
338347
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
339-
f.join().setActive(false);
348+
f.join().setActive(isActive);
340349
}
341-
342-
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
343-
ownershipReadOnlyCache.invalidate(path);
344350
}
345351

346352
public NamespaceEphemeralData getSelfOwnerInfo() {

pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
1919
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
2020
import static org.mockito.Matchers.any;
21+
import static org.mockito.Mockito.doAnswer;
2122
import static org.mockito.Mockito.doNothing;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.spy;
@@ -39,6 +40,8 @@
3940
import org.apache.commons.collections.CollectionUtils;
4041
import org.apache.commons.lang3.tuple.Pair;
4142
import org.apache.zookeeper.data.Stat;
43+
import org.mockito.invocation.InvocationOnMock;
44+
import org.mockito.stubbing.Answer;
4245
import org.testng.Assert;
4346
import org.testng.annotations.AfterMethod;
4447
import org.testng.annotations.BeforeMethod;
@@ -48,14 +51,18 @@
4851
import com.google.common.collect.Lists;
4952
import com.google.common.hash.Hashing;
5053
import com.yahoo.pulsar.broker.service.BrokerTestBase;
54+
import com.yahoo.pulsar.broker.service.Topic;
5155
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
56+
import com.yahoo.pulsar.client.api.Consumer;
57+
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
5258
import com.yahoo.pulsar.common.naming.DestinationName;
5359
import com.yahoo.pulsar.common.naming.NamespaceBundle;
5460
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
5561
import com.yahoo.pulsar.common.naming.NamespaceBundles;
5662
import com.yahoo.pulsar.common.naming.NamespaceName;
5763
import com.yahoo.pulsar.common.policies.data.Policies;
5864
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
65+
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
5966

6067
public class NamespaceServiceTest extends BrokerTestBase {
6168

@@ -232,10 +239,45 @@ public void testremoveOwnershipNamespaceBundle() throws Exception {
232239
NamespaceBundle bundle = bundles.getBundles().get(0);
233240
assertNotNull(ownershipCache.tryAcquiringOwnership(bundle));
234241
assertNotNull(ownershipCache.getOwnedBundle(bundle));
235-
ownershipCache.removeOwnership(bundles);
242+
ownershipCache.removeOwnership(bundles).get();
236243
assertNull(ownershipCache.getOwnedBundle(bundle));
237244
}
238245

246+
@Test
247+
public void testUnloadNamespaceBundleFailure() throws Exception {
248+
249+
final String topicName = "persistent://my-property/use/my-ns/my-topic1";
250+
ConsumerConfiguration conf = new ConsumerConfiguration();
251+
Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", conf);
252+
ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics();
253+
Topic spyTopic = spy(topics.get(topicName).get());
254+
topics.clear();
255+
CompletableFuture<Topic> topicFuture = CompletableFuture.completedFuture(spyTopic);
256+
// add mock topic
257+
topics.put(topicName, topicFuture);
258+
doAnswer(new Answer<CompletableFuture<Void>>() {
259+
@Override
260+
public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
261+
CompletableFuture<Void> result = new CompletableFuture<>();
262+
result.completeExceptionally(new RuntimeException("first time failed"));
263+
return result;
264+
}
265+
}).when(spyTopic).close();
266+
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
267+
try {
268+
pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
269+
} catch (Exception e) {
270+
// fail
271+
fail(e.getMessage());
272+
}
273+
try {
274+
pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), null, null);
275+
fail("it should fail as node is not present");
276+
} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
277+
// ok
278+
}
279+
}
280+
239281
@SuppressWarnings("unchecked")
240282
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
241283
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {

pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ public void testRemoveOwnership() throws Exception {
258258
// case 1: no one owns the namespace
259259
assertFalse(cache.getOwnerAsync(bundle).get().isPresent());
260260

261-
cache.removeOwnership(bundle);
261+
cache.removeOwnership(bundle).get();
262262
assertTrue(cache.getOwnedBundles().isEmpty());
263263

264264
// case 2: this broker owns the namespace
@@ -267,6 +267,7 @@ public void testRemoveOwnership() throws Exception {
267267
assertTrue(!data1.isDisabled());
268268
assertTrue(cache.getOwnedBundles().size() == 1);
269269
cache.removeOwnership(bundle);
270+
Thread.sleep(500);
270271
assertTrue(cache.getOwnedBundles().isEmpty());
271272

272273
Thread.sleep(500);

pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public void testConfigChangeNegativeCases() throws Exception {
232232
}
233233

234234
// restore the namespace state
235-
ownerCache.removeOwnership(globalNsBundle);
235+
ownerCache.removeOwnership(globalNsBundle).get();
236236
ownerCache.tryAcquiringOwnership(globalNsBundle);
237237
}
238238

0 commit comments

Comments
 (0)