Author: ivank
Date: Tue Jun 3 11:56:25 2014
New Revision: 1599492
URL: http://svn.apache.org/r1599492
Log:
BOOKKEEPER-746: 5 new shell commands. List ledgers, list metadata, list underreplicated, show
auditor and simpletest (ivank)
Modified:
zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Tue Jun 3 11:56:25 2014
@@ -40,6 +40,8 @@ Release 4.2.3 - 2013-12-04
BOOKKEEPER-747: Implement register/unregister LedgerMetadataListener in MSLedgerManagerFactory
(fpj via sijie)
+ BOOKKEEPER-746: 5 new shell commands. List ledgers, list metadata, list underreplicated, show auditor and simpletest (ivank)
+
Release 4.2.2 - 2013-10-02
Backward compatible changes:
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
Tue Jun 3 11:56:25 2014
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
+
import java.nio.ByteBuffer;
import java.util.Formatter;
import java.util.HashMap;
@@ -32,17 +33,35 @@ import org.apache.bookkeeper.meta.Ledger
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.Tool;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import static com.google.common.base.Charsets.UTF_8;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -69,6 +88,11 @@ public class BookieShell implements Tool
static final String CMD_BOOKIEFORMAT = "bookieformat";
static final String CMD_RECOVER = "recover";
static final String CMD_LEDGER = "ledger";
+ static final String CMD_LISTLEDGERS = "listledgers";
+ static final String CMD_LEDGERMETADATA = "ledgermetadata";
+ static final String CMD_LISTUNDERREPLICATED = "listunderreplicated";
+ static final String CMD_WHOISAUDITOR = "whoisauditor";
+ static final String CMD_SIMPLETEST = "simpletest";
static final String CMD_READLOG = "readlog";
static final String CMD_READJOURNAL = "readjournal";
static final String CMD_LASTMARK = "lastmark";
@@ -151,7 +175,7 @@ public class BookieShell implements Tool
@Override
String getUsage() {
- return "metaformat [-nonInteractive] [-force]";
+ return "metaformat [-nonInteractive] [-force]";
}
@Override
@@ -229,7 +253,7 @@ public class BookieShell implements Tool
@Override
String getUsage() {
- return "recover <bookieSrc> [bookieDest]";
+ return "recover <bookieSrc> [bookieDest]";
}
@Override
@@ -326,7 +350,272 @@ public class BookieShell implements Tool
@Override
String getUsage() {
- return "ledger [-m] <ledger_id>";
+ return "ledger [-m] <ledger_id>";
+ }
+
+ @Override
+ Options getOptions() {
+ return lOpts;
+ }
+ }
+
+ /**
+  * Shell command which prints, one per line, the id of every ledger
+  * returned by the underreplication manager's rereplication listing.
+  */
+ class ListUnderreplicatedCmd extends MyCommand {
+     Options opts = new Options();
+
+     public ListUnderreplicatedCmd() {
+         super(CMD_LISTUNDERREPLICATED);
+     }
+
+     @Override
+     String getUsage() {
+         return "listunderreplicated";
+     }
+
+     @Override
+     String getDescription() {
+         return "List ledgers marked as underreplicated";
+     }
+
+     @Override
+     Options getOptions() {
+         return opts;
+     }
+
+     @Override
+     int runCmd(CommandLine cmdLine) throws Exception {
+         ZooKeeper zkClient = null;
+         try {
+             ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+             zkClient = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), watcher);
+             LedgerManagerFactory factory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zkClient);
+             LedgerUnderreplicationManager urManager = factory.newLedgerUnderreplicationManager();
+             for (Iterator<Long> ledgers = urManager.listLedgersToRereplicate(); ledgers.hasNext();) {
+                 System.out.println(ledgers.next());
+             }
+         } finally {
+             // the client is only non-null once the connection attempt returned
+             if (zkClient != null) {
+                 zkClient.close();
+             }
+         }
+
+         return 0;
+     }
+ }
+
+ /** Number of outstanding metadata reads after which results are printed. */
+ static final int LIST_BATCH_SIZE = 1000;
+ /**
+  * Command to list all ledgers in the cluster.
+  *
+  * Without options only the ledger ids are printed. With -m/-meta a
+  * metadata read is issued for every ledger and the metadata is printed
+  * as the reads complete, in the order the ledgers were listed.
+  */
+ class ListLedgersCmd extends MyCommand {
+     Options lOpts = new Options();
+
+     ListLedgersCmd() {
+         super(CMD_LISTLEDGERS);
+         lOpts.addOption("m", "meta", false, "Print metadata");
+     }
+
+     /**
+      * Walk all ledger ranges and print either the ids or, with -m, the
+      * metadata of each ledger.
+      *
+      * @param cmdLine parsed command line for this command
+      * @return 0 on success
+      * @throws Exception if connecting, listing or a metadata read fails
+      */
+     @Override
+     public int runCmd(CommandLine cmdLine) throws Exception {
+         ZooKeeper zk = null;
+         try {
+             ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+             zk = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), w);
+             LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
+             LedgerManager m = mFactory.newLedgerManager();
+             LedgerRangeIterator iter = m.getLedgerRanges();
+             if (cmdLine.hasOption("m")) {
+                 List<ReadMetadataCallback> futures
+                     = new ArrayList<ReadMetadataCallback>(LIST_BATCH_SIZE);
+                 while (iter.hasNext()) {
+                     LedgerRange r = iter.next();
+                     for (Long lid : r.getLedgers()) {
+                         ReadMetadataCallback cb = new ReadMetadataCallback(lid);
+                         m.readLedgerMetadata(lid, cb);
+                         futures.add(cb);
+                     }
+                     // The threshold is checked once per range, so a batch
+                     // can be larger than LIST_BATCH_SIZE.
+                     if (futures.size() >= LIST_BATCH_SIZE) {
+                         printAndClear(futures);
+                     }
+                 }
+                 printAndClear(futures);
+             } else {
+                 while (iter.hasNext()) {
+                     LedgerRange r = iter.next();
+                     for (Long lid : r.getLedgers()) {
+                         System.out.println(Long.toString(lid));
+                     }
+                 }
+             }
+         } finally {
+             if (zk != null) {
+                 zk.close();
+             }
+         }
+
+         return 0;
+     }
+
+     /**
+      * Print every pending read in submission order, then empty the list.
+      * Iterating and clearing once avoids the element shifting that
+      * repeatedly removing index 0 from an ArrayList causes.
+      */
+     private void printAndClear(List<ReadMetadataCallback> pending) throws Exception {
+         for (ReadMetadataCallback cb : pending) {
+             printLedgerMetadata(cb);
+         }
+         pending.clear();
+     }
+
+     @Override
+     String getDescription() {
+         return "List all ledgers on the cluster (this may take a long time)";
+     }
+
+     @Override
+     String getUsage() {
+         return "listledgers [-meta]";
+     }
+
+     @Override
+     Options getOptions() {
+         return lOpts;
+     }
+ }
+
+ /**
+  * Obtain the result of a metadata read and print the ledger id followed
+  * by the serialized metadata decoded as UTF-8.
+  *
+  * @param cb callback/future the metadata read was issued with
+  * @throws Exception whatever {@code cb.get()} raises if the read failed
+  */
+ static void printLedgerMetadata(ReadMetadataCallback cb) throws Exception {
+     final LedgerMetadata metadata = cb.get();
+     final String idLine = "ledgerID: " + cb.getLedgerId();
+     System.out.println(idLine);
+     final byte[] serialized = metadata.serialize();
+     System.out.println(new String(serialized, UTF_8));
+ }
+
+ /**
+  * Adapter which exposes the outcome of a ledger metadata read as a
+  * future: a zero return code completes the future with the metadata,
+  * any other code fails it with the matching BKException.
+  */
+ static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
+         implements GenericCallback<LedgerMetadata> {
+     final long ledgerId;
+
+     ReadMetadataCallback(long ledgerId) {
+         this.ledgerId = ledgerId;
+     }
+
+     /** @return the id of the ledger this read was created for */
+     long getLedgerId() {
+         return ledgerId;
+     }
+
+     public void operationComplete(int rc, LedgerMetadata result) {
+         if (rc == 0) {
+             set(result);
+             return;
+         }
+         setException(BKException.create(rc));
+     }
+ }
+
+ /**
+  * Shell command which reads the metadata of one ledger, selected with
+  * -ledgerid, and prints it.
+  */
+ class LedgerMetadataCmd extends MyCommand {
+     Options lOpts = new Options();
+
+     LedgerMetadataCmd() {
+         super(CMD_LEDGERMETADATA);
+         lOpts.addOption("l", "ledgerid", true, "Ledger ID");
+     }
+
+     @Override
+     public int runCmd(CommandLine cmdLine) throws Exception {
+         // -1 is what the option helper hands back when the option is
+         // absent or not a number, so it marks "no usable ledger id".
+         final long ledgerId = getOptionLongValue(cmdLine, "ledgerid", -1);
+         if (ledgerId == -1) {
+             System.err.println("Must specify a ledger id");
+             return -1;
+         }
+
+         ZooKeeper zkClient = null;
+         try {
+             ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+             zkClient = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), watcher);
+             LedgerManagerFactory factory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zkClient);
+             LedgerManager ledgerManager = factory.newLedgerManager();
+             ReadMetadataCallback readResult = new ReadMetadataCallback(ledgerId);
+             ledgerManager.readLedgerMetadata(ledgerId, readResult);
+             printLedgerMetadata(readResult);
+         } finally {
+             if (zkClient != null) {
+                 zkClient.close();
+             }
+         }
+
+         return 0;
+     }
+
+     @Override
+     String getDescription() {
+         return "Print the metadata for a ledger";
+     }
+
+     @Override
+     String getUsage() {
+         return "ledgermetadata -ledgerid <ledgerid>";
+     }
+
+     @Override
+     Options getOptions() {
+         return lOpts;
+     }
+ }
+
+ /**
+ * Simple test to create a ledger and write to it
+ */
+ class SimpleTestCmd extends MyCommand {
+ Options lOpts = new Options();
+
+ // Registers the command under CMD_SIMPLETEST and declares its four
+ // value-taking options. The defaults quoted in the help texts
+ // (3, 2, 2, 1000) are the fallbacks runCmd passes when reading them.
+ SimpleTestCmd() {
+ super(CMD_SIMPLETEST);
+ lOpts.addOption("e", "ensemble", true, "Ensemble size (default 3)");
+ lOpts.addOption("w", "writeQuorum", true, "Write quorum size (default 2)");
+ lOpts.addOption("a", "ackQuorum", true, "Ack quorum size (default 2)");
+ lOpts.addOption("n", "numEntries", true, "Entries to write (default 1000)");
+ }
+
+ /**
+  * Create a ledger and synchronously write numEntries 100-byte entries
+  * to it, printing a progress line when more than a second has passed
+  * since the previous report.
+  *
+  * The client is closed in a finally block so that a failure while
+  * creating the ledger or adding an entry does not leave it open.
+  *
+  * @param cmdLine parsed command line; -ensemble, -writeQuorum,
+  *                -ackQuorum and -numEntries override the defaults
+  * @return 0 on success
+  * @throws Exception if the client cannot be created or a ledger
+  *                   operation fails
+  */
+ @Override
+ public int runCmd(CommandLine cmdLine) throws Exception {
+     byte[] data = new byte[100]; // test data
+
+     int ensemble = getOptionIntValue(cmdLine, "ensemble", 3);
+     int writeQuorum = getOptionIntValue(cmdLine, "writeQuorum", 2);
+     int ackQuorum = getOptionIntValue(cmdLine, "ackQuorum", 2);
+     int numEntries = getOptionIntValue(cmdLine, "numEntries", 1000);
+
+     ClientConfiguration conf = new ClientConfiguration();
+     conf.addConfiguration(bkConf);
+     BookKeeper bk = new BookKeeper(conf);
+     final long ledgerId;
+     try {
+         LedgerHandle lh = bk.createLedger(ensemble, writeQuorum, ackQuorum,
+                                           BookKeeper.DigestType.MAC, new byte[0]);
+         ledgerId = lh.getId();
+         System.out.println("Ledger ID: " + ledgerId);
+         long lastReport = System.nanoTime();
+         for (int i = 0; i < numEntries; i++) {
+             lh.addEntry(data);
+             if (TimeUnit.SECONDS.convert(System.nanoTime() - lastReport,
+                                          TimeUnit.NANOSECONDS) > 1) {
+                 System.out.println(i + " entries written");
+                 lastReport = System.nanoTime();
+             }
+         }
+
+         lh.close();
+     } finally {
+         // always release the client, also when a write failed
+         bk.close();
+     }
+     System.out.println(numEntries + " entries written to ledger " + ledgerId);
+
+     return 0;
+ }
+
+ // Short human-readable summary of what the simpletest command does.
+ @Override
+ String getDescription() {
+ return "Simple test to create a ledger and write entries to it";
+ }
+
+ // Usage synopsis; lists the long names of the four options registered
+ // in the constructor.
+ @Override
+ String getUsage() {
+     return "simpletest [-ensemble N] [-writeQuorum N] [-ackQuorum N] [-numEntries N]";
}
@Override
@@ -387,7 +676,7 @@ public class BookieShell implements Tool
@Override
String getUsage() {
- return "readlog [-m] <entry_log_id | entry_log_file_name>";
+ return "readlog [-msg] <entry_log_id | entry_log_file_name>";
}
@Override
@@ -448,7 +737,7 @@ public class BookieShell implements Tool
@Override
String getUsage() {
- return "readjournal [-m] <journal_id | journal_file_name>";
+ return "readjournal [-msg] <journal_id | journal_file_name>";
}
@Override
@@ -460,25 +749,44 @@ public class BookieShell implements Tool
/**
* Command to print last log mark
*/
- class LastMarkCmd implements Command {
+ class LastMarkCmd extends MyCommand {
+ LastMarkCmd() {
+ super(CMD_LASTMARK);
+ }
+
@Override
- public int runCmd(String[] args) throws Exception {
+ public int runCmd(CommandLine c) throws Exception {
printLastLogMark();
return 0;
}
@Override
- public void printUsage() {
- System.err.println("lastmark: Print last log marker.");
+ String getDescription() {
+ return "Print last log marker.";
+ }
+
+ @Override
+ String getUsage() {
+ return "lastmark";
+ }
+
+ @Override
+ Options getOptions() {
+ return new Options();
}
}
/**
* Command to print help message
*/
- class HelpCmd implements Command {
+ class HelpCmd extends MyCommand {
+ HelpCmd() {
+ super(CMD_HELP);
+ }
+
@Override
- public int runCmd(String[] args) throws Exception {
+ public int runCmd(CommandLine cmdLine) throws Exception {
+ String[] args = cmdLine.getArgs();
if (args.length == 0) {
printShellUsage();
return 0;
@@ -495,9 +803,18 @@ public class BookieShell implements Tool
}
@Override
- public void printUsage() {
- System.err.println("help: Describe the usage of this program or its subcommands.");
- System.err.println("usage: help [COMMAND]");
+ String getDescription() {
+ return "Describe the usage of this program or its subcommands.";
+ }
+
+ @Override
+ String getUsage() {
+ return "help [COMMAND]";
+ }
+
+ @Override
+ Options getOptions() {
+ return new Options();
}
}
@@ -572,13 +889,68 @@ public class BookieShell implements Tool
}
}
- final Map<String, Command> commands;
+ /**
+  * Shell command which looks up the current auditor via AuditorElector
+  * and logs its canonical host name, host address and port.
+  */
+ class WhoIsAuditorCmd extends MyCommand {
+     Options opts = new Options();
+
+     public WhoIsAuditorCmd() {
+         super(CMD_WHOISAUDITOR);
+     }
+
+     @Override
+     String getUsage() {
+         return "whoisauditor";
+     }
+
+     @Override
+     String getDescription() {
+         return "Print the node which holds the auditor lock";
+     }
+
+     @Override
+     Options getOptions() {
+         return opts;
+     }
+
+     @Override
+     int runCmd(CommandLine cmdLine) throws Exception {
+         ZooKeeper zkClient = null;
+         try {
+             ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(bkConf.getZkTimeout());
+             zkClient = ZkUtils.createConnectedZookeeperClient(bkConf.getZkServers(), watcher);
+             InetSocketAddress auditor = AuditorElector.getCurrentAuditor(bkConf, zkClient);
+             if (auditor != null) {
+                 Object[] parts = {
+                     auditor.getAddress().getCanonicalHostName(),
+                     auditor.getAddress().getHostAddress(),
+                     auditor.getPort() };
+                 LOG.info("Auditor: {}/{}:{}", parts);
+                 return 0;
+             }
+             // a null address is reported as "nobody elected" and as failure
+             LOG.info("No auditor elected");
+             return -1;
+         } finally {
+             if (zkClient != null) {
+                 zkClient.close();
+             }
+         }
+     }
+ }
+
+ final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
{
- commands = new HashMap<String, Command>();
commands.put(CMD_METAFORMAT, new MetaFormatCmd());
commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd());
commands.put(CMD_RECOVER, new RecoverCmd());
commands.put(CMD_LEDGER, new LedgerCmd());
+ commands.put(CMD_LISTLEDGERS, new ListLedgersCmd());
+ commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd());
+ commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());
+ commands.put(CMD_LEDGERMETADATA, new LedgerMetadataCmd());
+ commands.put(CMD_SIMPLETEST, new SimpleTestCmd());
commands.put(CMD_READLOG, new ReadLogCmd());
commands.put(CMD_READJOURNAL, new ReadJournalCmd());
commands.put(CMD_LASTMARK, new LastMarkCmd());
@@ -597,18 +969,17 @@ public class BookieShell implements Tool
entriesPerPage = pageSize / 8;
}
- private static void printShellUsage() {
+ private void printShellUsage() {
System.err.println("Usage: BookieShell [-conf configuration] <command>");
System.err.println();
- System.err.println(" metaformat [-nonInteractive] [-force]");
- System.err.println(" bookieformat [-nonInteractive] [-force]");
- System.err.println(" recover <bookieSrc> [bookieDest]");
- System.err.println(" ledger [-meta] <ledger_id>");
- System.err.println(" readlog [-msg] <entry_log_id|entry_log_file_name>");
- System.err.println(" readjournal [-msg] <journal_id|journal_file_name>");
- System.err.println(" autorecovery [-enable|-disable]");
- System.err.println(" lastmark");
- System.err.println(" help");
+ List<String> commandNames = new ArrayList<String>();
+ for (MyCommand c : commands.values()) {
+ commandNames.add(" " + c.getUsage());
+ }
+ Collections.sort(commandNames);
+ for (String s : commandNames) {
+ System.err.println(s);
+ }
}
@Override
@@ -631,8 +1002,9 @@ public class BookieShell implements Tool
}
public static void main(String argv[]) throws Exception {
+ BookieShell shell = new BookieShell();
if (argv.length <= 0) {
- printShellUsage();
+ shell.printShellUsage();
System.exit(-1);
}
@@ -640,7 +1012,7 @@ public class BookieShell implements Tool
// load configuration
if ("-conf".equals(argv[0])) {
if (argv.length <= 1) {
- printShellUsage();
+ shell.printShellUsage();
System.exit(-1);
}
conf.addConfiguration(new PropertiesConfiguration(
@@ -651,7 +1023,7 @@ public class BookieShell implements Tool
argv = newArgv;
}
- BookieShell shell = new BookieShell();
+
shell.setConf(conf);
int res = shell.run(argv);
System.exit(res);
@@ -931,4 +1303,30 @@ public class BookieShell implements Tool
}
return sb.toString();
}
+
+ /**
+  * Read an integer option from the command line.
+  *
+  * @param cmdLine    parsed command line
+  * @param option     name of the option to read
+  * @param defaultVal value returned when the option is absent or its
+  *                   value is not a valid int
+  * @return the parsed value, or defaultVal; an unparsable value is also
+  *         reported on stderr
+  */
+ private static int getOptionIntValue(CommandLine cmdLine, String option, int defaultVal) {
+     if (!cmdLine.hasOption(option)) {
+         return defaultVal;
+     }
+     final String raw = cmdLine.getOptionValue(option);
+     try {
+         return Integer.parseInt(raw);
+     } catch (NumberFormatException nfe) {
+         System.err.println("ERROR: invalid value for option " + option + " : " + raw);
+         return defaultVal;
+     }
+ }
+
+ /**
+  * Read a long option from the command line.
+  *
+  * @param cmdLine    parsed command line
+  * @param option     name of the option to read
+  * @param defaultVal value returned when the option is absent or its
+  *                   value is not a valid long
+  * @return the parsed value, or defaultVal; an unparsable value is also
+  *         reported on stderr
+  */
+ private static long getOptionLongValue(CommandLine cmdLine, String option, long defaultVal) {
+     if (!cmdLine.hasOption(option)) {
+         return defaultVal;
+     }
+     final String raw = cmdLine.getOptionValue(option);
+     try {
+         return Long.parseLong(raw);
+     } catch (NumberFormatException nfe) {
+         System.err.println("ERROR: invalid value for option " + option + " : " + raw);
+         return defaultVal;
+     }
+ }
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
Tue Jun 3 11:56:25 2014
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.meta;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException;
+import java.util.Iterator;
+
/**
* Interface for marking ledgers which need to be rereplicated
*/
@@ -39,6 +41,14 @@ public interface LedgerUnderreplicationM
throws ReplicationException.UnavailableException;
/**
+ * Get a list of all the ledgers which have been
+ * marked for rereplication.
+ *
+ * The ledgers are exposed through an iterator rather than as a
+ * materialized collection.
+ * NOTE(review): whether the iterator reads the backing store lazily,
+ * supports remove() or how it reports errors is up to the
+ * implementation; confirm against the implementation in use.
+ *
+ * @return an iterator which returns ledger ids
+ */
+ Iterator<Long> listLedgersToRereplicate();
+
+ /**
* Acquire a underreplicated ledger for rereplication. The ledger
* should be locked, so that no other agent will receive the ledger
* from this call.
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1599492&r1=1599491&r2=1599492&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
(original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
Tue Jun 3 11:56:25 2014
@@ -51,7 +51,12 @@ import java.util.Map;
import java.util.List;
import java.util.Collections;
import java.util.Arrays;
-
+import java.util.Deque;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.ArrayList;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
@@ -313,6 +318,56 @@ public class ZkLedgerUnderreplicationMan
}
}
+ /**
+  * List the ledgers marked for rereplication by walking the znode tree
+  * below urLedgerPath breadth first.
+  *
+  * Children whose name starts with "urL" are turned into ledger ids via
+  * getLedgerId; every other child is queued and expanded later. Nodes
+  * are read on demand from hasNext()/next(), one parent at a time.
+  *
+  * A parent that no longer exists when it is expanded is skipped. If the
+  * thread is interrupted the interrupt flag is restored and the iterator
+  * reports that it has no further elements. Any other failure is
+  * rethrown as a RuntimeException.
+  *
+  * @return an iterator over ledger ids; remove() is not supported
+  */
+ @Override
+ public Iterator<Long> listLedgersToRereplicate() {
+     final Queue<String> queue = new LinkedList<String>();
+     queue.add(urLedgerPath);
+
+     return new Iterator<Long>() {
+         final Queue<Long> curBatch = new LinkedList<Long>();
+
+         @Override
+         public void remove() {
+             throw new UnsupportedOperationException();
+         }
+
+         @Override
+         public boolean hasNext() {
+             if (!curBatch.isEmpty()) {
+                 return true;
+             }
+
+             // expand queued parents until one of them yields ledgers
+             while (!queue.isEmpty() && curBatch.isEmpty()) {
+                 String parent = queue.remove();
+                 try {
+                     for (String c : zkc.getChildren(parent, false)) {
+                         String child = parent + "/" + c;
+                         if (c.startsWith("urL")) {
+                             curBatch.add(getLedgerId(child));
+                         } else {
+                             queue.add(child);
+                         }
+                     }
+                 } catch (InterruptedException ie) {
+                     Thread.currentThread().interrupt();
+                     return false;
+                 } catch (KeeperException.NoNodeException nne) {
+                     // the parent vanished before it was expanded; skip it
+                 } catch (Exception e) {
+                     throw new RuntimeException("Error reading list", e);
+                 }
+             }
+             return !curBatch.isEmpty();
+         }
+
+         @Override
+         public Long next() {
+             // Honour the Iterator contract: next() must work without a
+             // preceding hasNext() and must signal exhaustion explicitly
+             // rather than through an assert.
+             if (!hasNext()) {
+                 throw new java.util.NoSuchElementException();
+             }
+             return curBatch.remove();
+         }
+     };
+ }
+
private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w)
throws KeeperException, InterruptedException {
if (depth == 4) {
|