bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-894: add command to read ledger entries form shell
Date Wed, 16 Mar 2016 04:03:43 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 1a98088e3 -> a13d75d7e


BOOKKEEPER-894: add command to read ledger entries form shell

Author: Siddharth Boobna <sboobna@yahoo-inc.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #24 from sboobna/BOOKKEEPER-894


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/a13d75d7
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/a13d75d7
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/a13d75d7

Branch: refs/heads/master
Commit: a13d75d7eae29f1ce42dbfe5c6b4878a822fdf91
Parents: 1a98088
Author: Siddharth Boobna <sboobna@yahoo-inc.com>
Authored: Tue Mar 15 21:03:33 2016 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Tue Mar 15 21:03:33 2016 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/bookie/BookieShell.java   | 123 +++++++++++++++----
 .../bookkeeper/client/BookKeeperAdmin.java      | 118 ++++++++++++++++++
 .../apache/bookkeeper/client/LedgerHandle.java  |  10 +-
 3 files changed, 227 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a13d75d7/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index c7ed2f8..7d49a6a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -18,70 +18,69 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
-import java.util.Collection;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeperAdmin;
-import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.UpdateLedgerOp;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.Tool;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
-
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.HexDump;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.AbstractFuture;
+
 /**
  * Bookie Shell is to provide utilities for users to administer a bookkeeper cluster.
  */
@@ -95,6 +94,7 @@ public class BookieShell implements Tool {
     static final String CMD_BOOKIEFORMAT = "bookieformat";
     static final String CMD_RECOVER = "recover";
     static final String CMD_LEDGER = "ledger";
+    static final String CMD_READ_LEDGER_ENTRIES = "readledger";
     static final String CMD_LISTLEDGERS = "listledgers";
     static final String CMD_LEDGERMETADATA = "ledgermetadata";
     static final String CMD_LISTUNDERREPLICATED = "listunderreplicated";
@@ -397,6 +397,86 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Command for reading ledger entries
+     */
+    class ReadLedgerEntriesCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        ReadLedgerEntriesCmd() {
+            super(CMD_READ_LEDGER_ENTRIES);
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Read a range of entries from a ledger";
+        }
+
+        @Override
+        String getUsage() {
+            return "readledger <ledger_id> [<start_entry_id> [<end_entry_id>]]";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            String[] leftArgs = cmdLine.getArgs();
+            if (leftArgs.length <= 0) {
+                System.err.println("ERROR: missing ledger id");
+                printUsage();
+                return -1;
+            }
+
+            long ledgerId;
+            long firstEntry = 0;
+            long lastEntry = -1;
+            try {
+                ledgerId = Long.parseLong(leftArgs[0]);
+                if (leftArgs.length >= 2) {
+                    firstEntry = Long.parseLong(leftArgs[1]);
+                }
+                if (leftArgs.length >= 3) {
+                    lastEntry = Long.parseLong(leftArgs[2]);
+                }
+            } catch (NumberFormatException nfe) {
+                System.err.println("ERROR: invalid number " + nfe.getMessage());
+                printUsage();
+                return -1;
+            }
+
+            ClientConfiguration conf = new ClientConfiguration();
+            conf.addConfiguration(bkConf);
+
+            BookKeeperAdmin bk = null;
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            try {
+                bk = new BookKeeperAdmin(conf);
+                Iterator<LedgerEntry> entries = bk.readEntries(ledgerId, firstEntry,
lastEntry).iterator();
+                while (entries.hasNext()) {
+                    LedgerEntry entry = entries.next();
+                    HexDump.dump(entry.getEntry(), 0, out, 0);
+                    System.out.println("Entry Id: " + entry.getEntryId() + ", Data: " + new
String(out.toByteArray()));
+                    out.reset();
+                }
+            } catch (Exception e) {
+                LOG.error("Error reading entries from ledger {}", ledgerId, e.getCause());
+                return -1;
+            } finally {
+                out.close();
+                if (bk != null) {
+                    bk.close();
+                }
+            }
+
+            return 0;
+        }
+
+    }
+
+    /**
      * Command for listing underreplicated ledgers
      */
     class ListUnderreplicatedCmd extends MyCommand {
@@ -1425,6 +1505,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd());
         commands.put(CMD_RECOVER, new RecoverCmd());
         commands.put(CMD_LEDGER, new LedgerCmd());
+        commands.put(CMD_READ_LEDGER_ENTRIES, new ReadLedgerEntriesCmd());
         commands.put(CMD_LISTLEDGERS, new ListLedgersCmd());
         commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd());
         commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a13d75d7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index ff339db..0bc5c45 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -310,6 +311,123 @@ public class BookKeeperAdmin {
         return counter.getLh();
     }
 
+    /**
+     * Read entries from a ledger synchronously. If the lastEntry is -1, it will read all
the entries in the ledger from
+     * the firstEntry.
+     * 
+     * @param ledgerId
+     * @param firstEntry
+     * @param lastEntry
+     * @return
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public Iterable<LedgerEntry> readEntries(long ledgerId, long firstEntry, long lastEntry)
+            throws InterruptedException, BKException {
+        checkArgument(ledgerId >= 0 && firstEntry >= 0);
+        return new LedgerEntriesIterable(ledgerId, firstEntry, lastEntry);
+    }
+
+    class LedgerEntriesIterable implements Iterable<LedgerEntry> {
+        final long ledgerId;
+        final long firstEntryId;
+        final long lastEntryId;
+
+        public LedgerEntriesIterable(long ledgerId, long firstEntry) {
+            this(ledgerId, firstEntry, -1);
+        }
+
+        public LedgerEntriesIterable(long ledgerId, long firstEntry, long lastEntry) {
+            this.ledgerId = ledgerId;
+            this.firstEntryId = firstEntry;
+            this.lastEntryId = lastEntry;
+        }
+
+        @Override
+        public Iterator<LedgerEntry> iterator() {
+            try {
+                return new LedgerEntriesIterator(ledgerId, firstEntryId, lastEntryId);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    class LedgerEntriesIterator implements Iterator<LedgerEntry> {
+        final LedgerHandle handle;
+        final long ledgerId;
+        final long lastEntryId;
+
+        long nextEntryId;
+        LedgerEntry currentEntry;
+
+        public LedgerEntriesIterator(long ledgerId, long firstEntry, long lastEntry)
+                throws InterruptedException, BKException {
+            this.handle = openLedgerNoRecovery(ledgerId);
+            this.ledgerId = ledgerId;
+            this.nextEntryId = firstEntry;
+            this.lastEntryId = lastEntry;
+            this.currentEntry = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (currentEntry != null) {
+                return true;
+            }
+            if (lastEntryId == -1 || nextEntryId <= lastEntryId) {
+                try {
+                    SyncCounter counter = new SyncCounter();
+                    counter.inc();
+
+                    handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, new LedgerHandle.SyncReadCallback(),
+                            counter);
+                    counter.block(0);
+                    if (counter.getrc() != BKException.Code.OK) {
+                        throw BKException.create(counter.getrc());
+                    }
+                    currentEntry = counter.getSequence().nextElement();
+                    return true;
+                } catch (Exception e) {
+                    if (e instanceof BKException.BKNoSuchEntryException && lastEntryId
== -1) {
+                        // there are no more entries in the ledger, so we just return false
and ignore this exception
+                        // since the last entry id was undefined
+                        close();
+                        return false;
+                    }
+                    LOG.error("Error reading entry {} from ledger {}", new Object[] { nextEntryId,
ledgerId }, e);
+                    close();
+                    throw new RuntimeException(e);
+                }
+            }
+            close();
+            return false;
+        }
+
+        @Override
+        public LedgerEntry next() {
+            ++nextEntryId;
+            LedgerEntry entry = currentEntry;
+            currentEntry = null;
+            return entry;
+        }
+
+        @Override
+        public void remove() {
+            // noop
+        }
+
+        private void close() {
+            if (handle != null) {
+                try {
+                    handle.close();
+                } catch (Exception e) {
+                    LOG.error("Error closing ledger handle {}", handle, e);
+                }
+            }
+        }
+    }
+
     // Object used for calling async methods and waiting for them to complete.
     static class SyncObject {
         boolean value;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/a13d75d7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 12d689c..9af2db7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -458,6 +458,10 @@ public class LedgerHandle {
             return;
         }
 
+        asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
+    }
+
+    void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object
ctx) {
         try {
             new PendingReadOp(this, bk.scheduler,
                               firstEntry, lastEntry, cb, ctx).initiate();
@@ -1294,7 +1298,7 @@ public class LedgerHandle {
         }
     }
 
-    private static class SyncReadCallback implements ReadCallback {
+    static class SyncReadCallback implements ReadCallback {
         /**
          * Implementation of callback interface for synchronous read method.
          *
@@ -1346,7 +1350,7 @@ public class LedgerHandle {
         }
     }
 
-    private static class SyncReadLastConfirmedCallback implements ReadLastConfirmedCallback
{
+    static class SyncReadLastConfirmedCallback implements ReadLastConfirmedCallback {
         /**
          * Implementation of  callback interface for synchronous read last confirmed method.
          */
@@ -1362,7 +1366,7 @@ public class LedgerHandle {
         }
     }
 
-    private static class SyncCloseCallback implements CloseCallback {
+    static class SyncCloseCallback implements CloseCallback {
         /**
          * Close callback method
          *


Mime
View raw message