zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1599492 - in /zookeeper/bookkeeper/branches/branch-4.2: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
Date Tue, 03 Jun 2014 11:56:25 GMT
Author: ivank
Date: Tue Jun  3 11:56:25 2014
New Revision: 1599492

URL: http://svn.apache.org/r1599492
Log:
BOOKKEEPER-746: 5 new shell commands. List ledgers, list metadata, list underreplicated, show
auditor and simpletest (ivank)

Modified:
    zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java

Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Tue Jun  3 11:56:25 2014
@@ -40,6 +40,8 @@ Release 4.2.3 - 2013-12-04
 
         BOOKKEEPER-747: Implement register/unregister LedgerMetadataListener in MSLedgerManagerFactory (fpj via sijie)
 
+        BOOKKEEPER-746: 5 new shell commands. List ledgers, list metadata, list underreplicated, show auditor and simpletest (ivank)
+
 Release 4.2.2 - 2013-10-02
 
   Backward compatible changes:

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Tue Jun  3 11:56:25 2014
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+
 import java.nio.ByteBuffer;
 import java.util.Formatter;
 import java.util.HashMap;
@@ -32,17 +33,35 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.Tool;
 import org.apache.bookkeeper.util.ZkUtils;
 
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import static com.google.common.base.Charsets.UTF_8;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -69,6 +88,11 @@ public class BookieShell implements Tool
     static final String CMD_BOOKIEFORMAT = "bookieformat";
     static final String CMD_RECOVER = "recover";
     static final String CMD_LEDGER = "ledger";
+    static final String CMD_LISTLEDGERS = "listledgers";
+    static final String CMD_LEDGERMETADATA = "ledgermetadata";
+    static final String CMD_LISTUNDERREPLICATED = "listunderreplicated";
+    static final String CMD_WHOISAUDITOR = "whoisauditor";
+    static final String CMD_SIMPLETEST = "simpletest";
     static final String CMD_READLOG = "readlog";
     static final String CMD_READJOURNAL = "readjournal";
     static final String CMD_LASTMARK = "lastmark";
@@ -151,7 +175,7 @@ public class BookieShell implements Tool
 
         @Override
         String getUsage() {
-            return "metaformat [-nonInteractive] [-force]";
+            return "metaformat   [-nonInteractive] [-force]";
         }
 
         @Override
@@ -229,7 +253,7 @@ public class BookieShell implements Tool
 
         @Override
         String getUsage() {
-            return "recover <bookieSrc> [bookieDest]";
+            return "recover      <bookieSrc> [bookieDest]";
         }
 
         @Override
@@ -326,7 +350,272 @@ public class BookieShell implements Tool
 
         @Override
         String getUsage() {
-            return "ledger [-m] <ledger_id>";
+            return "ledger       [-m] <ledger_id>";
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+    }
+
+    /**
+     * Command for listing underreplicated ledgers
+     */
+    class ListUnderreplicatedCmd extends MyCommand {
+        Options opts = new Options();
+
+        public ListUnderreplicatedCmd() {
+            super(CMD_LISTUNDERREPLICATED);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "List ledgers marked as underreplicated";
+        }
+
+        @Override
+        String getUsage() {
+            return "listunderreplicated";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            ZooKeeper zk = null;
+            try {
+                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
+                LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+                Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate();
+                while (iter.hasNext()) {
+                    System.out.println(iter.next());
+                }
+            } finally {
+                if (zk != null) {
+                    zk.close();
+                }
+            }
+
+            return 0;
+        }
+    }
+
+    final static int LIST_BATCH_SIZE = 1000;
+    /**
+     * Command to list all ledgers in the cluster
+     */
+    class ListLedgersCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        ListLedgersCmd() {
+            super(CMD_LISTLEDGERS);
+            lOpts.addOption("m", "meta", false, "Print metadata");
+
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            ZooKeeper zk = null;
+            try {
+                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
+                LedgerManager m = mFactory.newLedgerManager();
+                LedgerRangeIterator iter = m.getLedgerRanges();
+                if (cmdLine.hasOption("m")) {
+                    List<ReadMetadataCallback> futures
+                        = new ArrayList<ReadMetadataCallback>(LIST_BATCH_SIZE);
+                    while (iter.hasNext()) {
+                        LedgerRange r = iter.next();
+                        for (Long lid : r.getLedgers()) {
+                            ReadMetadataCallback cb = new ReadMetadataCallback(lid);
+                            m.readLedgerMetadata(lid, cb);
+                            futures.add(cb);
+                        }
+                        if (futures.size() >= LIST_BATCH_SIZE) {
+                            while (futures.size() > 0) {
+                                ReadMetadataCallback cb = futures.remove(0);
+                                printLedgerMetadata(cb);
+                            }
+                        }
+                    }
+                    while (futures.size() > 0) {
+                        ReadMetadataCallback cb = futures.remove(0);
+                        printLedgerMetadata(cb);
+                    }
+                } else {
+                    while (iter.hasNext()) {
+                        LedgerRange r = iter.next();
+                        for (Long lid : r.getLedgers()) {
+                            System.out.println(Long.toString(lid));
+                        }
+                    }
+                }
+            } finally {
+                if (zk != null) {
+                    zk.close();
+                }
+            }
+
+            return 0;
+        }
+
+        @Override
+        String getDescription() {
+            return "List all ledgers on the cluster (this may take a long time)";
+        }
+
+        @Override
+        String getUsage() {
+            return "listledgers  [-meta]";
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+    }
+
+    static void printLedgerMetadata(ReadMetadataCallback cb) throws Exception {
+        LedgerMetadata md = cb.get();
+        System.out.println("ledgerID: " + cb.getLedgerId());
+        System.out.println(new String(md.serialize(), UTF_8));
+    }
+
+    static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
+        implements GenericCallback<LedgerMetadata> {
+        final long ledgerId;
+
+        ReadMetadataCallback(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        long getLedgerId() {
+            return ledgerId;
+        }
+
+        public void operationComplete(int rc, LedgerMetadata result) {
+            if (rc != 0) {
+                setException(BKException.create(rc));
+            } else {
+                set(result);
+            }
+        }
+    }
+
+    /**
+     * Print the metadata for a ledger
+     */
+    class LedgerMetadataCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        LedgerMetadataCmd() {
+            super(CMD_LEDGERMETADATA);
+            lOpts.addOption("l", "ledgerid", true, "Ledger ID");
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            final long lid = getOptionLongValue(cmdLine, "ledgerid", -1);
+            if (lid == -1) {
+                System.err.println("Must specify a ledger id");
+                return -1;
+            }
+
+            ZooKeeper zk = null;
+            try {
+                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
+                LedgerManager m = mFactory.newLedgerManager();
+                ReadMetadataCallback cb = new ReadMetadataCallback(lid);
+                m.readLedgerMetadata(lid, cb);
+                printLedgerMetadata(cb);
+            } finally {
+                if (zk != null) {
+                    zk.close();
+                }
+            }
+
+            return 0;
+        }
+
+        @Override
+        String getDescription() {
+            return "Print the metadata for a ledger";
+        }
+
+        @Override
+        String getUsage() {
+            return "ledgermetadata -ledgerid <ledgerid>";
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+    }
+
+    /**
+     * Simple test to create a ledger and write to it
+     */
+    class SimpleTestCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        SimpleTestCmd() {
+            super(CMD_SIMPLETEST);
+            lOpts.addOption("e", "ensemble", true, "Ensemble size (default 3)");
+            lOpts.addOption("w", "writeQuorum", true, "Write quorum size (default 2)");
+            lOpts.addOption("a", "ackQuorum", true, "Ack quorum size (default 2)");
+            lOpts.addOption("n", "numEntries", true, "Entries to write (default 1000)");
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            byte[] data = new byte[100]; // test data
+
+            int ensemble = getOptionIntValue(cmdLine, "ensemble", 3);
+            int writeQuorum = getOptionIntValue(cmdLine, "writeQuorum", 2);
+            int ackQuorum = getOptionIntValue(cmdLine, "ackQuorum", 2);
+            int numEntries = getOptionIntValue(cmdLine, "numEntries", 1000);
+
+            ClientConfiguration conf = new ClientConfiguration();
+            conf.addConfiguration(bkConf);
+            BookKeeper bk = new BookKeeper(conf);
+            LedgerHandle lh = bk.createLedger(ensemble, writeQuorum, ackQuorum,
+                                              BookKeeper.DigestType.MAC, new byte[0]);
+            System.out.println("Ledger ID: " + lh.getId());
+            long lastReport = System.nanoTime();
+            for (int i = 0; i < numEntries; i++) {
+                lh.addEntry(data);
+                if (TimeUnit.SECONDS.convert(System.nanoTime() - lastReport,
+                                             TimeUnit.NANOSECONDS) > 1) {
+                    System.out.println(i + " entries written");
+                    lastReport = System.nanoTime();
+                }
+            }
+
+            lh.close();
+            bk.close();
+            System.out.println(numEntries + " entries written to ledger " + lh.getId());
+
+            return 0;
+        }
+
+        @Override
+        String getDescription() {
+            return "Simple test to create a ledger and write entries to it";
+        }
+
+        @Override
+        String getUsage() {
+            return "simpletest   [-ensemble N] [-writeQuorum N] [-ackQuorum N] [-numEntries N]";
         }
 
         @Override
@@ -387,7 +676,7 @@ public class BookieShell implements Tool
 
         @Override
         String getUsage() {
-            return "readlog [-m] <entry_log_id | entry_log_file_name>";
+            return "readlog      [-msg] <entry_log_id | entry_log_file_name>";
         }
 
         @Override
@@ -448,7 +737,7 @@ public class BookieShell implements Tool
 
         @Override
         String getUsage() {
-            return "readjournal [-m] <journal_id | journal_file_name>";
+            return "readjournal  [-msg] <journal_id | journal_file_name>";
         }
 
         @Override
@@ -460,25 +749,44 @@ public class BookieShell implements Tool
     /**
      * Command to print last log mark
      */
-    class LastMarkCmd implements Command {
+    class LastMarkCmd extends MyCommand {
+        LastMarkCmd() {
+            super(CMD_LASTMARK);
+        }
+
         @Override
-        public int runCmd(String[] args) throws Exception {
+        public int runCmd(CommandLine c) throws Exception {
             printLastLogMark();
             return 0;
         }
 
         @Override
-        public void printUsage() {
-            System.err.println("lastmark: Print last log marker.");
+        String getDescription() {
+            return "Print last log marker.";
+        }
+
+        @Override
+        String getUsage() {
+            return "lastmark";
+        }
+
+        @Override
+        Options getOptions() {
+            return new Options();
         }
     }
 
     /**
      * Command to print help message
      */
-    class HelpCmd implements Command {
+    class HelpCmd extends MyCommand {
+        HelpCmd() {
+            super(CMD_HELP);
+        }
+
         @Override
-        public int runCmd(String[] args) throws Exception {
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            String[] args = cmdLine.getArgs();
             if (args.length == 0) {
                 printShellUsage();
                 return 0;
@@ -495,9 +803,18 @@ public class BookieShell implements Tool
         }
 
         @Override
-        public void printUsage() {
-            System.err.println("help: Describe the usage of this program or its subcommands.");
-            System.err.println("usage: help [COMMAND]");
+        String getDescription() {
+            return "Describe the usage of this program or its subcommands.";
+        }
+
+        @Override
+        String getUsage() {
+            return "help         [COMMAND]";
+        }
+
+        @Override
+        Options getOptions() {
+            return new Options();
         }
     }
 
@@ -572,13 +889,68 @@ public class BookieShell implements Tool
         }
     }
 
-    final Map<String, Command> commands;
+    /**
+     * Print which node has the auditor lock
+     */
+    class WhoIsAuditorCmd extends MyCommand {
+        Options opts = new Options();
+
+        public WhoIsAuditorCmd() {
+            super(CMD_WHOISAUDITOR);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Print the node which holds the auditor lock";
+        }
+
+        @Override
+        String getUsage() {
+            return "whoisauditor";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            ZooKeeper zk = null;
+            try {
+                ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+                zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+                InetSocketAddress bookieId = AuditorElector.getCurrentAuditor(bkConf, zk);
+                if (bookieId == null) {
+                    LOG.info("No auditor elected");
+                    return -1;
+                }
+                LOG.info("Auditor: {}/{}:{}",
+                         new Object[] {
+                             bookieId.getAddress().getCanonicalHostName(),
+                             bookieId.getAddress().getHostAddress(),
+                             bookieId.getPort() });
+            } finally {
+                if (zk != null) {
+                    zk.close();
+                }
+            }
+
+            return 0;
+        }
+    }
+
+    final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
     {
-        commands = new HashMap<String, Command>();
         commands.put(CMD_METAFORMAT, new MetaFormatCmd());
         commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd());
         commands.put(CMD_RECOVER, new RecoverCmd());
         commands.put(CMD_LEDGER, new LedgerCmd());
+        commands.put(CMD_LISTLEDGERS, new ListLedgersCmd());
+        commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd());
+        commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());
+        commands.put(CMD_LEDGERMETADATA, new LedgerMetadataCmd());
+        commands.put(CMD_SIMPLETEST, new SimpleTestCmd());
         commands.put(CMD_READLOG, new ReadLogCmd());
         commands.put(CMD_READJOURNAL, new ReadJournalCmd());
         commands.put(CMD_LASTMARK, new LastMarkCmd());
@@ -597,18 +969,17 @@ public class BookieShell implements Tool
         entriesPerPage = pageSize / 8;
     }
 
-    private static void printShellUsage() {
+    private void printShellUsage() {
         System.err.println("Usage: BookieShell [-conf configuration] <command>");
         System.err.println();
-        System.err.println("       metaformat   [-nonInteractive] [-force]");
-        System.err.println("       bookieformat [-nonInteractive] [-force]");
-        System.err.println("       recover      <bookieSrc> [bookieDest]");
-        System.err.println("       ledger       [-meta] <ledger_id>");
-        System.err.println("       readlog      [-msg] <entry_log_id|entry_log_file_name>");
-        System.err.println("       readjournal  [-msg] <journal_id|journal_file_name>");
-        System.err.println("       autorecovery [-enable|-disable]");
-        System.err.println("       lastmark");
-        System.err.println("       help");
+        List<String> commandNames = new ArrayList<String>();
+        for (MyCommand c : commands.values()) {
+            commandNames.add("       " + c.getUsage());
+        }
+        Collections.sort(commandNames);
+        for (String s : commandNames) {
+            System.err.println(s);
+        }
     }
 
     @Override
@@ -631,8 +1002,9 @@ public class BookieShell implements Tool
     }
 
     public static void main(String argv[]) throws Exception {
+        BookieShell shell = new BookieShell();
         if (argv.length <= 0) {
-            printShellUsage();
+            shell.printShellUsage();
             System.exit(-1);
         }
 
@@ -640,7 +1012,7 @@ public class BookieShell implements Tool
         // load configuration
         if ("-conf".equals(argv[0])) {
             if (argv.length <= 1) {
-                printShellUsage();
+                shell.printShellUsage();
                 System.exit(-1);
             }
             conf.addConfiguration(new PropertiesConfiguration(
@@ -651,7 +1023,7 @@ public class BookieShell implements Tool
             argv = newArgv;
         }
 
-        BookieShell shell = new BookieShell();
+
         shell.setConf(conf);
         int res = shell.run(argv);
         System.exit(res);
@@ -931,4 +1303,30 @@ public class BookieShell implements Tool
         }
         return sb.toString();
     }
+
+    private static int getOptionIntValue(CommandLine cmdLine, String option, int defaultVal) {
+        if (cmdLine.hasOption(option)) {
+            String val = cmdLine.getOptionValue(option);
+            try {
+                return Integer.parseInt(val);
+            } catch (NumberFormatException nfe) {
+                System.err.println("ERROR: invalid value for option " + option + " : " + val);
+                return defaultVal;
+            }
+        }
+        return defaultVal;
+    }
+
+    private static long getOptionLongValue(CommandLine cmdLine, String option, long defaultVal) {
+        if (cmdLine.hasOption(option)) {
+            String val = cmdLine.getOptionValue(option);
+            try {
+                return Long.parseLong(val);
+            } catch (NumberFormatException nfe) {
+                System.err.println("ERROR: invalid value for option " + option + " : " + val);
+                return defaultVal;
+            }
+        }
+        return defaultVal;
+    }
 }

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java Tue Jun  3 11:56:25 2014
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.meta;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException;
 
+import java.util.Iterator;
+
 /**
  * Interface for marking ledgers which need to be rereplicated
  */
@@ -39,6 +41,14 @@ public interface LedgerUnderreplicationM
             throws ReplicationException.UnavailableException;
 
     /**
+     * Get a list of all the ledgers which have been
+     * marked for rereplication.
+     *
+     * @return an iterator which returns ledger ids
+     */
+    Iterator<Long> listLedgersToRereplicate();
+
+    /**
      * Acquire a underreplicated ledger for rereplication. The ledger
      * should be locked, so that no other agent will receive the ledger
      * from this call.

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Tue Jun  3 11:56:25 2014
@@ -51,7 +51,12 @@ import java.util.Map;
 import java.util.List;
 import java.util.Collections;
 import java.util.Arrays;
-
+import java.util.Deque;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.ArrayList;
 
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
@@ -313,6 +318,56 @@ public class ZkLedgerUnderreplicationMan
         }
     }
 
+    @Override
+    public Iterator<Long> listLedgersToRereplicate() {
+        final Queue<String> queue = new LinkedList<String>();
+        queue.add(urLedgerPath);
+
+        return new Iterator<Long>() {
+            final Queue<Long> curBatch = new LinkedList<Long>();
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (curBatch.size() > 0) {
+                    return true;
+                }
+
+                while (queue.size() > 0 && curBatch.size() == 0) {
+                    String parent = queue.remove();
+                    try {
+                        for (String c : zkc.getChildren(parent,false)) {
+                            String child = parent + "/" + c;
+                            if (c.startsWith("urL")) {
+                                curBatch.add(getLedgerId(child));
+                            } else {
+                                queue.add(child);
+                            }
+                        }
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        return false;
+                    } catch (KeeperException.NoNodeException nne) {
+                        // ignore
+                    } catch (Exception e) {
+                        throw new RuntimeException("Error reading list", e);
+                    }
+                }
+                return curBatch.size() > 0;
+            }
+
+            @Override
+            public Long next() {
+                assert curBatch.size() > 0;
+                return curBatch.remove();
+            }
+        };
+    }
+
     private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w)
             throws KeeperException, InterruptedException {
         if (depth == 4) {



Mime
View raw message