Skip to content

Unload bundle: close topic forcefully and enable bundle on ownership removal failure #164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -677,15 +677,15 @@ private boolean isDestinationOwned(DestinationName fqdn) throws Exception {
}

public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
ownershipCache.removeOwnership(getFullBundle(nsName));
ownershipCache.removeOwnership(getFullBundle(nsName)).get();
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
ownershipCache.removeOwnership(nsBundle);
ownershipCache.removeOwnership(nsBundle).get();
}

public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData));
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get();
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,26 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {
int unloadedTopics = 0;
try {
LOG.info("Disabling ownership: {}", this.bundle);
pulsar.getNamespaceService().getOwnershipCache().disableOwnership(this.bundle);

// Handle unload of persistent topics
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get();
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, false);

// close topics forcefully
try {
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get();
} catch (Exception e) {
// ignore topic-close failure to unload bundle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also make sure the MLs are removed from the ML factory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while closing the topic we disconnects producers, subscribers and replicator and if it is successful then we remove it from the MLFactory immediately.
Disconnecting prod,sub,repl should be clean but if not for one of the topic then additionally we can schedule a retry only for unloading-bundle : if there was a failure while closing any topic and if we successfully remove the ownership-node later.
any thought?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was saying to verify that ML.close() will remove the ML from the factory even in the presence of errors. Errors could happen when closing the cursor or closing the last data ledger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, ML.close() removes ML from MLFactory first and then tries to close ledger the cursors.

LOG.error("Failed to close topics under namespace {}", bundle.toString(), e);
}
// delete ownership node on zk
try {
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle).get();
} catch (Exception e) {
// Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, true);
throw new RuntimeException(String.format("Failed to delete ownership node %s", bundle.toString()),
e.getCause());
}
} catch (Exception e) {
LOG.error(String.format("failed to unload a namespace. ns=%s", bundle.toString()), e);
LOG.error("Failed to unload a namespace {}", bundle.toString(), e);
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkState;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,6 +38,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.client.util.FutureUtil;
Expand Down Expand Up @@ -141,26 +143,6 @@ public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Exe
}
}

private class OwnedServiceUnitCacheRemovalListener implements RemovalListener<String, OwnedBundle> {

@Override
public void onRemoval(String key, OwnedBundle value, RemovalCause cause) {
LOG.info("Removing ownership for {}", key);
// Under the cache sync lock, removing the ZNode
// If succeeded, we guaranteed that the cache entry is removed together w/ ZNode

localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue()) {
LOG.info("Removed zk lock for service unit: {}", key);
} else {
LOG.warn("Failed to delete the namespace ephemeral node. key={}", key,
KeeperException.Code.get(rc));
}
}, null);
ownershipReadOnlyCache.invalidate(key);
}
}

/**
* Constructor of <code>OwnershipCache</code>
*
Expand All @@ -179,7 +161,6 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
// ownedBundlesCache contains all namespaces that are owned by the local broker
this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.sameThreadExecutor())
.removalListener(new OwnedServiceUnitCacheRemovalListener())
.buildAsync(new OwnedServiceUnitCacheLoader());
}

Expand Down Expand Up @@ -268,8 +249,22 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
* Method to remove the ownership of local broker on the <code>NamespaceBundle</code>, if owned
*
*/
public void removeOwnership(NamespaceBundle bundle) {
ownedBundlesCache.synchronous().invalidate(ServiceUnitZkUtils.path(bundle));
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
CompletableFuture<Void> result = new CompletableFuture<>();
String key = ServiceUnitZkUtils.path(bundle);
localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) {
LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc));
ownedBundlesCache.synchronous().invalidate(key);
ownershipReadOnlyCache.invalidate(key);
result.complete(null);
} else {
LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key,
KeeperException.Code.get(rc));
result.completeExceptionally(KeeperException.create(rc));
}
}, null);
return result;
}

/**
Expand All @@ -278,22 +273,18 @@ public void removeOwnership(NamespaceBundle bundle) {
* @param bundles
* <code>NamespaceBundles</code> to remove from ownership cache
*/
public void removeOwnership(NamespaceBundles bundles) {
boolean hasError = false;
public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
List<CompletableFuture<Void>> allFutures = Lists.newArrayList();
for (NamespaceBundle bundle : bundles.getBundles()) {
if (getOwnedBundle(bundle) == null) {
// continue
continue;
}
try {
this.removeOwnership(bundle);
} catch (Exception e) {
LOG.warn(String.format("Failed to remove ownership of a service unit: %s", bundle), e);
hasError = true;
}
allFutures.add(this.removeOwnership(bundle));
}
checkState(!hasError, "Not able to remove all owned bundles");
return FutureUtil.waitForAll(allFutures);
}


/**
* Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker
Expand Down Expand Up @@ -330,17 +321,32 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
}
}

/**
* Disable bundle in local cache and on zk
*
* @param bundle
* @throws Exception
*/
public void disableOwnership(NamespaceBundle bundle) throws Exception {
String path = ServiceUnitZkUtils.path(bundle);

updateBundleState(bundle, false);
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
ownershipReadOnlyCache.invalidate(path);
}

/**
* Update bundle state in a local cache
*
* @param bundle
* @throws Exception
*/
public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws Exception {
String path = ServiceUnitZkUtils.path(bundle);
// Disable owned instance in local cache
CompletableFuture<OwnedBundle> f = ownedBundlesCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
f.join().setActive(false);
f.join().setActive(isActive);
}

localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
ownershipReadOnlyCache.invalidate(path);
}

public NamespaceEphemeralData getSelfOwnerInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -39,6 +40,8 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.data.Stat;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -48,14 +51,18 @@
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;

public class NamespaceServiceTest extends BrokerTestBase {

Expand Down Expand Up @@ -232,10 +239,45 @@ public void testremoveOwnershipNamespaceBundle() throws Exception {
NamespaceBundle bundle = bundles.getBundles().get(0);
assertNotNull(ownershipCache.tryAcquiringOwnership(bundle));
assertNotNull(ownershipCache.getOwnedBundle(bundle));
ownershipCache.removeOwnership(bundles);
ownershipCache.removeOwnership(bundles).get();
assertNull(ownershipCache.getOwnedBundle(bundle));
}

@Test
public void testUnloadNamespaceBundleFailure() throws Exception {

final String topicName = "persistent://my-property/use/my-ns/my-topic1";
ConsumerConfiguration conf = new ConsumerConfiguration();
Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", conf);
ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics();
Topic spyTopic = spy(topics.get(topicName).get());
topics.clear();
CompletableFuture<Topic> topicFuture = CompletableFuture.completedFuture(spyTopic);
// add mock topic
topics.put(topicName, topicFuture);
doAnswer(new Answer<CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(new RuntimeException("first time failed"));
return result;
}
}).when(spyTopic).close();
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
try {
pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
} catch (Exception e) {
// fail
fail(e.getMessage());
}
try {
pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), null, null);
fail("it should fail as node is not present");
} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
// ok
}
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void testRemoveOwnership() throws Exception {
// case 1: no one owns the namespace
assertFalse(cache.getOwnerAsync(bundle).get().isPresent());

cache.removeOwnership(bundle);
cache.removeOwnership(bundle).get();
assertTrue(cache.getOwnedBundles().isEmpty());

// case 2: this broker owns the namespace
Expand All @@ -267,6 +267,7 @@ public void testRemoveOwnership() throws Exception {
assertTrue(!data1.isDisabled());
assertTrue(cache.getOwnedBundles().size() == 1);
cache.removeOwnership(bundle);
Thread.sleep(500);
assertTrue(cache.getOwnedBundles().isEmpty());

Thread.sleep(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testConfigChangeNegativeCases() throws Exception {
}

// restore the namespace state
ownerCache.removeOwnership(globalNsBundle);
ownerCache.removeOwnership(globalNsBundle).get();
ownerCache.tryAcquiringOwnership(globalNsBundle);
}

Expand Down