bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-873 and BOOKKEEPER-553
Date Tue, 31 Jan 2017 01:42:02 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 13d668f22 -> 42e8f1294


 BOOKKEEPER-873 and BOOKKEEPER-553

    BOOKKEEPER-873: CreateLedgerAPI to accept ledgerId
    Add ledgerCreateAdv with ledgerId interface to Bookkeeper
    and corresponding Junit tests.

    BOOKKEEPER-553: LongHierarchicalLedgerManager
    - LongHierarchicalLedgerManager to support 63 bits ledgerid (positive long)
    - LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4)

Author: Venkateswara <vjujjuri@salesforce.com>
Author: Charan Reddy Guttapalem <cguttapalem@salesforce.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #88 from reddycharan/ledgerhandleadvwithledgerid


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/42e8f129
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/42e8f129
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/42e8f129

Branch: refs/heads/master
Commit: 42e8f1294f206cbe51a6af669cf605833b78bf42
Parents: 13d668f
Author: Venkateswara <vjujjuri@salesforce.com>
Authored: Mon Jan 30 17:41:54 2017 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Jan 30 17:41:54 2017 -0800

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BookKeeper.java    | 109 +++++-
 .../bookkeeper/client/LedgerCreateOp.java       |  26 +-
 .../meta/HierarchicalLedgerManager.java         |   4 +-
 .../meta/LongHierarchicalLedgerManager.java     | 334 +++++++++++++++++++
 .../LongHierarchicalLedgerManagerFactory.java   |  12 +
 .../org/apache/bookkeeper/util/StringUtils.java |  49 +++
 .../client/BookieWriteLedgerTest.java           | 103 +++++-
 .../client/TestWatchEnsembleChange.java         |   4 +-
 .../apache/bookkeeper/meta/GcLedgersTest.java   |   2 +-
 .../bookkeeper/meta/LedgerManagerTestCase.java  |   3 +-
 .../MultiLedgerManagerMultiDigestTestCase.java  |   1 +
 .../test/MultiLedgerManagerTestCase.java        |   3 +-
 12 files changed, 629 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 08c24b0..2f8a0b8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -738,7 +738,114 @@ public class BookKeeper implements AutoCloseable {
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv();
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv((long)(-1));
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Synchronously creates a new ledger using the interface which accepts a ledgerId as
input.
+     * This method returns {@link LedgerHandleAdv} which can accept entryId.
+     * Parameters must match those of
+     * {@link #asyncCreateLedgerAdv(long, int, int, int, DigestType, byte[],
+     *                           AsyncCallback.CreateCallback, Object, Map)}
+     * @param ledgerId
+     * @param ensSize
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @param digestType
+     * @param passwd
+     * @param customMetadata
+     * @return a handle to the newly created ledger
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle createLedgerAdv(final long ledgerId,
+                                        int ensSize,
+                                        int writeQuorumSize,
+                                        int ackQuorumSize,
+                                        DigestType digestType,
+                                        byte passwd[],
+                                        final Map<String, byte[]> customMetadata) throws
InterruptedException, BKException{
+        CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+
+        /*
+         * Calls asynchronous version
+         */
+        asyncCreateLedgerAdv(ledgerId, ensSize, writeQuorumSize, ackQuorumSize, digestType,
passwd,
+                             new SyncCreateCallback(), counter, customMetadata);
+
+        LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+        if (lh == null) {
+            LOG.error("Unexpected condition : no ledger handle returned for a success ledger
creation");
+            throw BKException.create(BKException.Code.UnexpectedConditionException);
+        } else if (ledgerId != lh.getId()) {
+            LOG.error("Unexpected condition : Expected ledgerId: {} but got: {}", ledgerId,
lh.getId());
+            throw BKException.create(BKException.Code.UnexpectedConditionException);
+        }
+
+        LOG.info("Ensemble: {} for ledger: {}", lh.getLedgerMetadata().getEnsemble(0L),
+                lh.getId());
+
+        return lh;
+    }
+
+    /**
+     * Asynchronously creates a new ledger using the interface which accepts a ledgerId as
input.
+     * This method returns {@link LedgerHandleAdv} which can accept entryId.
+     * Ledgers created with this call have ability to accept
+     * a separate write quorum and ack quorum size. The write quorum must be at least
+     * as large as the ack quorum.
+     *
+     * Separating the write and the ack quorum allows the BookKeeper client to continue
+     * writing when a bookie has failed but the failure has not yet been detected. Detecting
+     * a bookie has failed can take a number of seconds, as configured by the read timeout
+     * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
+     * that bookie will be removed from the ensemble.
+     *
+     * The other parameters match those of {@link #asyncCreateLedger(long, int, int, DigestType,
byte[],
+     *                                      AsyncCallback.CreateCallback, Object)}
+     *
+     * @param ledgerId
+     *          ledger Id to use for the newly created ledger
+     * @param ensSize
+     *          number of bookies over which to stripe entries
+     * @param writeQuorumSize
+     *          number of bookies each entry will be written to
+     * @param ackQuorumSize
+     *          number of bookies which must acknowledge an entry before the call is completed
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param cb
+     *          createCallback implementation
+     * @param ctx
+     *          optional control object
+     * @param customMetadata
+     *          optional customMetadata that holds user specified metadata
+     */
+    public void asyncCreateLedgerAdv(final long ledgerId,
+                                     final int ensSize,
+                                     final int writeQuorumSize,
+                                     final int ackQuorumSize,
+                                     final DigestType digestType,
+                                     final byte[] passwd,
+                                     final CreateCallback cb,
+                                     final Object ctx,
+                                     final Map<String, byte[]> customMetadata) {
+        if (writeQuorumSize < ackQuorumSize) {
+            throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
+        }
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
+                return;
+            }
+            new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv(ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 52a5cb6..376d716 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -48,7 +48,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
     CreateCallback cb;
     LedgerMetadata metadata;
     LedgerHandle lh;
-    Long ledgerId;
+    Long ledgerId = -1L;
     Object ctx;
     byte[] passwd;
     BookKeeper bk;
@@ -56,6 +56,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
     long startTime;
     OpStatsLogger createOpLogger;
     boolean adv = false;
+    boolean generateLedgerId = true;
 
     /**
      * Constructor
@@ -119,12 +120,16 @@ class LedgerCreateOp implements GenericCallback<Void> {
          * Add ensemble to the configuration
          */
         metadata.addEnsemble(0L, ensemble);
-
-        createLedger();
+        if (this.generateLedgerId) {
+            generateLedgerIdAndCreateLedger();
+        } else {
+            // Create ledger with supplied ledgerId
+            bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
+        }
     }
 
-    void createLedger() {
-        // generate a ledger id and then create the ledger with metadata
+    void generateLedgerIdAndCreateLedger() {
+        // generate a ledgerId
         final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator();
         ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() {
             @Override
@@ -133,7 +138,6 @@ class LedgerCreateOp implements GenericCallback<Void> {
                     createComplete(rc, null);
                     return;
                 }
-
                 LedgerCreateOp.this.ledgerId = ledgerId;
                 // create a ledger with metadata
                 bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
@@ -144,8 +148,12 @@ class LedgerCreateOp implements GenericCallback<Void> {
     /**
      * Initiates the operation to return LedgerHandleAdv.
      */
-    public void initiateAdv() {
+    public void initiateAdv(final long ledgerId) {
         this.adv = true;
+        this.ledgerId = ledgerId;
+        if (this.ledgerId != -1L) {
+            this.generateLedgerId = false;
+        }
         initiate();
     }
 
@@ -154,9 +162,9 @@ class LedgerCreateOp implements GenericCallback<Void> {
      */
     @Override
     public void operationComplete(int rc, Void result) {
-        if (BKException.Code.LedgerExistException == rc) {
+        if (this.generateLedgerId && (BKException.Code.LedgerExistException == rc))
{
             // retry to generate a new ledger id
-            createLedger();
+            generateLedgerIdAndCreateLedger();
             return;
         } else if (BKException.Code.OK != rc) {
             createComplete(rc, null);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 2804af1..bed1627 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -84,7 +84,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
     }
 
     // get ledger from all level nodes
-    private long getLedgerId(String...levelNodes) throws IOException {
+    long getLedgerId(String...levelNodes) throws IOException {
         return StringUtils.stringToHierarchicalLedgerId(levelNodes);
     }
 
@@ -151,7 +151,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
     /**
      * Process hash nodes in a given path
      */
-    private void asyncProcessLevelNodes(
+    void asyncProcessLevelNodes(
         final String path, final Processor<String> processor,
         final AsyncCallback.VoidCallback finalCb, final Object context,
         final int successRc, final int failureRc) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
new file mode 100644
index 0000000..990297f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical
znodes.
+ *
+ * <p>
+ * LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4):
+ *
+ * <pre>
+ * &lt;level1 (3 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;&lt;level4
(4 digits)&gt;
+ * &lt;level5 (4 digits)&gt;
+ * </pre>
+ *
+ * These 5 parts are used to form the actual ledger node path used to store ledger metadata:
+ *
+ * <pre>
+ * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5)
+ * </pre>
+ *
+ * E.g. Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>,
<i>0000</i>, <i>0000</i>, <i>0001</i>,
+ * which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each
znode could have at most 10000 ledgers,
+ * which avoids errors during garbage collection due to lists of children that are too long.
+ */
+class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
+
+    static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
+
+    private static final String MAX_ID_SUFFIX = "9999";
+    private static final String MIN_ID_SUFFIX = "0000";
+
+    /**
+     * Constructor
+     *
+     * @param conf
+     *            Configuration object
+     * @param zk
+     *            ZooKeeper Client Handle
+     */
+    public LongHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
+    }
+
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
+    }
+
+    @Override
+    public long getLedgerId(String pathName) throws IOException {
+        if (!pathName.startsWith(ledgerRootPath)) {
+            throw new IOException("it is not a valid hashed path name : " + pathName);
+        }
+        String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
+        return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
+    }
+
+    //
+    // Active Ledger Manager
+    //
+
+    /**
+     * Get the smallest ledger id in a specified node /level1/level2/level3/level4
+     *
+     * @param level1
+     *            1st level node name
+     * @param level2
+     *            2nd level node name
+     * @param level3
+     *            3rd level node name
+     * @param level4
+     *            4th level node name
+     * @return the smallest ledger id
+     */
+    private long getStartLedgerIdByLevel(String level1, String level2, String level3, String
level4)
+            throws IOException {
+        return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX);
+    }
+
+    /**
+     * Get the largest ledger id in a specified node /level1/level2/level3/level4
+     *
+     * @param level1
+     *            1st level node name
+     * @param level2
+     *            2nd level node name
+     * @param level3
+     *            3rd level node name
+     * @param level4
+     *            4th level node name
+     * @return the largest ledger id
+     */
+    private long getEndLedgerIdByLevel(String level1, String level2, String level3, String
level4) throws IOException {
+        return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX);
+    }
+
+    @Override
+    public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback
finalCb,
+            final Object context, final int successRc, final int failureRc) {
+        asyncProcessLevelNodes(ledgerRootPath,
+                new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc,
failureRc), finalCb, context,
+                successRc, failureRc);
+    }
+
+    private class RecursiveProcessor implements Processor<String> {
+        private final int level;
+        private final String path;
+        private final Processor<Long> processor;
+        private final Object context;
+        private final int successRc;
+        private final int failureRc;
+
+        private RecursiveProcessor(int level, String path, Processor<Long> processor,
Object context, int successRc,
+                int failureRc) {
+            this.level = level;
+            this.path = path;
+            this.processor = processor;
+            this.context = context;
+            this.successRc = successRc;
+            this.failureRc = failureRc;
+        }
+
+        @Override
+        public void process(String lNode, VoidCallback cb) {
+            String nodePath = path + "/" + lNode;
+            if ((level == 0) && isSpecialZnode(lNode)) {
+                cb.processResult(successRc, null, context);
+                return;
+            } else if (level < 3) {
+                asyncProcessLevelNodes(nodePath,
+                        new RecursiveProcessor(level + 1, nodePath, processor, context, successRc,
failureRc), cb,
+                        context, successRc, failureRc);
+            } else {
+                // process each ledger; after all ledgers are processed, cb will be called to
+                // continue processing the next level5 node
+                asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc,
failureRc);
+            }
+        }
+    }
+
+    @Override
+    public LedgerRangeIterator getLedgerRanges() {
+        return new LongHierarchicalLedgerRangeIterator();
+    }
+
+    /**
+     * Iterator through each metadata bucket with hierarchical mode
+     */
+    private class LongHierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+        private List<Iterator<String>> levelNodesIter;
+        private List<String> curLevelNodes;
+
+        private boolean initialized = false;
+        private boolean iteratorDone = false;
+        private LedgerRange nextRange = null;
+
+        private LongHierarchicalLedgerRangeIterator() {
+            levelNodesIter = new ArrayList<Iterator<String>>(Collections.nCopies(4,
(Iterator<String>) null));
+            curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null));
+        }
+
+        private void initialize(String path, int level) throws KeeperException, InterruptedException,
IOException {
+            List<String> levelNodes = zk.getChildren(path, null);
+            Collections.sort(levelNodes);
+            if (level == 0) {
+                Iterator<String> l0NodesIter = levelNodes.iterator();
+                levelNodesIter.set(0, l0NodesIter);
+                while (l0NodesIter.hasNext()) {
+                    String curL0Node = l0NodesIter.next();
+                    if (!isSpecialZnode(curL0Node)) {
+                        curLevelNodes.set(0, curL0Node);
+                        break;
+                    }
+                }
+            } else {
+                Iterator<String> lNodesIter = levelNodes.iterator();
+                levelNodesIter.set(level, lNodesIter);
+                if (lNodesIter.hasNext()) {
+                    String curLNode = lNodesIter.next();
+                    curLevelNodes.set(level, curLNode);
+                }
+            }
+            String curLNode = curLevelNodes.get(level);
+            if (curLNode != null) {
+                if (level != 3) {
+                    String nextLevelPath = path + "/" + curLNode;
+                    initialize(nextLevelPath, level + 1);
+                } else {
+                    nextRange = getLedgerRangeByLevel(curLevelNodes);
+                    initialized = true;
+                }
+            } else {
+                iteratorDone = true;
+            }
+        }
+
+        private boolean moveToNext(int level) throws KeeperException, InterruptedException
{
+            Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
+            boolean movedToNextNode = false;
+            if (level == 0) {
+                while (curLevelNodesIter.hasNext()) {
+                    String nextNode = curLevelNodesIter.next();
+                    if (isSpecialZnode(nextNode)) {
+                        continue;
+                    } else {
+                        curLevelNodes.set(level, nextNode);
+                        movedToNextNode = true;
+                        break;
+                    }
+                }
+            } else {
+                if (curLevelNodesIter.hasNext()) {
+                    String nextNode = curLevelNodesIter.next();
+                    curLevelNodes.set(level, nextNode);
+                    movedToNextNode = true;
+                } else {
+                    movedToNextNode = moveToNext(level - 1);
+                    if (movedToNextNode) {
+                        StringBuilder path = new StringBuilder(ledgerRootPath);
+                        for (int i = 0; i < level; i++) {
+                            path = path.append("/").append(curLevelNodes.get(i));
+                        }
+                        List<String> newCurLevelNodesList = zk.getChildren(path.toString(),
null);
+                        Collections.sort(newCurLevelNodesList);
+                        Iterator<String> newCurLevelNodesIter = newCurLevelNodesList.iterator();
+                        levelNodesIter.set(level, newCurLevelNodesIter);
+                        if (newCurLevelNodesIter.hasNext()) {
+                            curLevelNodes.set(level, newCurLevelNodesIter.next());
+                            movedToNextNode = true;
+                        }
+                    }
+                }
+            }
+            return movedToNextNode;
+        }
+
+        synchronized private void preload() throws IOException, KeeperException, InterruptedException
{
+            if (!iteratorDone && !initialized) {
+                initialize(ledgerRootPath, 0);
+            }
+            while (((nextRange == null) || (nextRange.size() == 0)) && !iteratorDone)
{
+                boolean movedToNextNode = moveToNext(3);
+                if (movedToNextNode) {
+                    nextRange = getLedgerRangeByLevel(curLevelNodes);
+                } else {
+                    iteratorDone = true;
+                }
+            }
+        }
+
+        @Override
+        synchronized public boolean hasNext() throws IOException {
+            try {
+                preload();
+            } catch (KeeperException ke) {
+                throw new IOException("Error preloading next range", ke);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while preloading", ie);
+            }
+            return nextRange != null && !iteratorDone;
+        }
+
+        @Override
+        synchronized public LedgerRange next() throws IOException {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            LedgerRange r = nextRange;
+            nextRange = null;
+            return r;
+        }
+
+        LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException
{
+            String level1 = curLevelNodes.get(0);
+            String level2 = curLevelNodes.get(1);
+            String level3 = curLevelNodes.get(2);
+            String level4 = curLevelNodes.get(3);
+
+            StringBuilder nodeBuilder = new StringBuilder();
+            nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/")
+                    .append(level3).append("/").append(level4);
+            String nodePath = nodeBuilder.toString();
+            List<String> ledgerNodes = null;
+            try {
+                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+            } catch (InterruptedException e) {
+                throw new IOException("Error when get child nodes from zk", e);
+            }
+            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2
+ "/" + level3 + "/"
+                        + level4 + " : " + zkActiveLedgers);
+            }
+            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1,
level2, level3, level4), true,
+                    getEndLedgerIdByLevel(level1, level2, level3, level4), true));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
new file mode 100644
index 0000000..020bde8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
@@ -0,0 +1,12 @@
+package org.apache.bookkeeper.meta;
+
+public class LongHierarchicalLedgerManagerFactory extends HierarchicalLedgerManagerFactory
{
+
+    public static final String NAME = "longhierarchical";
+
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new LongHierarchicalLedgerManager(conf, zk);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index bea0372..c2f658b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -41,6 +41,16 @@ public class StringUtils {
     }
 
     /**
+     * Formats ledger ID according to ZooKeeper rules
+     *
+     * @param id
+     *            znode id
+     */
+    public static String getZKStringIdForLongHierarchical(long id) {
+        return String.format("%019d", id);
+    }
+    
+    /**
      * Get the hierarchical ledger path according to the ledger id
      *
      * @param ledgerId
@@ -60,6 +70,27 @@ public class StringUtils {
     }
 
     /**
+     * Get the long hierarchical ledger path according to the ledger id
+     *
+     * @param ledgerId
+     *          ledger id
+     * @return the long hierarchical path
+     */
+    public static String getLongHierarchicalLedgerPath(long ledgerId) {
+        String ledgerIdStr = getZKStringIdForLongHierarchical(ledgerId);
+        // do 3-4-4-4-4 split
+        StringBuilder sb = new StringBuilder();
+        sb.append("/")
+          .append(ledgerIdStr.substring(0, 3)).append("/")
+          .append(ledgerIdStr.substring(3, 7)).append("/")
+          .append(ledgerIdStr.substring(7, 11)).append("/")
+          .append(ledgerIdStr.substring(11, 15)).append("/")
+          .append(LEDGER_NODE_PREFIX)
+          .append(ledgerIdStr.substring(15, 19));
+        return sb.toString();
+    }
+    
+    /**
      * Parse the hierarchical ledger path to its ledger id
      *
      * @param hierarchicalLedgerPath
@@ -78,6 +109,24 @@ public class StringUtils {
     }
 
     /**
+     * Parse the long hierarchical ledger path to its ledger id
+     *
+     * @param longHierarchicalLedgerPath
+     * @return the ledger id
+     * @throws IOException
+     */
+    public static long stringToLongHierarchicalLedgerId(String longHierarchicalLedgerPath)
+            throws IOException {
+        String[] longHierarchicalParts = longHierarchicalLedgerPath.split("/");
+        if (longHierarchicalParts.length != 5) {
+            throw new IOException("it is not a valid hierarchical path name : " + longHierarchicalLedgerPath);
+        }
+        longHierarchicalParts[4] =
+                longHierarchicalParts[4].substring(LEDGER_NODE_PREFIX.length());
+        return stringToHierarchicalLedgerId(longHierarchicalParts);
+    }
+    
+    /**
      * Get ledger id
      *
      * @param levelNodes

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index a5fbe24..69ac921 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -20,9 +20,13 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Random;
 import java.util.Map;
 import java.util.UUID;
@@ -30,6 +34,7 @@ import java.util.HashMap;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
 import org.junit.Before;
@@ -37,9 +42,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
-
 /**
  * Testing ledger write entry cases
  */
@@ -177,6 +179,50 @@ public class BookieWriteLedgerTest extends
     }
 
     /**
+     * Verify the functionality of Advanced Ledger which accepts ledgerId as input and returns
+     * LedgerHandleAdv. LedgerHandleAdv takes entryId for addEntry, and lets the
+     * user manage entryId allocation.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithLedgerId() throws Exception {
+        // Create a ledger
+        long ledgerId = 0xABCDEF;
+        lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(i, entry.array());
+        }
+        // Start one more bookie
+        startNewBookie();
+
+        // Shutdown one bookie in the last ensemble and continue writing
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+                .getValue();
+        killBookie(ensemble.get(0));
+
+        int i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(i, entry.array());
+        }
+
+        readEntries(lh, entries1);
+        lh.close();
+        bkc.deleteLedger(ledgerId);
+    }
+
+    /**
      * Verify the functionality of Ledger create which accepts customMetadata as input.
      * Also verifies that the data written is read back properly.
      *
@@ -222,6 +268,55 @@ public class BookieWriteLedgerTest extends
         }
     }
 
+    /**
+     * In a loop create/write/delete the ledger with same ledgerId through
+     * the functionality of Advanced Ledger which accepts ledgerId as input.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 180000)
+    public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
+        long ledgerId;
+        int ledgerCount = 40;
+
+        List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
+        LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
+
+        List<byte[]> tmpEntry;
+        for (int lc = 0; lc < ledgerCount; lc++) {
+            tmpEntry = new ArrayList<byte[]>();
+
+            ledgerId = rng.nextLong();
+            ledgerId &= Long.MAX_VALUE;
+            if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+                // since LongHierarchicalLedgerManager supports ledgerIds of decimal length up to 19 digits but other
+                // LedgerManagers only up to 10 decimals
+                ledgerId %= 9999999999L;
+            }
+
+            LOG.info("Iteration: {}  LedgerId: {}", lc, ledgerId);
+            lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+            lhArray[lc] = lh;
+
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+                tmpEntry.add(entry.array());
+                lh.addEntry(i, entry.array());
+            }
+            entryList.add(tmpEntry);
+        }
+        for (int lc = 0; lc < ledgerCount; lc++) {
+            // Read and verify
+            long lid = lhArray[lc].getId();
+            LOG.info("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId());
+            readEntries(lhArray[lc], entryList.get(lc));
+            lhArray[lc].close();
+            bkc.deleteLedger(lid);
+        }
+    }
+
     /**
      * Verify asynchronous writing when few bookie failures in last ensemble.
      */
@@ -615,7 +710,7 @@ public class BookieWriteLedgerTest extends
         lh.close();
     }
 
-    private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) throws InterruptedException, BKException {
+    private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
         ls = lh.readEntries(0, numEntriesToWrite - 1);
         int index = 0;
         while (ls.hasMoreElements()) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index df74339..97e3c9f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -72,7 +73,8 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
         return Arrays.asList(new Object[][] {
                 { FlatLedgerManagerFactory.class },
                 { HierarchicalLedgerManagerFactory.class },
-                { MSLedgerManagerFactory.class }
+                { LongHierarchicalLedgerManagerFactory.class },
+                { MSLedgerManagerFactory.class },
         });
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index d5866a5..c1e8bde 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -263,7 +263,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.poll());
     }
 
-    @Test(timeout=60000)
+    @Test(timeout=120000)
     public void testGcLedgersNotLast() throws Exception {
         final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
         final List<Long> cleaned = new ArrayList<Long>();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index cf7fdcc..5387424 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -89,7 +89,8 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         return Arrays.asList(new Object[][] {
             { FlatLedgerManagerFactory.class },
             { HierarchicalLedgerManagerFactory.class },
-            { MSLedgerManagerFactory.class }
+            { LongHierarchicalLedgerManagerFactory.class },
+            { MSLedgerManagerFactory.class },
         });
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
index b4026b4..2c9a1f4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
@@ -49,6 +49,7 @@ public abstract class MultiLedgerManagerMultiDigestTestCase extends BookKeeperCl
         String[] ledgerManagers = {
             "org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
             "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+            "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.MSLedgerManagerFactory",
         };
         ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
index cba8be4..34a22af 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
@@ -28,8 +28,6 @@ import org.junit.runners.Parameterized.Parameters;
  *
  */
 
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
-
 /**
  * Test Case run over different ledger manager.
  */
@@ -45,6 +43,7 @@ public abstract class MultiLedgerManagerTestCase extends BookKeeperClusterTestCa
         String[] ledgerManagers = new String[] {
             "org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
             "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+            "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.MSLedgerManagerFactory",
         };
         ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);


Mime
View raw message