zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1372808 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ bookke...
Date Tue, 14 Aug 2012 09:40:35 GMT
Author: ivank
Date: Tue Aug 14 09:40:34 2012
New Revision: 1372808

URL: http://svn.apache.org/viewvc?rev=1372808&view=rev
Log:
BOOKKEEPER-246: Recording of underreplication of ledger entries (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Aug 14 09:40:34 2012
@@ -80,6 +80,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-317: Exceptions for replication (ivank via sijie)
 
+        BOOKKEEPER-246: Recording of underreplication of ledger entries (ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java Tue Aug 14 09:40:34 2012
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
 
+import org.apache.zookeeper.KeeperException;
+import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -69,4 +71,9 @@ class FlatLedgerManagerFactory extends L
         return new FlatLedgerManager(conf, zk);
     }
 
+    /**
+     * Create the ledger underreplication manager for this factory.
+     * The returned instance is a ZkLedgerUnderreplicationManager built
+     * from this factory's <code>conf</code> and <code>zk</code>.
+     */
+    @Override
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+        return new ZkLedgerUnderreplicationManager(conf, zk);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java Tue Aug 14 09:40:34 2012
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
 
+import org.apache.zookeeper.KeeperException;
+import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -69,4 +71,9 @@ class HierarchicalLedgerManagerFactory e
         return new HierarchicalLedgerManager(conf, zk);
     }
 
+    /**
+     * Create the ledger underreplication manager for this factory.
+     * The returned instance is a ZkLedgerUnderreplicationManager built
+     * from this factory's <code>conf</code> and <code>zk</code>.
+     */
+    @Override
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
+        return new ZkLedgerUnderreplicationManager(conf, zk);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java Tue Aug 14 09:40:34 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.zookeeper.KeeperException;
@@ -82,6 +83,17 @@ public abstract class LedgerManagerFacto
     public abstract ActiveLedgerManager newActiveLedgerManager();
 
     /**
+     * Return a ledger underreplication manager, which is used to
+     * mark ledgers as underreplicated, and to retrieve a ledger which
+     * is underreplicated so that it can be rereplicated.
+     *
+     * @return ledger underreplication manager
+     * @throws KeeperException
+     *          declared for implementations that talk to ZooKeeper while
+     *          constructing the manager
+     * @throws InterruptedException
+     *          declared for implementations that block while constructing
+     *          the manager
+     * @throws ReplicationException.CompatibilityException
+     *          declared for implementations that find underreplication
+     *          metadata they cannot work with
+     * @see LedgerUnderreplicationManager
+     */
+    public abstract LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;
+
+    /**
      * Create new Ledger Manager Factory.
      *
      * @param conf

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java?rev=1372808&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java Tue Aug 14 09:40:34 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import org.apache.bookkeeper.replication.ReplicationException;
+
+/**
+ * Interface for marking ledgers which need to be rereplicated, and for
+ * handing those ledgers out, one holder at a time, to whoever performs
+ * the rereplication.
+ */
+public interface LedgerUnderreplicationManager {
+    /**
+     * Mark a ledger as underreplicated. The replication should
+     * then check which fragments are underreplicated and rereplicate them.
+     *
+     * @param ledgerId
+     *          id of the ledger to mark
+     * @param missingReplica
+     *          the replica to record as missing for the ledger
+     * @throws ReplicationException.UnavailableException
+     *          if the mark could not be recorded
+     */
+    void markLedgerUnderreplicated(long ledgerId, String missingReplica)
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Mark a ledger as fully replicated. If the ledger is not
+     * already marked as underreplicated, this is a noop.
+     *
+     * @param ledgerId
+     *          id of the ledger to mark
+     * @throws ReplicationException.UnavailableException
+     *          if the mark could not be updated
+     */
+    void markLedgerReplicated(long ledgerId)
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Acquire an underreplicated ledger for rereplication. The ledger
+     * should be locked, so that no other agent will receive the ledger
+     * from this call.
+     * The ledger should remain locked until either
+     * {@link #markLedgerReplicated(long)} or
+     * {@link #releaseUnderreplicatedLedger(long)} is called.
+     * This call is blocking, so will not return until a ledger is
+     * available for rereplication.
+     *
+     * @return id of the ledger which has been acquired
+     * @throws ReplicationException.UnavailableException
+     *          if no ledger could be acquired because of an error
+     */
+    long getLedgerToRereplicate()
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Release a previously acquired ledger. This allows others to acquire
+     * the ledger.
+     *
+     * @param ledgerId
+     *          id of the ledger to release
+     * @throws ReplicationException.UnavailableException
+     *          if the ledger could not be released
+     */
+    void releaseUnderreplicatedLedger(long ledgerId)
+            throws ReplicationException.UnavailableException;
+
+    /**
+     * Release all resources held by the ledger underreplication manager.
+     *
+     * @throws ReplicationException.UnavailableException
+     *          if the resources could not be released
+     */
+    void close()
+            throws ReplicationException.UnavailableException;
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1372808&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Tue Aug 14 09:40:34 2012
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.meta;
+
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
+import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import com.google.protobuf.TextFormat;
+
+import java.nio.charset.Charset;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Collections;
+
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper implementation of underreplication manager.
+ * This is implemented in a hierarchical fashion, so it'll work with
+ * FlatLedgerManagerFactory and HierarchicalLedgerManagerFactory.
+ *
+ * Layout is:
+ * /root/underreplication/ LAYOUT
+ *                         ledgers/(hierarchicalpath)/urL(ledgerId)
+ *                         locks/urL(ledgerId)
+ *
+ * The hierarchical path is created by splitting the ledger id into 4 2byte
+ * segments which are represented in hexadecimal.
+ * e.g. For ledger id 0xcafebeef0000feed, the path is
+ *  cafe/beef/0000/feed/
+ */
+public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationManager {
+    static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
+    static final Charset UTF8 = Charset.forName("UTF-8");
+
+    static final String LAYOUT="BASIC";
+    static final int LAYOUT_VERSION=1;
+
+    /**
+     * Record of a lock held by this manager: the path of the lock znode,
+     * together with the version the underreplicated ledger znode had when
+     * the lock was taken.
+     */
+    private static class Lock {
+        private final String lockZNode;
+        private final int ledgerZNodeVersion;
+
+        Lock(String lockZNode, int ledgerZNodeVersion) {
+            this.ledgerZNodeVersion = ledgerZNodeVersion;
+            this.lockZNode = lockZNode;
+        }
+
+        /** @return path of the lock znode */
+        String getLockZNode() {
+            return lockZNode;
+        }
+
+        /** @return version of the ledger znode recorded with the lock */
+        int getLedgerZNodeVersion() {
+            return ledgerZNodeVersion;
+        }
+    }
+    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long, Lock>();
+    private final Pattern idExtractionPattern;
+
+    private final String basePath;
+    private final String urLedgerPath;
+    private final String urLockPath;
+    private final String layoutZNode;
+
+    private final ZooKeeper zkc;
+
+    public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+        basePath = conf.getZkLedgersRootPath() + "/underreplication";
+        layoutZNode = basePath + "/LAYOUT";
+        urLedgerPath = basePath + "/ledgers";
+        urLockPath = basePath + "/locks";
+
+        idExtractionPattern = Pattern.compile("urL(\\d+)$");
+        this.zkc = zkc;
+
+        checkLayout();
+    }
+
+    /**
+     * Create a persistent znode, creating any missing ancestors on demand.
+     * The znode is created directly first; only if ZooKeeper reports that
+     * the parent is missing are the ancestors created (with empty data)
+     * and the creation retried once.
+     *
+     * @param path
+     *          absolute path of the znode to create
+     * @param data
+     *          data to store in the znode
+     * @throws KeeperException
+     *          NodeExistsException if <code>path</code> itself already
+     *          exists; NoNodeException if the path has no parent
+     *          component to create, or if the parent vanished again before
+     *          the retry; any other ZooKeeper error
+     * @throws InterruptedException
+     *          if interrupted while talking to ZooKeeper
+     */
+    private void createOptimistic(String path, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zkc.create(path, data,
+                       Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.NoNodeException nne) {
+            int lastSlash = path.lastIndexOf('/');
+            if (lastSlash <= 0) {
+                throw nne;
+            }
+            String parent = path.substring(0, lastSlash);
+            try {
+                createOptimistic(parent, new byte[0]);
+            } catch (KeeperException.NodeExistsException nee) {
+                // Someone else created the parent between our failed create
+                // and this one. The parent exists, which is all we need; it
+                // must not be reported as if <path> already existed.
+            }
+            zkc.create(path, data,
+                       Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+    }
+
+    /**
+     * Make sure the underreplication znodes exist and that the stored
+     * layout is one this class understands.
+     *
+     * The base znode is created if missing, then the LAYOUT znode is
+     * created if missing and, in all cases, read back and compared with
+     * {@link #LAYOUT} and {@link #LAYOUT_VERSION}. Finally the ledgers and
+     * locks znodes are created if missing.
+     *
+     * @throws KeeperException
+     *          if a ZooKeeper operation fails
+     * @throws InterruptedException
+     *          if interrupted while talking to ZooKeeper
+     * @throws ReplicationException.CompatibilityException
+     *          if the stored layout cannot be parsed, or its type or
+     *          version differs from the ones this class writes
+     */
+    private void checkLayout()
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+        createIfMissing(basePath);
+
+        while (true) {
+            if (zkc.exists(layoutZNode, false) == null) {
+                LedgerRereplicationLayoutFormat.Builder builder
+                    = LedgerRereplicationLayoutFormat.newBuilder();
+                builder.setType(LAYOUT).setVersion(LAYOUT_VERSION);
+                try {
+                    zkc.create(layoutZNode, TextFormat.printToString(builder.build()).getBytes(UTF8),
+                               Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                } catch (KeeperException.NodeExistsException nne) {
+                    // someone else managed to create it
+                }
+                // whoever created it, loop again to read it back and validate it
+                continue;
+            }
+
+            byte[] layoutData = zkc.getData(layoutZNode, false, null);
+            LedgerRereplicationLayoutFormat.Builder builder
+                = LedgerRereplicationLayoutFormat.newBuilder();
+            try {
+                TextFormat.merge(new String(layoutData, UTF8), builder);
+            } catch (TextFormat.ParseException pe) {
+                throw new ReplicationException.CompatibilityException(
+                        "Invalid data found", pe);
+            }
+            LedgerRereplicationLayoutFormat layout = builder.build();
+            if (!layout.getType().equals(LAYOUT)
+                    || layout.getVersion() != LAYOUT_VERSION) {
+                // report what was actually found, not only what we expected
+                throw new ReplicationException.CompatibilityException(
+                        "Incompatible layout found (" + layout.getType() + ":" + layout.getVersion()
+                        + "), expected (" + LAYOUT + ":" + LAYOUT_VERSION + ")");
+            }
+            break;
+        }
+
+        createIfMissing(urLedgerPath);
+        createIfMissing(urLockPath);
+    }
+
+    /**
+     * Create an empty persistent znode at <code>path</code> unless one is
+     * already there. Losing the creation race to another client is fine.
+     */
+    private void createIfMissing(String path) throws KeeperException, InterruptedException {
+        if (zkc.exists(path, false) != null) {
+            return;
+        }
+        try {
+            zkc.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException nee) {
+            // do nothing, someone else could have created it
+        }
+    }
+
+    /**
+     * Extract the ledger id from a znode name or path ending in
+     * <code>urL</code> followed by decimal digits.
+     *
+     * @param path
+     *          znode name or path to inspect
+     * @return the ledger id encoded at the end of <code>path</code>
+     * @throws NumberFormatException
+     *          if <code>path</code> does not end in such a name, or the
+     *          digits do not fit in a long
+     */
+    private long getLedgerId(String path) throws NumberFormatException {
+        Matcher idMatch = idExtractionPattern.matcher(path);
+        if (!idMatch.find()) {
+            throw new NumberFormatException("Couldn't find ledgerid in path");
+        }
+        return Long.parseLong(idMatch.group(1));
+    }
+
+    /**
+     * Build the directory path for a ledger under <code>base</code>: the
+     * id is cut into four 16 bit pieces, most significant first, and each
+     * piece becomes one path component of four lower case hex digits.
+     *
+     * @param base
+     *          path to put the four components under
+     * @param ledgerId
+     *          ledger id to encode
+     * @return <code>base</code> followed by the four hex components
+     */
+    private String getParentZnodePath(String base, long ledgerId) {
+        StringBuilder znodePath = new StringBuilder().append(base);
+        for (int shift = 48; shift >= 0; shift -= 16) {
+            znodePath.append('/')
+                     .append(String.format("%04x", ledgerId >> shift & 0xffff));
+        }
+        return znodePath.toString();
+    }
+
+    /**
+     * @param ledgerId
+     *          ledger id
+     * @return path of the znode marking the ledger as underreplicated:
+     *         the ledger's directory under the ledgers path, then
+     *         <code>urL</code> and the id zero padded to 10 digits
+     */
+    private String getUrLedgerZnode(long ledgerId) {
+        String ledgerDir = getParentZnodePath(urLedgerPath, ledgerId);
+        return String.format("%s/urL%010d", ledgerDir, ledgerId);
+    }
+
+
+    /**
+     * Record <code>missingReplica</code> in the underreplicated ledger
+     * znode of <code>ledgerId</code>.
+     *
+     * If the znode does not exist it is created (with its ancestors)
+     * holding just this replica. If it exists, the replica is appended to
+     * the stored list using a versioned write, unless it is already
+     * listed, in which case nothing is written. Races with other writers
+     * (znode appearing, disappearing or changing in between) are handled
+     * by starting over.
+     *
+     * @param ledgerId
+     *          id of the ledger to mark
+     * @param missingReplica
+     *          replica to record for the ledger
+     * @throws ReplicationException.UnavailableException
+     *          on ZooKeeper errors, on interruption (the interrupt flag is
+     *          set again), or if the stored data cannot be parsed
+     */
+    @Override
+    public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
+            throws ReplicationException.UnavailableException {
+        LOG.debug("markLedgerUnderreplicated {} {}", ledgerId, missingReplica);
+        try {
+            String znode = getUrLedgerZnode(ledgerId);
+            while (true) {
+                try {
+                    UnderreplicatedLedgerFormat.Builder fresh = UnderreplicatedLedgerFormat.newBuilder();
+                    fresh.addReplica(missingReplica);
+                    createOptimistic(znode,
+                                     TextFormat.printToString(fresh.build()).getBytes(UTF8));
+                    return;
+                } catch (KeeperException.NodeExistsException nee) {
+                    // already marked (or an ancestor was created under us);
+                    // fall through and update the existing znode
+                }
+
+                Stat s = zkc.exists(znode, false);
+                if (s == null) {
+                    continue;
+                }
+                try {
+                    byte[] bytes = zkc.getData(znode, false, s);
+                    // Start from an empty builder: the stored list must not be
+                    // merged on top of a copy of missingReplica.
+                    UnderreplicatedLedgerFormat.Builder existing = UnderreplicatedLedgerFormat.newBuilder();
+                    TextFormat.merge(new String(bytes, UTF8), existing);
+                    if (existing.build().getReplicaList().contains(missingReplica)) {
+                        return; // nothing to add
+                    }
+                    existing.addReplica(missingReplica);
+                    zkc.setData(znode,
+                                TextFormat.printToString(existing.build()).getBytes(UTF8),
+                                s.getVersion());
+                    return;
+                } catch (KeeperException.NoNodeException nne) {
+                    continue;
+                } catch (KeeperException.BadVersionException bve) {
+                    continue;
+                } catch (TextFormat.ParseException pe) {
+                    throw new ReplicationException.UnavailableException(
+                            "Invalid data found", pe);
+                }
+            }
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    /**
+     * Delete the underreplicated ledger znode of <code>ledgerId</code>,
+     * provided this manager holds the lock for that ledger, and then
+     * release the lock.
+     *
+     * The delete is conditional on the znode version recorded when the
+     * lock was taken. Without a held lock nothing is deleted. The lock
+     * release in the <code>finally</code> block runs on every path.
+     *
+     * @param ledgerId
+     *          id of the ledger to mark as replicated
+     * @throws ReplicationException.UnavailableException
+     *          on ZooKeeper errors other than a missing znode or a version
+     *          mismatch, on interruption (the interrupt flag is set
+     *          again), or if releasing the lock fails
+     */
+    @Override
+    public void markLedgerReplicated(long ledgerId) throws ReplicationException.UnavailableException {
+        LOG.debug("markLedgerReplicated(ledgerId={})", ledgerId);
+        try {
+            Lock l = heldLocks.get(ledgerId);
+            if (l != null) {
+                // only delete if the znode is unchanged since we locked it
+                zkc.delete(getUrLedgerZnode(ledgerId), l.getLedgerZNodeVersion());
+            }
+        } catch (KeeperException.NoNodeException nne) {
+            // this is ok
+        } catch (KeeperException.BadVersionException bve) {
+            // if this is the case, someone has marked the ledger
+            // for rereplication again. Leave the underreplicated
+            // znode in place, so the ledger is checked.
+        } catch (KeeperException ke) {
+            LOG.error("Error deleting underreplicated ledger znode", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        } finally {
+            // NOTE(review): an exception thrown here replaces one thrown above
+            releaseUnderreplicatedLedger(ledgerId);
+        }
+    }
+
+    /**
+     * Walk the underreplicated ledger tree below <code>parent</code>, in
+     * random order, looking for a ledger which can be locked.
+     *
+     * Levels 0 to 3 are directories and are descended into. At level 4
+     * the children are the ledger znodes: a child is skipped if its lock
+     * znode exists, if the child itself is gone, if no ledger id can be
+     * read from its name, or if another client creates the lock first.
+     * Otherwise an ephemeral lock znode is created and the lock is
+     * recorded in <code>heldLocks</code> along with the ledger znode's
+     * version.
+     *
+     * @param parent
+     *          znode whose children are examined
+     * @param depth
+     *          level of <code>parent</code>; 4 means its children are
+     *          ledger znodes
+     * @param w
+     *          watcher passed to every children listing and every lock
+     *          existence check
+     * @return the id of the ledger which was locked, or -1 if none could be
+     * @throws KeeperException
+     *          on ZooKeeper errors other than losing the race for a lock
+     * @throws InterruptedException
+     *          if interrupted while talking to ZooKeeper
+     */
+    private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w)
+            throws KeeperException, InterruptedException {
+        List<String> candidates = zkc.getChildren(parent, w);
+        Collections.shuffle(candidates);
+
+        if (depth != 4) {
+            // directory level: try each subtree in turn
+            for (String dir : candidates) {
+                long found = getLedgerToRereplicateFromHierarchy(parent + "/" + dir, depth + 1, w);
+                if (found != -1) {
+                    return found;
+                }
+            }
+            return -1;
+        }
+
+        // leaf level: each child is an underreplicated ledger znode
+        for (String urLedger : candidates) {
+            try {
+                String lockPath = urLockPath + "/" + urLedger;
+                if (zkc.exists(lockPath, w) != null) {
+                    // locked by someone else
+                    continue;
+                }
+
+                Stat ledgerStat = zkc.exists(parent + "/" + urLedger, false);
+                if (ledgerStat == null) {
+                    LOG.debug("{}/{} doesn't exist", parent, urLedger);
+                    continue;
+                }
+
+                long ledgerId = getLedgerId(urLedger);
+                zkc.create(lockPath, new byte[0],
+                           Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                heldLocks.put(ledgerId, new Lock(lockPath, ledgerStat.getVersion()));
+                return ledgerId;
+            } catch (KeeperException.NodeExistsException nee) {
+                // lost the race for the lock, try the next one
+            } catch (NumberFormatException nfe) {
+                // not an underreplicated ledger znode name, try the next one
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Lock an underreplicated ledger and return its id, blocking until
+     * one can be locked.
+     *
+     * Each attempt searches the whole tree with a fresh watcher. If the
+     * search finds nothing, the call waits until that watcher sees a
+     * NodeChildrenChanged or NodeDeleted event and then searches again.
+     *
+     * @return id of the ledger which was locked
+     * @throws ReplicationException.UnavailableException
+     *          on ZooKeeper errors, or on interruption (the interrupt flag
+     *          is set again)
+     */
+    @Override
+    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
+        LOG.debug("getLedgerToRereplicate()");
+        try {
+            while (true) {
+                final CountDownLatch somethingChanged = new CountDownLatch(1);
+                Watcher notifier = new Watcher() {
+                        public void process(WatchedEvent event) {
+                            Watcher.Event.EventType type = event.getType();
+                            boolean relevant = type == Watcher.Event.EventType.NodeChildrenChanged
+                                || type == Watcher.Event.EventType.NodeDeleted;
+                            if (relevant) {
+                                somethingChanged.countDown();
+                            }
+                        }
+                    };
+                long candidate = getLedgerToRereplicateFromHierarchy(urLedgerPath, 0, notifier);
+                if (candidate == -1) {
+                    // nothing found, wait for a watcher to trigger
+                    somethingChanged.await();
+                    continue;
+                }
+                return candidate;
+            }
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+        }
+    }
+
+    /**
+     * Forget the lock held for <code>ledgerId</code> and delete its lock
+     * znode. Does nothing if no lock is recorded for the ledger; a lock
+     * znode which is already gone is not an error.
+     *
+     * @param ledgerId
+     *          id of the ledger to release
+     * @throws ReplicationException.UnavailableException
+     *          on other ZooKeeper errors, or on interruption (the
+     *          interrupt flag is set again)
+     */
+    @Override
+    public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
+        LOG.debug("releaseLedger(ledgerId={})", ledgerId);
+        Lock held = heldLocks.remove(ledgerId);
+        if (held == null) {
+            return;
+        }
+        try {
+            zkc.delete(held.getLockZNode(), -1);
+        } catch (KeeperException.NoNodeException nne) {
+            // this is ok
+        } catch (KeeperException ke) {
+            LOG.error("Error deleting underreplicated ledger lock", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+        }
+    }
+
+    /**
+     * Delete the lock znode of every lock this manager still holds and
+     * forget those locks.
+     *
+     * A lock znode which is already gone is skipped, and the remaining
+     * locks are still released. Any other failure stops the release at
+     * that lock and is reported to the caller; locks not yet processed
+     * stay recorded.
+     *
+     * @throws ReplicationException.UnavailableException
+     *          on ZooKeeper errors other than a missing lock znode, or on
+     *          interruption (the interrupt flag is set again)
+     */
+    @Override
+    public void close() throws ReplicationException.UnavailableException {
+        LOG.debug("close()");
+        try {
+            // heldLocks is a ConcurrentHashMap, so entries may be removed
+            // while iterating over it
+            for (Map.Entry<Long, Lock> e : heldLocks.entrySet()) {
+                try {
+                    zkc.delete(e.getValue().getLockZNode(), -1);
+                } catch (KeeperException.NoNodeException nne) {
+                    // this is ok, carry on with the remaining locks
+                }
+                heldLocks.remove(e.getKey());
+            }
+        } catch (KeeperException ke) {
+            LOG.error("Error deleting underreplicated ledger lock", ke);
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java Tue Aug 14 09:40:34 2012
@@ -1624,6 +1624,843 @@ public final class DataFormats {
     // @@protoc_insertion_point(class_scope:LedgerMetadataFormat)
   }
   
+  public interface LedgerRereplicationLayoutFormatOrBuilder // read accessors implemented by both the message class and its Builder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // required string type = 1; (hasType() reports whether the field has been set)
+    boolean hasType();
+    String getType();
+    
+    // required int32 version = 2; (hasVersion() reports whether the field has been set)
+    boolean hasVersion();
+    int getVersion();
+  }
+  public static final class LedgerRereplicationLayoutFormat extends // message with two required fields: type (string) and version (int32)
+      com.google.protobuf.GeneratedMessage
+      implements LedgerRereplicationLayoutFormatOrBuilder {
+    // Use LedgerRereplicationLayoutFormat.newBuilder() to construct.
+    private LedgerRereplicationLayoutFormat(Builder builder) {
+      super(builder);
+    }
+    private LedgerRereplicationLayoutFormat(boolean noInit) {} // used by the static initializer to create defaultInstance
+    
+    private static final LedgerRereplicationLayoutFormat defaultInstance;
+    public static LedgerRereplicationLayoutFormat getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public LedgerRereplicationLayoutFormat getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerRereplicationLayoutFormat_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerRereplicationLayoutFormat_fieldAccessorTable;
+    }
+    
+    private int bitField0_; // presence bits: 0x1 = type is set, 0x2 = version is set
+    // required string type = 1;
+    public static final int TYPE_FIELD_NUMBER = 1;
+    private java.lang.Object type_; // holds either a String or a ByteString; converted lazily by the accessors below
+    public boolean hasType() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getType() {
+      java.lang.Object ref = type_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          type_ = s; // cache the decoded String only when the bytes are valid UTF-8
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getTypeBytes() {
+      java.lang.Object ref = type_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        type_ = b; // cache the UTF-8 encoded form in place of the String
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required int32 version = 2;
+    public static final int VERSION_FIELD_NUMBER = 2;
+    private int version_;
+    public boolean hasVersion() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public int getVersion() {
+      return version_;
+    }
+    
+    private void initFields() {
+      type_ = "";
+      version_ = 0;
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not yet computed, 0 = false, 1 = true
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasType()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasVersion()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // return value discarded; the call populates memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getTypeBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeInt32(2, version_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 = not yet computed
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getTypeBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(2, version_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom returned false, so no message is produced
+      }
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom returned false, so no message is produced
+      }
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormatOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerRereplicationLayoutFormat_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerRereplicationLayoutFormat_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        } // empty body: this message has nothing to initialize here
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        type_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        version_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat.getDescriptor();
+      }
+      
+      public org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat getDefaultInstanceForType() {
+        return org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat.getDefaultInstance();
+      }
+      
+      public org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat build() {
+        org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat buildParsed() // like build(), but reports a missing required field as InvalidProtocolBufferException
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat buildPartial() {
+        org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat result = new org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.type_ = type_; // value is copied unconditionally; only the presence bit is conditional
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.version_ = version_; // value is copied unconditionally; only the presence bit is conditional
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat) {
+          return mergeFrom((org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat other) {
+        if (other == org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat.getDefaultInstance()) return this;
+        if (other.hasType()) {
+          setType(other.getType());
+        }
+        if (other.hasVersion()) {
+          setVersion(other.getVersion());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasType()) {
+          
+          return false;
+        }
+        if (!hasVersion()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // readTag() yields 0 when there is no further field to read
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: { // any tag other than 0, 10 and 16
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 10: { // tag 10 = (field 1 << 3) | wire type 2 (length-delimited): type
+              bitField0_ |= 0x00000001;
+              type_ = input.readBytes();
+              break;
+            }
+            case 16: { // tag 16 = (field 2 << 3) | wire type 0 (varint): version
+              bitField0_ |= 0x00000002;
+              version_ = input.readInt32();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits: 0x1 = type is set, 0x2 = version is set
+      
+      // required string type = 1;
+      private java.lang.Object type_ = ""; // String, or ByteString after parsing / setType(ByteString)
+      public boolean hasType() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getType() {
+        java.lang.Object ref = type_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          type_ = s; // unlike the message accessor, the decoded String is cached without a UTF-8 validity check
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setType(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        type_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearType() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        type_ = getDefaultInstance().getType(); // the default instance's type is "" (set by initFields)
+        onChanged();
+        return this;
+      }
+      void setType(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        type_ = value;
+        onChanged();
+      }
+      
+      // required int32 version = 2;
+      private int version_ ;
+      public boolean hasVersion() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public int getVersion() {
+        return version_;
+      }
+      public Builder setVersion(int value) {
+        bitField0_ |= 0x00000002;
+        version_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearVersion() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        version_ = 0;
+        onChanged();
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:LedgerRereplicationLayoutFormat)
+    }
+    
+    static {
+      defaultInstance = new LedgerRereplicationLayoutFormat(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:LedgerRereplicationLayoutFormat)
+  }
+  
+  public interface UnderreplicatedLedgerFormatOrBuilder // read accessors implemented by both the message class and its Builder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // repeated string replica = 1; (exposed as a list, an element count and an indexed getter)
+    java.util.List<String> getReplicaList();
+    int getReplicaCount();
+    String getReplica(int index);
+  }
+  public static final class UnderreplicatedLedgerFormat extends // message with a single repeated string field: replica
+      com.google.protobuf.GeneratedMessage
+      implements UnderreplicatedLedgerFormatOrBuilder {
+    // Use UnderreplicatedLedgerFormat.newBuilder() to construct.
+    private UnderreplicatedLedgerFormat(Builder builder) {
+      super(builder);
+    }
+    private UnderreplicatedLedgerFormat(boolean noInit) {} // used by the static initializer to create defaultInstance
+    
+    private static final UnderreplicatedLedgerFormat defaultInstance;
+    public static UnderreplicatedLedgerFormat getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public UnderreplicatedLedgerFormat getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.bookkeeper.proto.DataFormats.internal_static_UnderreplicatedLedgerFormat_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.bookkeeper.proto.DataFormats.internal_static_UnderreplicatedLedgerFormat_fieldAccessorTable;
+    }
+    
+    // repeated string replica = 1;
+    public static final int REPLICA_FIELD_NUMBER = 1;
+    private com.google.protobuf.LazyStringList replica_;
+    public java.util.List<String>
+        getReplicaList() {
+      return replica_;
+    }
+    public int getReplicaCount() {
+      return replica_.size();
+    }
+    public String getReplica(int index) {
+      return replica_.get(index);
+    }
+    
+    private void initFields() {
+      replica_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not yet computed, 1 = true (never set to 0 here: no required fields)
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // return value discarded; the call populates memoizedSerializedSize
+      for (int i = 0; i < replica_.size(); i++) {
+        output.writeBytes(1, replica_.getByteString(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 = not yet computed
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      {
+        int dataSize = 0;
+        for (int i = 0; i < replica_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(replica_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getReplicaList().size(); // plus one tag byte per element (dataSize above excludes tags)
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom returned false, so no message is produced
+      }
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null; // mergeDelimitedFrom returned false, so no message is produced
+      }
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormatOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.bookkeeper.proto.DataFormats.internal_static_UnderreplicatedLedgerFormat_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.bookkeeper.proto.DataFormats.internal_static_UnderreplicatedLedgerFormat_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        } // empty body: this message has nothing to initialize here
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        replica_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat.getDescriptor();
+      }
+      
+      public org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat getDefaultInstanceForType() {
+        return org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat.getDefaultInstance();
+      }
+      
+      public org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat build() {
+        org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat buildParsed() // like build(), but an uninitialized result is reported as InvalidProtocolBufferException
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat buildPartial() {
+        org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat result = new org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat(this);
+        int from_bitField0_ = bitField0_; // not read anywhere in this method
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          replica_ = new com.google.protobuf.UnmodifiableLazyStringList(
+              replica_);
+          bitField0_ = (bitField0_ & ~0x00000001); // clear the bit so ensureReplicaIsMutable() copies before the next mutation
+        }
+        result.replica_ = replica_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat) {
+          return mergeFrom((org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat other) {
+        if (other == org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat.getDefaultInstance()) return this;
+        if (!other.replica_.isEmpty()) {
+          if (replica_.isEmpty()) {
+            replica_ = other.replica_; // adopt the other message's list as-is instead of copying it
+            bitField0_ = (bitField0_ & ~0x00000001); // bit cleared: the adopted list is copied before any mutation
+          } else {
+            ensureReplicaIsMutable();
+            replica_.addAll(other.replica_);
+          }
+          onChanged();
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // readTag() yields 0 when there is no further field to read
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: { // any tag other than 0 and 10
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 10: { // tag 10 = (field 1 << 3) | wire type 2 (length-delimited): one replica element
+              ensureReplicaIsMutable();
+              replica_.add(input.readBytes());
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // bit 0x1 set = replica_ is a mutable list created by ensureReplicaIsMutable()
+      
+      // repeated string replica = 1;
+      private com.google.protobuf.LazyStringList replica_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      private void ensureReplicaIsMutable() {
+        if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+          replica_ = new com.google.protobuf.LazyStringArrayList(replica_); // copy before the first mutation
+          bitField0_ |= 0x00000001;
+         }
+      }
+      public java.util.List<String>
+          getReplicaList() {
+        return java.util.Collections.unmodifiableList(replica_); // read-only view over the builder's current list
+      }
+      public int getReplicaCount() {
+        return replica_.size();
+      }
+      public String getReplica(int index) {
+        return replica_.get(index);
+      }
+      public Builder setReplica(
+          int index, String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureReplicaIsMutable();
+        replica_.set(index, value);
+        onChanged();
+        return this;
+      }
+      public Builder addReplica(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureReplicaIsMutable();
+        replica_.add(value);
+        onChanged();
+        return this;
+      }
+      public Builder addAllReplica(
+          java.lang.Iterable<String> values) {
+        ensureReplicaIsMutable();
+        super.addAll(values, replica_);
+        onChanged();
+        return this;
+      }
+      public Builder clearReplica() {
+        replica_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        onChanged();
+        return this;
+      }
+      void addReplica(com.google.protobuf.ByteString value) {
+        ensureReplicaIsMutable();
+        replica_.add(value);
+        onChanged();
+      }
+      
+      // @@protoc_insertion_point(builder_scope:UnderreplicatedLedgerFormat)
+    }
+    
+    static {
+      defaultInstance = new UnderreplicatedLedgerFormat(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:UnderreplicatedLedgerFormat)
+  }
+  
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_LedgerMetadataFormat_descriptor;
   private static
@@ -1634,6 +2471,16 @@ public final class DataFormats {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_LedgerMetadataFormat_Segment_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_LedgerRereplicationLayoutFormat_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_LedgerRereplicationLayoutFormat_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_UnderreplicatedLedgerFormat_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_UnderreplicatedLedgerFormat_fieldAccessorTable;
   
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -1654,8 +2501,10 @@ public final class DataFormats {
       "\026\n\016ensembleMember\030\001 \003(\t\022\024\n\014firstEntryId\030" +
       "\002 \002(\003\".\n\005State\022\010\n\004OPEN\020\001\022\017\n\013IN_RECOVERY\020",
       "\002\022\n\n\006CLOSED\020\003\"!\n\nDigestType\022\t\n\005CRC32\020\001\022\010" +
-      "\n\004HMAC\020\002B\037\n\033org.apache.bookkeeper.protoH" +
-      "\001"
+      "\n\004HMAC\020\002\"@\n\037LedgerRereplicationLayoutFor" +
+      "mat\022\014\n\004type\030\001 \002(\t\022\017\n\007version\030\002 \002(\005\".\n\033Un" +
+      "derreplicatedLedgerFormat\022\017\n\007replica\030\001 \003" +
+      "(\tB\037\n\033org.apache.bookkeeper.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1678,6 +2527,22 @@ public final class DataFormats {
               new java.lang.String[] { "EnsembleMember", "FirstEntryId", },
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Segment.class,
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Segment.Builder.class);
+          internal_static_LedgerRereplicationLayoutFormat_descriptor =
+            getDescriptor().getMessageTypes().get(1);
+          internal_static_LedgerRereplicationLayoutFormat_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_LedgerRereplicationLayoutFormat_descriptor,
+              new java.lang.String[] { "Type", "Version", },
+              org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat.class,
+              org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat.Builder.class);
+          internal_static_UnderreplicatedLedgerFormat_descriptor =
+            getDescriptor().getMessageTypes().get(2);
+          internal_static_UnderreplicatedLedgerFormat_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_UnderreplicatedLedgerFormat_descriptor,
+              new java.lang.String[] { "Replica", },
+              org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat.class,
+              org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat.Builder.class);
           return null;
         }
       };

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java Tue Aug 14 09:40:34 2012
@@ -21,7 +21,7 @@ package org.apache.bookkeeper.replicatio
 /**
  * Exceptions for use within the replication service
  */
-abstract class ReplicationException extends Exception {
+public abstract class ReplicationException extends Exception {
     protected ReplicationException(String message, Throwable cause) {
         super(message, cause);
     }
@@ -33,14 +33,14 @@ abstract class ReplicationException exte
     /**
      * The replication service has become unavailable
      */
-    static class UnavailableException extends ReplicationException {
+    public static class UnavailableException extends ReplicationException {
         private static final long serialVersionUID = 31872209L;
 
-        UnavailableException(String message, Throwable cause) {
+        public UnavailableException(String message, Throwable cause) {
             super(message, cause);
         }
 
-        UnavailableException(String message) {
+        public UnavailableException(String message) {
             super(message);
         }
     }
@@ -49,14 +49,14 @@ abstract class ReplicationException exte
      * Compatibility error. This version of the code, doesn't know how to
      * deal with the metadata it has found.
      */
-    static class CompatibilityException extends ReplicationException {
+    public static class CompatibilityException extends ReplicationException {
         private static final long serialVersionUID = 98551903L;
 
-        CompatibilityException(String message, Throwable cause) {
+        public CompatibilityException(String message, Throwable cause) {
             super(message, cause);
         }
 
-        CompatibilityException(String message) {
+        public CompatibilityException(String message) {
             super(message);
         }
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto Tue Aug 14 09:40:34 2012
@@ -46,4 +46,13 @@ message LedgerMetadataFormat {
     }
     optional DigestType digestType = 7;
     optional bytes password = 8;
+}
+
+message LedgerRereplicationLayoutFormat {
+    required string type = 1;
+    required int32 version = 2;
+}
+ 
+message UnderreplicatedLedgerFormat {
+    repeated string replica = 1;
 }
\ No newline at end of file

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1372808&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Tue Aug 14 09:40:34 2012
@@ -0,0 +1,359 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.replication;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test the zookeeper implementation of the ledger replication manager
+ */
+public class TestLedgerUnderreplicationManager {
+    static final Logger LOG = LoggerFactory.getLogger(TestLedgerUnderreplicationManager.class);
+
+    ZooKeeperUtil zkUtil = null;
+
+    ServerConfiguration conf = null;
+    ExecutorService executor = null;
+    LedgerManagerFactory lmf1 = null;
+    LedgerManagerFactory lmf2 = null;
+    ZooKeeper zkc1 = null;
+    ZooKeeper zkc2 = null;
+
+    @Before
+    public void setupZooKeeper() throws Exception {
+        zkUtil = new ZooKeeperUtil();
+        zkUtil.startServer();
+
+        conf = new ServerConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+
+        executor = Executors.newCachedThreadPool();
+
+        zkc1 = zkUtil.getNewZooKeeperClient();
+        zkc2 = zkUtil.getNewZooKeeperClient();
+        lmf1 = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc1);
+        lmf2 = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc2);
+    }
+
+    @After
+    public void teardownZooKeeper() throws Exception {
+        if (zkUtil != null) {
+            zkUtil.killServer();
+            zkUtil = null;
+        }
+        if (executor != null) {
+            executor = null;
+        }
+        if (zkc1 != null) {
+            zkc1.close();
+            zkc1 = null;
+        }
+        if (zkc2 != null) {
+            zkc2.close();
+            zkc2 = null;
+        }
+        if (lmf1 != null) {
+            lmf1.uninitialize();
+            lmf1 = null;
+        }
+        if (lmf2 != null) {
+            lmf2.uninitialize();
+            lmf2 = null;
+        }
+    }
+
+    private Future<Long> getLedgerToReplicate(final LedgerUnderreplicationManager m) {
+        return executor.submit(new Callable<Long>() {
+                public Long call() {
+                    try {
+                        return m.getLedgerToRereplicate();
+                    } catch (Exception e) {
+                        LOG.error("Error getting ledger id", e);
+                        return -1L;
+                    }
+                }
+            });
+    }
+
+    /**
+     * Test basic interactions with the ledger underreplication
+     * manager.
+     * Mark some ledgers as underreplicated.
+     * Ensure that getLedgerToReplicate will block until a ledger
+     * becomes available.
+     */
+    @Test
+    public void testBasicInteraction() throws Exception {
+        Set<Long> ledgers = new HashSet<Long>();
+        ledgers.add(0xdeadbeefL);
+        ledgers.add(0xbeefcafeL);
+        ledgers.add(0xffffbeefL);
+        ledgers.add(0xfacebeefL);
+        String missingReplica = "localhost:3181";
+
+        int count = 0;
+        LedgerUnderreplicationManager m = lmf1.newLedgerUnderreplicationManager();
+        Iterator<Long> iter = ledgers.iterator();
+        while (iter.hasNext()) {
+            m.markLedgerUnderreplicated(iter.next(), missingReplica);
+            count++;
+        }
+
+        List<Future<Long>> futures = new ArrayList<Future<Long>>();
+        for (int i = 0; i < count; i++) {
+            futures.add(getLedgerToReplicate(m));
+        }
+
+        for (Future<Long> f : futures) {
+            Long l = f.get(5, TimeUnit.SECONDS);
+            assertTrue(ledgers.remove(l));
+        }
+
+        Future f = getLedgerToReplicate(m);
+        try {
+            f.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException te) {
+            // correct behaviour
+        }
+        Long newl = 0xfefefefefefeL;
+        m.markLedgerUnderreplicated(newl, missingReplica);
+        assertEquals("Should have got the one just added", newl, f.get(5, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Test locking for ledger underreplication manager.
+     * If there's only one ledger marked for rereplication,
+     * and one client has it, it should be locked; another
+     * client shouldn't be able to get it. If the first client dies
+     * however, the second client should be able to get it.
+     */
+    @Test
+    public void testLocking() throws Exception {
+        String missingReplica = "localhost:3181";
+
+        LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
+        LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
+
+        Long ledger = 0xfeadeefdacL;
+        m1.markLedgerUnderreplicated(ledger, missingReplica);
+        Future<Long> f = getLedgerToReplicate(m1);
+        Long l = f.get(5, TimeUnit.SECONDS);
+        assertEquals("Should be the ledger I just marked", ledger, l);
+
+        f = getLedgerToReplicate(m2);
+        try {
+            f.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException te) {
+            // correct behaviour
+        }
+        zkc1.close(); // should kill the lock
+        zkc1 = null;
+
+        l = f.get(5, TimeUnit.SECONDS);
+        assertEquals("Should be the ledger I marked", ledger, l);
+    }
+
+
+    /**
+     * Test that when a ledger has been marked as replicated, it
+     * will not be offered to another client.
+     * This test checks that by marking two ledgers, and acquiring
+     * them on a single client. It marks one as replicated and then
+     * the client is killed. We then check that another client can
+     * acquire a ledger, and that it's not the one that was previously
+     * marked as replicated.
+     */
+    @Test
+    public void testMarkingAsReplicated() throws Exception {
+        String missingReplica = "localhost:3181";
+
+        LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
+        LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
+
+        Long ledgerA = 0xfeadeefdacL;
+        Long ledgerB = 0xdefadebL;
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica);
+        m1.markLedgerUnderreplicated(ledgerB, missingReplica);
+
+        Future<Long> fA = getLedgerToReplicate(m1);
+        Future<Long> fB = getLedgerToReplicate(m1);
+
+        Long lA = fA.get(5, TimeUnit.SECONDS);
+        Long lB = fB.get(5, TimeUnit.SECONDS);
+
+        assertTrue("Should be the ledgers I just marked",
+                   (lA.equals(ledgerA) && lB.equals(ledgerB))
+                   || (lA.equals(ledgerB) && lB.equals(ledgerA)));
+
+        Future<Long> f = getLedgerToReplicate(m2);
+        try {
+            f.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException te) {
+            // correct behaviour
+        }
+        m1.markLedgerReplicated(lA);
+        zkc1.close(); // should kill the lock
+        zkc1 = null;
+
+        Long l = f.get(5, TimeUnit.SECONDS);
+        assertEquals("Should be the ledger I marked", lB, l);
+    }
+
+    /**
+     * Test releasing of a ledger
+     * A ledger is released when a client decides it does not want
+     * to replicate it (or cannot at the moment).
+     * When a client releases a previously acquired ledger, another
+     * client should then be able to acquire it.
+     */
+    @Test
+    public void testRelease() throws Exception {
+        String missingReplica = "localhost:3181";
+
+        LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
+        LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
+
+        Long ledgerA = 0xfeadeefdacL;
+        Long ledgerB = 0xdefadebL;
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica);
+        m1.markLedgerUnderreplicated(ledgerB, missingReplica);
+
+        Future<Long> fA = getLedgerToReplicate(m1);
+        Future<Long> fB = getLedgerToReplicate(m1);
+
+        Long lA = fA.get(5, TimeUnit.SECONDS);
+        Long lB = fB.get(5, TimeUnit.SECONDS);
+
+        assertTrue("Should be the ledgers I just marked",
+                   (lA.equals(ledgerA) && lB.equals(ledgerB))
+                   || (lA.equals(ledgerB) && lB.equals(ledgerA)));
+
+        Future<Long> f = getLedgerToReplicate(m2);
+        try {
+            f.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException te) {
+            // correct behaviour
+        }
+        m1.markLedgerReplicated(lA);
+        m1.releaseUnderreplicatedLedger(lB);
+
+        Long l = f.get(5, TimeUnit.SECONDS);
+        assertEquals("Should be the ledger I marked", lB, l);
+    }
+
+    /**
+     * Test that when a failure occurs on a ledger, while the ledger
+     * is already being rereplicated, the ledger will still be in the
+     * underreplicated ledger list when the first rereplicating client marks
+     * it as replicated.
+     */
+    @Test
+    public void testManyFailures() throws Exception {
+        String missingReplica1 = "localhost:3181";
+        String missingReplica2 = "localhost:3182";
+
+        LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
+        LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
+
+        Long ledgerA = 0xfeadeefdacL;
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica1);
+
+        Future<Long> fA = getLedgerToReplicate(m1);
+        Long lA = fA.get(5, TimeUnit.SECONDS);
+
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica2);
+
+        assertEquals("Should be the ledger I just marked",
+                     lA, ledgerA);
+        m1.markLedgerReplicated(lA);
+
+        Future<Long> f = getLedgerToReplicate(m1);
+        lA = f.get(5, TimeUnit.SECONDS);
+        assertEquals("Should be the ledger I had marked previously",
+                     lA, ledgerA);
+    }
+
+    /**
+     * Test that when a ledger is marked as underreplicated with
+     * the same missing replica twice, marking it as replicated
+     * once is enough to remove it from the list.
+     */
+    @Test
+    public void test2reportSame() throws Exception {
+        String missingReplica1 = "localhost:3181";
+
+        LedgerUnderreplicationManager m1 = lmf1.newLedgerUnderreplicationManager();
+        LedgerUnderreplicationManager m2 = lmf2.newLedgerUnderreplicationManager();
+
+        Long ledgerA = 0xfeadeefdacL;
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica1);
+        m2.markLedgerUnderreplicated(ledgerA, missingReplica1);
+
+        Future<Long> fA = getLedgerToReplicate(m1);
+        Long lA = fA.get(5, TimeUnit.SECONDS);
+
+        assertEquals("Should be the ledger I just marked",
+                     lA, ledgerA);
+        m1.markLedgerReplicated(lA);
+
+        Future<Long> f = getLedgerToReplicate(m2);
+        try {
+            f.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException te) {
+            // correct behaviour
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1372808&r1=1372807&r2=1372808&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Tue Aug 14 09:40:34 2012
@@ -72,6 +72,25 @@ public class ZooKeeperUtil {
         return zkc;
     }
 
+    public ZooKeeper getNewZooKeeperClient() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        ZooKeeper zkc = new ZooKeeper(getZooKeeperConnectString(), 10000,
+                new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        // handle session disconnects and expires
+                        if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+                            latch.countDown();
+                        }
+                    }
+                });
+        if (!latch.await(10000, TimeUnit.MILLISECONDS)) {
+            zkc.close();
+            fail("Could not connect to zookeeper server");
+        }
+        return zkc;
+    }
+
     public String getZooKeeperConnectString() {
         return connectString;
     }
@@ -97,20 +116,7 @@ public class ZooKeeperUtil {
         // create a zookeeper client
         LOG.debug("Instantiate ZK Client");
         final CountDownLatch latch = new CountDownLatch(1);
-        zkc = new ZooKeeper(getZooKeeperConnectString(), 10000,
-                            new Watcher() {
-                                @Override
-                                public void process(WatchedEvent event) {
-                                    // handle session disconnects and expires
-                                    if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                                        latch.countDown();
-                                    }
-                                }
-                            });
-        if (!latch.await(10000, TimeUnit.MILLISECONDS)) {
-            zkc.close();
-            fail("Could not connect to zookeeper server");
-        }
+        zkc = getNewZooKeeperClient();
 
         // initialize the zk client with values
         zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);



Mime
View raw message