pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jai1 closed pull request #1710: Issue #1117: handle race in concurrent bundle split
Date Wed, 02 May 2018 08:27:06 GMT
jai1 closed pull request #1710: Issue #1117: handle race in concurrent bundle split 
URL: https://github.com/apache/incubator-pulsar/pull/1710
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 66a1ffaab6..4b28cad8b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -22,6 +22,8 @@
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 
+import com.google.common.collect.Maps;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -38,6 +40,7 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,20 +82,28 @@ public LocalPolicies deserialize(String path, byte[] content) throws Exception
{
 
             @Override
             public CompletableFuture<Optional<LocalPolicies>> getAsync(String
path) {
-                CompletableFuture<Optional<LocalPolicies>> future = new CompletableFuture<>();
+                return getWithStatAsync(path).thenApply(entry -> entry.map(e -> e.getKey()));
+            }
+
+            @Override
+            public CompletableFuture<Optional<Entry<LocalPolicies, Stat>>>
getWithStatAsync(String path) {
+                CompletableFuture<Optional<Entry<LocalPolicies, Stat>>>
future = new CompletableFuture<>();
 
                 // First check in local-zk cache
-                super.getAsync(path).thenAccept(localPolicies -> {
+                super.getWithStatAsync(path).thenAccept(result -> {
+                    Optional<LocalPolicies> localPolicies = result.map(Entry::getKey);
                     if (localPolicies.isPresent()) {
-                        future.complete(localPolicies);
+                        future.complete(result);
                     } else {
                         // create new policies node under Local ZK by coping it from Global
ZK
                         createPolicies(path, true).thenAccept(p -> {
                             LOG.info("Successfully created local policies for {} -- {}",
path, p);
                             // local-policies have been created but it's not part of policiesCache.
so, call
                             // super.getAsync() which will load it and set the watch on local-policies
path
-                            super.getAsync(path);
-                            future.complete(p);
+                            super.getWithStatAsync(path);
+                            Stat stat = new Stat();
+                            stat.setVersion(-1);
+                            future.complete(Optional.of(Maps.immutableEntry(p.orElse(null),
stat)));
                         }).exceptionally(ex -> {
                             future.completeExceptionally(ex);
                             return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e7b885a427..b8e4ae0e58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -22,7 +22,6 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
@@ -37,12 +36,12 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -52,6 +51,7 @@
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.lookup.data.LookupData;
@@ -72,6 +72,7 @@
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,6 +112,8 @@
 
     private final String host;
 
+    private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
+
     public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
     public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY
+ "/[^/]+/([^:]+:\\d+)");
@@ -546,87 +549,141 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws
Exceptio
 
     /**
      * 1. split the given bundle into two bundles 2. assign ownership of both the bundles
to current broker 3. update
-     * policies with newly created bundles into LocalZK 4. disable original bundle and refresh
the cache
+     * policies with newly created bundles into LocalZK 4. disable original bundle and refresh
the cache.
+     *
+     * It will call splitAndOwnBundleOnceAndRetry to do the real retry work, which will retry
"retryTimes".
      *
      * @param bundle
      * @return
      * @throws Exception
      */
-    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, final
boolean unload) throws Exception {
+    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
unload)
+        throws Exception {
 
         final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
+        final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
+
+        splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture);
+
+        return unloadFuture;
+    }
+
+    void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
+                                       boolean unload,
+                                       AtomicInteger counter,
+                                       CompletableFuture<Void> unloadFuture) {
+        CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>();
 
         final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles =
bundleFactory.splitBundles(bundle,
-                2 /* by default split into 2 */);
+            2 /* by default split into 2 */);
+
+        // Split and updateNamespaceBundles. Update may fail because of concurrent write
to Zookeeper.
         if (splittedBundles != null) {
             checkNotNull(splittedBundles.getLeft());
             checkNotNull(splittedBundles.getRight());
             checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split
in two bundles");
             NamespaceName nsname = bundle.getNamespaceObject();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {},  2 bundles: {}, {}",
+                    nsname.toString(), bundle.getBundleRange(), counter.get(),
+                    splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange()
: "null splittedBundles",
+                    splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange()
: "null splittedBundles");
+            }
             try {
                 // take ownership of newly split bundles
                 for (NamespaceBundle sBundle : splittedBundles.getRight()) {
                     checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
                 }
                 updateNamespaceBundles(nsname, splittedBundles.getLeft(),
-                        (rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(()
-> {
-                            if (rc == KeeperException.Code.OK.intValue()) {
-                                try {
-                                    // disable old bundle in memory
-                                    getOwnershipCache().updateBundleState(bundle, false);
-                                    // invalidate cache as zookeeper has new split
-                                    // namespace bundle
-                                    bundleFactory.invalidateBundleCache(nsname);
-                                    // update bundled_topic cache for load-report-generation
-                                    pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
-                                    loadManager.get().setLoadReportForceUpdateFlag();
-                                    unloadFuture.complete(null);
-                                } catch (Exception e) {
-                                    String msg1 = format(
-                                            "failed to disable bundle %s under namespace
[%s] with error %s",
-                                            nsname.toString(), bundle.toString(), e.getMessage());
-                                    LOG.warn(msg1, e);
-                                    unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
-                                }
-                            } else {
-                                String msg2 = format("failed to update namespace [%s] policies
due to %s",
-                                        nsname.toString(),
-                                        KeeperException.create(KeeperException.Code.get(rc)).getMessage());
-                                LOG.warn(msg2);
-                                unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
-                            }
-                        })));
+                    (rc, path, zkCtx, stat) ->  {
+                        if (rc == Code.OK.intValue()) {
+                            // invalidate cache as zookeeper has new split
+                            // namespace bundle
+                            bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+
+                            updateFuture.complete(splittedBundles.getLeft());
+                        } else if (rc == Code.BADVERSION.intValue()) {
+                            KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc));
+                            String msg = format("failed to update namespace policies [%s],
NamespaceBundle: %s " +
+                                    "due to %s, counter: %d",
+                                nsname.toString(), bundle.getBundleRange(),
+                                keeperException.getMessage(), counter.get());
+                            LOG.warn(msg);
+                            updateFuture.completeExceptionally(new ServerMetadataException(keeperException));
+                        } else {
+                            String msg = format("failed to update namespace policies [%s],
NamespaceBundle: %s due to %s",
+                                nsname.toString(), bundle.getBundleRange(),
+                                KeeperException.create(KeeperException.Code.get(rc)).getMessage());
+                            LOG.warn(msg);
+                            updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+                        }
+                    });
             } catch (Exception e) {
-                String msg = format("failed to aquire ownership of split bundle for namespace
[%s], %s",
-                        nsname.toString(), e.getMessage());
+                String msg = format("failed to acquire ownership of split bundle for namespace
[%s], %s",
+                    nsname.toString(), e.getMessage());
                 LOG.warn(msg, e);
-                unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+                updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
             }
-
         } else {
             String msg = format("bundle %s not found under namespace", bundle.toString());
-            unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+            LOG.warn(msg);
+            updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
         }
 
-        return unloadFuture.thenApply(res -> {
-            if (!unload) {
-                return null;
+        // If success updateNamespaceBundles, then do invalidateBundleCache and unload.
+        // Else retry splitAndOwnBundleOnceAndRetry.
+        updateFuture.whenCompleteAsync((r, t)-> {
+            if (t != null) {
+                // retry several times on BadVersion
+                if ((t instanceof ServerMetadataException) && (counter.decrementAndGet()
>= 0)) {
+                    pulsar.getOrderedExecutor().submit(
+                        () -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
+                } else {
+                    // Retry enough, or meet other exception
+                    String msg2 = format(" %s not success update nsBundles, counter %d, reason
%s",
+                        bundle.toString(), counter.get(), t.getMessage());
+                    LOG.warn(msg2);
+                    unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
+                }
+                return;
             }
-            // unload new split bundles
-            splittedBundles.getRight().forEach(splitBundle -> {
-                try {
-                    unloadNamespaceBundle(splitBundle);
-                } catch (Exception e) {
-                    LOG.warn("Failed to unload split bundle {}", splitBundle, e);
-                    throw new RuntimeException("Failed to unload split bundle " + splitBundle,
e);
+
+            // success updateNamespaceBundles
+            try {
+                // disable old bundle in memory
+                getOwnershipCache().updateBundleState(bundle, false);
+
+                // update bundled_topic cache for load-report-generation
+                pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
+                loadManager.get().setLoadReportForceUpdateFlag();
+
+                if (unload) {
+                    // unload new split bundles
+                    r.getBundles().forEach(splitBundle -> {
+                        try {
+                            unloadNamespaceBundle(splitBundle);
+                        } catch (Exception e) {
+                            LOG.warn("Failed to unload split bundle {}", splitBundle, e);
+                            throw new RuntimeException("Failed to unload split bundle " +
splitBundle, e);
+                        }
+                    });
                 }
-            });
-            return null;
-        });
+
+                unloadFuture.complete(null);
+            } catch (Exception e) {
+                String msg1 = format(
+                    "failed to disable bundle %s under namespace [%s] with error %s",
+                    bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage());
+                LOG.warn(msg1, e);
+                unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
+            }
+            return;
+        }, pulsar.getOrderedExecutor());
     }
 
     /**
-     * update new bundle-range to LocalZk (create a new node if not present)
+     * Update new bundle-range to LocalZk (create a new node if not present).
+     * Update may fail because of concurrent write to Zookeeper.
      *
      * @param nsname
      * @param nsBundles
@@ -643,12 +700,16 @@ private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles
nsBun
         if (!policies.isPresent()) {
             // if policies is not present into localZk then create new policies
             this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec,
SECONDS);
-            policies = this.pulsar.getLocalZkCacheService().policiesCache().get(path);
         }
 
-        policies.get().bundles = getBundlesData(nsBundles);
-        this.pulsar.getLocalZkCache().getZooKeeper().setData(path,
-                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies.get()), -1,
callback, null);
+        long version = nsBundles.getVersion();
+        LocalPolicies local = new LocalPolicies();
+        local.bundles = getBundlesData(nsBundles);
+        byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(local);
+
+        this.pulsar.getLocalZkCache().getZooKeeper()
+            .setData(path, data, Math.toIntExact(version), callback, null);
+
         // invalidate namespace's local-policies
         this.pulsar.getLocalZkCacheService().policiesCache().invalidate(path);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2f3b27a75b..dd272b76bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -30,6 +30,7 @@
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1010,7 +1011,17 @@ public AuthenticationService getAuthenticationService() {
     }
 
     public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle)
{
-        return multiLayerTopicsMap.get(namespace).get(bundle).values();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> map1
= multiLayerTopicsMap.get(namespace);
+        if (map1 == null) {
+            return Collections.emptyList();
+        }
+
+        ConcurrentOpenHashMap<String, Topic> map2 = map1.get(bundle);
+        if (map2 == null) {
+            return Collections.emptyList();
+        }
+
+        return map2.values();
     }
 
     public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache()
{
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 44dd686700..62670aaf38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -28,6 +28,8 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.SortedSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -78,10 +80,19 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc)
{
 
             CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
             // Read the static bundle data from the policies
-            pulsar.getLocalZkCacheService().policiesCache().getAsync(path).thenAccept(policies
-> {
+            pulsar.getLocalZkCacheService().policiesCache().getWithStatAsync(path).thenAccept(result
-> {
                 // If no policies defined for namespace, assume 1 single bundle
-                BundlesData bundlesData = policies.map(p -> p.bundles).orElse(null);
-                NamespaceBundles namespaceBundles = getBundles(namespace, bundlesData);
+                BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null);
+                NamespaceBundles namespaceBundles = getBundles(
+                    namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1));
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Get bundles from getLocalZkCacheService: path: {},  bundles:
{}, version: {}",
+                        namespace, path,
+                        (bundlesData != null && bundlesData.boundaries != null) ?
bundlesData.toString() : "null",
+                        namespaceBundles.getVersion());
+                }
+
                 future.complete(namespaceBundles);
             }).exceptionally(ex -> {
                 future.completeExceptionally(ex);
@@ -157,7 +168,7 @@ public NamespaceBundle getBundle(String namespace, String bundleRange)
{
                 (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED
: BoundType.OPEN);
         return getBundle(NamespaceName.get(namespace), hashRange);
     }
-    
+
     public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
         return bundlesCache.synchronous().get(fqnn).getFullBundle();
     }
@@ -167,6 +178,10 @@ public long getLongHashCode(String name) {
     }
 
     public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
+        return getBundles(nsname, bundleData, -1);
+    }
+
+    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData, long
version) {
         long[] partitions;
         if (bundleData == null) {
             partitions = new long[] { Long.decode(FIRST_BOUNDARY), Long.decode(LAST_BOUNDARY)
};
@@ -176,7 +191,7 @@ public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData)
                 partitions[i] = Long.decode(bundleData.boundaries.get(i));
             }
         }
-        return new NamespaceBundles(nsname, partitions, this);
+        return new NamespaceBundles(nsname, partitions, this, version);
     }
 
     public static BundlesData getBundlesData(NamespaceBundles bundles) throws Exception {
@@ -199,10 +214,8 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws
Except
      *            split into numBundles
      * @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains
final bundles including
      *         split bundles for a given namespace
-     * @throws Exception
      */
-    public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundle
targetBundle, int numBundles)
-            throws Exception {
+    public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundle
targetBundle, int numBundles) {
         checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle);
         checkNotNull(targetBundle, "can't split null bundle");
         checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
@@ -234,7 +247,8 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws
Except
         }
         partitions[pos] = sourceBundle.partitions[lastIndex];
         if (splitPartition != -1) {
-            NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname, partitions,
this);
+            // keep version of sourceBundle
+            NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname, partitions,
this, sourceBundle.getVersion());
             List<NamespaceBundle> splittedBundles = splittedNsBundles.getBundles().subList(splitPartition,
                     (splitPartition + numBundles));
             return new ImmutablePair<NamespaceBundles, List<NamespaceBundle>>(splittedNsBundles,
splittedBundles);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index 29d2b266af..0c09d686f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -26,9 +26,6 @@
 import java.util.List;
 import java.util.SortedSet;
 
-import org.apache.pulsar.common.naming.DestinationName;
-import org.apache.pulsar.common.naming.NamespaceName;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
@@ -38,6 +35,8 @@
     private final NamespaceName nsname;
     private final ArrayList<NamespaceBundle> bundles;
     private final NamespaceBundleFactory factory;
+    private final long version;
+
     protected final long[] partitions;
 
     public static final Long FULL_LOWER_BOUND = 0x00000000L;
@@ -49,16 +48,17 @@ public NamespaceBundles(NamespaceName nsname, SortedSet<Long> partitionsSet,
Nam
         this(nsname, convertPartitions(partitionsSet), factory);
     }
 
-    public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundleFactory
factory) {
+    public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundleFactory
factory, long version) {
         // check input arguments
         this.nsname = checkNotNull(nsname);
         this.factory = checkNotNull(factory);
+        this.version = version;
         checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries");
 
         // calculate bundles based on partition boundaries
         this.bundles = Lists.newArrayList();
         fullBundle = new NamespaceBundle(nsname,
-                Range.range(FULL_LOWER_BOUND, BoundType.CLOSED, FULL_UPPER_BOUND, BoundType.CLOSED),
factory);
+            Range.range(FULL_LOWER_BOUND, BoundType.CLOSED, FULL_UPPER_BOUND, BoundType.CLOSED),
factory);
 
         if (partitions.length > 0) {
             if (partitions.length == 1) {
@@ -86,6 +86,10 @@ public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundle
         }
     }
 
+    public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundleFactory
factory) {
+        this(nsname, partitions, factory, -1);
+    }
+
     public NamespaceBundle findBundle(DestinationName dn) {
         checkArgument(this.nsname.equals(dn.getNamespaceObject()));
         long hashCode = factory.getLongHashCode(dn.toString());
@@ -140,4 +144,8 @@ public boolean equals(Object obj) {
         }
         return false;
     }
+
+    public long getVersion() {
+        return version;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 444d224a34..3929c4855e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -27,18 +27,23 @@
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -106,6 +111,7 @@
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 
+@Slf4j
 public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdminApiTest.class);
@@ -913,6 +919,115 @@ public void testNamespaceSplitBundle() throws Exception {
         producer.close();
     }
 
+    @Test
+    public void testNamespaceSplitBundleConcurrent() throws Exception {
+        // Force to create a topic
+        final String namespace = "prop-xyz/use/ns1";
+        final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        producer.send("message".getBytes());
+        publishMessagesOnPersistentTopic(topicName, 0);
+        assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName));
+
+        try {
+            admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false);
+        } catch (Exception e) {
+            fail("split bundle shouldn't have thrown exception");
+        }
+
+        // bundle-factory cache must have updated split bundles
+        NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
+        String[] splitRange = {namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff"};
+        for (int i = 0; i < bundles.getBundles().size(); i++) {
+            assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
+        }
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+
+        try {
+            executorService.invokeAll(
+                Arrays.asList(
+                    () ->
+                    {
+                        log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff
");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff",
false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff
");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff",
false);
+                        return null;
+                    }
+                )
+            );
+        } catch (Exception e) {
+            fail("split bundle shouldn't have thrown exception");
+        }
+
+        String[] splitRange4 = {
+            namespace + "/0x00000000_0x3fffffff",
+            namespace + "/0x3fffffff_0x7fffffff",
+            namespace + "/0x7fffffff_0xbfffffff",
+            namespace + "/0xbfffffff_0xffffffff"};
+        bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
+        assertEquals(bundles.getBundles().size(), 4);
+        for (int i = 0; i < bundles.getBundles().size(); i++) {
+            assertEquals(bundles.getBundles().get(i).toString(), splitRange4[i]);
+        }
+
+        try {
+            executorService.invokeAll(
+                Arrays.asList(
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff
");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff",
false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff
");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff",
false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff
");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff",
false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff
");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff",
false);
+                        return null;
+                    }
+                )
+            );
+        } catch (Exception e) {
+            fail("split bundle shouldn't have thrown exception");
+        }
+
+        String[] splitRange8 = {
+            namespace + "/0x00000000_0x1fffffff",
+            namespace + "/0x1fffffff_0x3fffffff",
+            namespace + "/0x3fffffff_0x5fffffff",
+            namespace + "/0x5fffffff_0x7fffffff",
+            namespace + "/0x7fffffff_0x9fffffff",
+            namespace + "/0x9fffffff_0xbfffffff",
+            namespace + "/0xbfffffff_0xdfffffff",
+            namespace + "/0xdfffffff_0xffffffff"};
+        bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
+        assertEquals(bundles.getBundles().size(), 8);
+        for (int i = 0; i < bundles.getBundles().size(); i++) {
+            assertEquals(bundles.getBundles().get(i).toString(), splitRange8[i]);
+        }
+
+        producer.close();
+    }
+
     @Test
     public void testNamespaceUnloadBundle() throws Exception {
         assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), Lists.newArrayList());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index f20a877e3e..93131a7986 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -201,13 +201,9 @@ public void testSplitMapWithRefreshedStatMap() throws Exception {
             fail("split bundle faild", e);
         }
 
-        try {
-            // old bundle should be removed from status-map
-            list = this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace,
originalBundle.toString());
-            fail();
-        } catch (NullPointerException ne) {
-            // OK
-        }
+        // old bundle should be removed from status-map
+        list = this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace, originalBundle.toString());
+        assertTrue(list.isEmpty());
 
         // status-map should be updated with new split bundles
         NamespaceBundle splitBundle = pulsar.getNamespaceService().getBundle(dn);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
index 5c69d4331a..defed3ae95 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
@@ -73,6 +73,14 @@ public ZooKeeperDataCache(final ZooKeeperCache cache) {
         return future;
     }
 
+    public CompletableFuture<Optional<Entry<T, Stat>>> getWithStatAsync(String
path) {
+        return cache.getDataAsync(path, this, this).whenComplete((entry, ex) -> {
+            if (ex != null) {
+                cache.asyncInvalidate(path);
+            }
+        });
+    }
+
     /**
      * Return an item from the cache
      *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message