zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1374265 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/admin/ hedwig-server/src/main/java/org/apache/hedwig/admin/console/ hedwig-server/src/main/java/org/apache/hedwig/server/meta/ hedwig-server/src/mai...
Date Fri, 17 Aug 2012 13:56:33 GMT
Author: ivank
Date: Fri Aug 17 13:56:32 2012
New Revision: 1374265

URL: http://svn.apache.org/viewvc?rev=1374265&view=rev
Log:
BOOKKEEPER-283: Improve Hedwig Console to use Hedwig Metadata Manager. (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Aug 17 13:56:32 2012
@@ -96,6 +96,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-340: Test backward compatibility for hedwig between different versions.
(sijie via ivank)
 
+        BOOKKEEPER-283: Improve Hedwig Console to use Hedwig Metadata Manager. (sijie via
ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
Fri Aug 17 13:56:32 2012
@@ -24,12 +24,16 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -37,6 +41,14 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.meta.SubscriptionDataManager;
+import org.apache.hedwig.server.meta.TopicOwnershipManager;
+import org.apache.hedwig.server.meta.TopicPersistenceManager;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
+import org.apache.hedwig.server.topics.HubInfo;
+import org.apache.hedwig.server.topics.HubLoad;
+import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.HedwigSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,16 +70,82 @@ public class HedwigAdmin {
     // NOTE: now it is fixed passwd used in hedwig
     static byte[] passwd = "sillysecret".getBytes();
 
-    protected ZooKeeper zk;
-    protected BookKeeper bk;
+    protected final ZooKeeper zk;
+    protected final BookKeeper bk;
+    protected final MetadataManagerFactory mmFactory;
+    protected final SubscriptionDataManager sdm;
+    protected final TopicOwnershipManager tom;
+    protected final TopicPersistenceManager tpm;
+
     // hub configurations
-    protected ServerConfiguration serverConf;
+    protected final ServerConfiguration serverConf;
     // bookkeeper configurations
-    protected ClientConfiguration bkClientConf;
+    protected final ClientConfiguration bkClientConf;
+
+    protected final CountDownLatch zkReadyLatch = new CountDownLatch(1);
 
     // Empty watcher
-    private static class MyWatcher implements Watcher {
+    private class MyWatcher implements Watcher {
         public void process(WatchedEvent event) {
+            if (Event.KeeperState.SyncConnected.equals(event.getState())) {
+                zkReadyLatch.countDown();
+            }
+        }
+    }
+
+    static class SyncObj<T> {
+        boolean finished = false;
+        boolean success = false;
+        T value = null;
+        PubSubException exception = null;
+
+        synchronized void success(T v) {
+            finished = true;
+            success = true;
+            value = v;
+            notify();
+        }
+
+        synchronized void fail(PubSubException pse) {
+            finished = true;
+            success = false;
+            exception = pse;
+            notify();
+        }
+
+        synchronized void block() {
+            try {
+                while (!finished) {
+                    wait();
+                }
+            } catch (InterruptedException ie) {
+            }
+        }
+
+        synchronized boolean isSuccess() {
+            return success;
+        }
+    }
+
+    /**
+     * Stats of a hub
+     */
+    public static class HubStats {
+        HubInfo hubInfo;
+        HubLoad hubLoad;
+
+        public HubStats(HubInfo info, HubLoad load) {
+            this.hubInfo = info;
+            this.hubLoad = load;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("info : [").append(hubInfo.toString().trim().replaceAll("\n", ", "))
+              .append("], load : [").append(hubLoad.toString().trim().replaceAll("\n", ",
"))
+              .append("]");
+            return sb.toString();
         }
     }
 
@@ -90,6 +168,16 @@ public class HedwigAdmin {
             LOG.debug("Connecting to zookeeper " + hubConf.getZkHost() + ", timeout = "
                     + hubConf.getZkTimeout());
         }
+        // wait until connection is ready
+        if (!zkReadyLatch.await(hubConf.getZkTimeout() * 2, TimeUnit.MILLISECONDS)) {
+            throw new Exception("Count not establish connection with ZooKeeper after " +
hubConf.getZkTimeout() * 2 + " ms.");
+        }
+
+        // construct the metadata manager factory
+        mmFactory = MetadataManagerFactory.newMetadataManagerFactory(hubConf, zk);
+        tpm = mmFactory.newTopicPersistenceManager();
+        tom = mmFactory.newTopicOwnershipManager();
+        sdm = mmFactory.newSubscriptionDataManager();
 
         // connect to bookkeeper
         bk = new BookKeeper(bkClientConf, zk);
@@ -104,6 +192,10 @@ public class HedwigAdmin {
      * @throws Exception
      */
     public void close() throws Exception {
+        tpm.close();
+        tom.close();
+        sdm.close();
+        mmFactory.shutdown();
         bk.close();
         zk.close();
     }
@@ -162,9 +254,31 @@ public class HedwigAdmin {
      * @throws Exception
      */
     public boolean hasTopic(ByteString topic) throws Exception {
-        String topicPath = serverConf.getZkTopicPath(new StringBuilder(), topic).toString();
-        Stat stat = zk.exists(topicPath, false);
-        return null != stat;
+        // current persistence info is bound with a topic, so if there is persistence info
+        // there is topic.
+        final SyncObj<Boolean> syncObj = new SyncObj<Boolean>();
+        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>()
{
+            @Override
+            public void operationFinished(Object ctx, Versioned<LedgerRanges> result)
{
+                if (null == result) {
+                    syncObj.success(false);
+                } else {
+                    syncObj.success(true);
+                }
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException pse) {
+                syncObj.fail(pse);
+            }
+        }, syncObj);
+
+        syncObj.block();
+
+        if (!syncObj.isSuccess()) {
+            throw syncObj.exception;
+        }
+
+        return syncObj.value;
     }
 
     /**
@@ -173,27 +287,29 @@ public class HedwigAdmin {
      * @return available hubs and their loads
      * @throws Exception
      */
-    public Map<HedwigSocketAddress, Integer> getAvailableHubs() throws Exception {
+    public Map<HedwigSocketAddress, HubStats> getAvailableHubs() throws Exception {
         String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
-        Map<HedwigSocketAddress, Integer> hubs =
-            new HashMap<HedwigSocketAddress, Integer>();
+        Map<HedwigSocketAddress, HubStats> hubs =
+            new HashMap<HedwigSocketAddress, HubStats>();
         List<String> hosts = zk.getChildren(zkHubsPath, false);
         for (String host : hosts) {
             String zkHubPath = serverConf.getZkHostsPrefix(new StringBuilder())
                                          .append("/").append(host).toString();
-            int load = 0;
+            HedwigSocketAddress addr = new HedwigSocketAddress(host);
             try {
                 Stat stat = new Stat();
                 byte[] data = zk.getData(zkHubPath, false, stat);
-                if (data != null) {
-                    load = Integer.parseInt(new String(data));
+                if (data == null) {
+                    continue;
                 }
+                HubLoad load = HubLoad.parse(new String(data));
+                HubInfo info = new HubInfo(addr, stat.getCzxid());
+                hubs.put(addr, new HubStats(info, load));
             } catch (KeeperException ke) {
                 LOG.warn("Couldn't read hub data from ZooKeeper", ke);
             } catch (InterruptedException ie) {
                 LOG.warn("Interrupted during read", ie);
             }
-            hubs.put(new HedwigSocketAddress(host), load);
         }
         return hubs;
     }
@@ -204,19 +320,8 @@ public class HedwigAdmin {
      * @return list of topics
      * @throws Exception
      */
-    public List<String> getTopics() throws Exception {
-        return zk.getChildren(serverConf.getZkTopicsPrefix(new StringBuilder()).toString(),
false);
-    }
-
-    /**
-     * Return the znode path of owner of a topic
-     *
-     * @param topic
-     *            Topic name
-     * @return znode path of owner of a topic
-     */
-    String hubPath(ByteString topic) {
-        return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
+    public Iterator<ByteString> getTopics() throws Exception {
+        return mmFactory.getTopics();
     }
 
     /**
@@ -227,28 +332,30 @@ public class HedwigAdmin {
      * @return the address of the owner of a topic
      * @throws Exception
      */
-    public HedwigSocketAddress getTopicOwner(ByteString topic) throws Exception {
-        Stat stat = new Stat();
-        byte[] owner = null;
-        try {
-            owner = zk.getData(hubPath(topic), false, stat);
-        } catch (KeeperException.NoNodeException nne) {
-        }
-        if (null == owner) {
-            return null;
+    public HubInfo getTopicOwner(ByteString topic) throws Exception {
+        final SyncObj<HubInfo> syncObj = new SyncObj<HubInfo>();
+        tom.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
+            @Override
+            public void operationFinished(Object ctx, Versioned<HubInfo> result) {
+                if (null == result) {
+                    syncObj.success(null);
+                } else {
+                    syncObj.success(result.getValue());
+                }
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException pse) {
+                syncObj.fail(pse);
+            }
+        }, syncObj);
+
+        syncObj.block();
+
+        if (!syncObj.isSuccess()) {
+            throw syncObj.exception;
         }
-        return new HedwigSocketAddress(new String(owner));
-    }
 
-    /**
-     * Return the znode path to store ledgers info of a topic
-     *
-     * @param topic
-     *          Topic name
-     * @return znode path to store ledgers info of a topic
-     */
-    String ledgersPath(ByteString topic) {
-        return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
+        return syncObj.value;
     }
 
     /**
@@ -260,15 +367,29 @@ public class HedwigAdmin {
      * @throws Exception
      */
     public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
-        LedgerRanges ranges = null;
-        try {
-            Stat stat = new Stat();
-            byte[] ledgersData = zk.getData(ledgersPath(topic), false, stat);
-            if (null != ledgersData) {
-                ranges = LedgerRanges.parseFrom(ledgersData);
+        final SyncObj<LedgerRanges> syncObj = new SyncObj<LedgerRanges>();
+        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>()
{
+            @Override
+            public void operationFinished(Object ctx, Versioned<LedgerRanges> result)
{
+                if (null == result) {
+                    syncObj.success(null);
+                } else {
+                    syncObj.success(result.getValue());
+                }
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException pse) {
+                syncObj.fail(pse);
             }
-        } catch (KeeperException.NoNodeException nne) {
+        }, syncObj);
+
+        syncObj.block();
+
+        if (!syncObj.isSuccess()) {
+            throw syncObj.exception;
         }
+
+        LedgerRanges ranges = syncObj.value;
         if (null == ranges) {
             return null;
         }
@@ -318,32 +439,6 @@ public class HedwigAdmin {
     }
 
     /**
-     * Return the znode path store all the subscribers of a topic.
-     *
-     * @param sb
-     *          String builder to hold the znode path
-     * @param topic
-     *          Topic name
-     */
-    private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
-        return serverConf.getZkTopicPath(sb, topic).append("/subscribers");
-    }
-
-    /**
-     * Return the znode path of a subscriber of a topic.
-     *
-     * @param topic
-     *          Topic name
-     * @param subscriber
-     *          Subscriber name
-     */
-
-    private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
-        return topicSubscribersPath(new StringBuilder(), topic).append("/")
-               .append(subscriber.toStringUtf8()).toString();
-    }
-
-    /**
      * Return subscriptions of a topic
      *
      * @param topic
@@ -355,22 +450,36 @@ public class HedwigAdmin {
         throws Exception {
         Map<ByteString, SubscriptionState> states =
             new HashMap<ByteString, SubscriptionState>();
-        try {
-            String subsPath = topicSubscribersPath(new StringBuilder(), topic).toString();
-            List<String> children = zk.getChildren(subsPath, false);
-            for (String child : children) {
-                ByteString subscriberId = ByteString.copyFromUtf8(child);
-                String subPath = topicSubscriberPath(topic, subscriberId);
-                Stat stat = new Stat();
-                byte[] subData = zk.getData(subPath, false, stat);
-                if (null == subData) {
-                    continue;
-                }
-                SubscriptionState state = SubscriptionState.parseFrom(subData);
-                states.put(subscriberId, state);
+
+        final SyncObj<Map<ByteString, InMemorySubscriptionState>> syncObj =
+            new SyncObj<Map<ByteString, InMemorySubscriptionState>>();
+        sdm.readSubscriptions(topic, new Callback<Map<ByteString, InMemorySubscriptionState>>()
{
+            @Override
+            public void operationFinished(Object ctx, Map<ByteString, InMemorySubscriptionState>
result) {
+                syncObj.success(result);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException pse) {
+                syncObj.fail(pse);
             }
-        } catch (KeeperException.NoNodeException nne) {
+        }, syncObj);
+
+        syncObj.block();
+
+        if (!syncObj.isSuccess()) {
+            throw syncObj.exception;
+        }
+
+        Map<ByteString, InMemorySubscriptionState> subStats = syncObj.value;
+
+        if (null == subStats) {
+            return states;
+        }
+
+        for (Map.Entry<ByteString, InMemorySubscriptionState> entry : subStats.entrySet())
{
+            states.put(entry.getKey(), entry.getValue().getSubscriptionState());
         }
+
         return states;
     }
 
@@ -385,16 +494,24 @@ public class HedwigAdmin {
      * @throws Exception
      */
     public SubscriptionState getSubscription(ByteString topic, ByteString subscriber) throws
Exception {
-        String subPath = topicSubscriberPath(topic, subscriber);
-        Stat stat = new Stat();
-        byte[] subData = null;
-        try {
-            subData = zk.getData(subPath, false, stat);
-        } catch (KeeperException.NoNodeException nne) {
-        }
-        if (null == subData) {
-            return null;
+        final SyncObj<SubscriptionState> syncObj = new SyncObj<SubscriptionState>();
+        sdm.readSubscriptionState(topic, subscriber, new Callback<SubscriptionState>()
{
+            @Override
+            public void operationFinished(Object ctx, SubscriptionState result) {
+                syncObj.success(result);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException pse) {
+                syncObj.fail(pse);
+            }
+        }, syncObj);
+
+        syncObj.block();
+
+        if (!syncObj.isSuccess()) {
+            throw syncObj.exception;
         }
-        return SubscriptionState.parseFrom(subData);
+
+        return syncObj.value;
     }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
Fri Aug 17 13:56:32 2012
@@ -18,19 +18,19 @@
 
 package org.apache.hedwig.admin.console;
 
+import jline.ConsoleReader;
+import jline.History;
+import jline.Terminal;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -46,26 +46,22 @@ import org.apache.hedwig.client.api.Subs
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.HedwigClient;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.HubInfo;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 import static org.apache.hedwig.admin.console.HedwigCommands.*;
 import static org.apache.hedwig.admin.console.HedwigCommands.COMMAND.*;
@@ -81,6 +77,8 @@ public class HedwigConsole {
     // history file name
     static final String HW_HISTORY_FILE = ".hw_history";
 
+    static final char[] CONTINUE_OR_QUIT = new char[] { 'Q', 'q', '\n' };
+
     protected MyCommandOptions cl = new MyCommandOptions();
     protected HashMap<Integer, String> history = new LinkedHashMap<Integer, String>();
     protected int commandCount = 0;
@@ -88,6 +86,7 @@ public class HedwigConsole {
     protected Map<String, MyCommand> myCommands;
 
     protected boolean inConsole = true;
+    protected ConsoleReader console = null;
 
     protected HedwigAdmin admin;
     protected HedwigClient hubClient;
@@ -95,6 +94,7 @@ public class HedwigConsole {
     protected Subscriber subscriber;
     protected ConsoleMessageHandler consoleHandler =
             new ConsoleMessageHandler();
+    protected Terminal terminal;
 
     protected String myRegion;
 
@@ -524,6 +524,8 @@ public class HedwigConsole {
 
     class ShowCmd implements MyCommand {
 
+        static final int MAX_TOPICS_PER_SHOW = 100;
+
         @Override
         public boolean runCmd(String[] args) throws Exception {
             if (args.length < 2) {
@@ -551,18 +553,37 @@ public class HedwigConsole {
         }
 
         protected void showHubs() throws Exception {
-            Map<HedwigSocketAddress, Integer> hubs = admin.getAvailableHubs();
+            Map<HedwigSocketAddress, HedwigAdmin.HubStats> hubs = admin.getAvailableHubs();
             System.out.println("Available Hub Servers:");
-            for (Map.Entry<HedwigSocketAddress, Integer> entry : hubs.entrySet()) {
+            for (Map.Entry<HedwigSocketAddress, HedwigAdmin.HubStats> entry : hubs.entrySet())
{
                 System.out.println("\t" + entry.getKey() + " :\t" + entry.getValue());
             }
         }
 
         protected void showTopics() throws Exception {
-            List<String> topics = admin.getTopics();
+            List<String> topics = new ArrayList<String>();
+            Iterator<ByteString> iter = admin.getTopics();
+
             System.out.println("Topic List:");
-            System.out.println(topics);
+            boolean stop = false;
+            while (iter.hasNext()) {
+                if (topics.size() >= MAX_TOPICS_PER_SHOW) {
+                    System.out.println(topics);
+                    topics.clear();
+                    stop = !continueOrQuit();
+                    if (stop) {
+                        break;
+                    }
+                }
+                ByteString t = iter.next();
+                topics.add(t.toStringUtf8());
+            }
+            if (!stop) {
+                System.out.println(topics);
+            }
         }
+
+        
         
     }
 
@@ -582,13 +603,14 @@ public class HedwigConsole {
 
         protected boolean describeTopic(String topic) throws Exception {
             ByteString btopic = ByteString.copyFromUtf8(topic);
-            HedwigSocketAddress owner = admin.getTopicOwner(btopic);
+            HubInfo owner = admin.getTopicOwner(btopic);
             List<LedgerRange> ranges = admin.getTopicLedgers(btopic);
             Map<ByteString, SubscriptionState> states = admin.getTopicSubscriptions(btopic);
 
             System.out.println("===== Topic Information : " + topic + " =====");
             System.out.println();
-            System.out.println("Owner : " + (owner == null ? "NULL" : owner));
+            System.out.println("Owner : " + (owner == null ? "NULL" :
+                               owner.toString().trim().replaceAll("\n", ", ")));
             System.out.println();
 
             // print ledgers
@@ -779,6 +801,8 @@ public class HedwigConsole {
      * @throws InterruptedException 
      */
     public HedwigConsole(String[] args) throws IOException, InterruptedException {
+        // Setup Terminal
+        terminal = Terminal.setupTerminal();
         HedwigCommands.init();
         cl.parseOptions(args);
 
@@ -815,8 +839,6 @@ public class HedwigConsole {
             }
         }
 
-
-
         printMessage("Connecting to zookeeper/bookkeeper using HedwigAdmin");
         try {
             admin = new HedwigAdmin(bkClientConf, hubServerConf);
@@ -844,6 +866,23 @@ public class HedwigConsole {
         return sb.toString();
     }
 
+    protected boolean continueOrQuit() throws IOException {
+        System.out.println("Press <Return> for more, or Q to cancel ...");
+        int ch;
+        if (null != console) {
+            ch = console.readCharacter(CONTINUE_OR_QUIT);
+        } else {
+            do {
+                ch = terminal.readCharacter(System.in);
+            } while (ch != 'q' && ch != 'Q' && ch != '\n');
+        }
+        if (ch == 'q' ||
+            ch == 'Q') {
+            return false;
+        }
+        return true;
+    }
+
     protected void addToHistory(int i, String cmd) {
         history.put(i, cmd);
     }
@@ -908,122 +947,38 @@ public class HedwigConsole {
         myCommands = buildMyCommands();
         if (cl.getCommand() == null) {
             System.out.println("Welcome to Hedwig!");
+            System.out.println("JLine support is enabled");
 
-            boolean jlinemissing = false;
-            // only use jline if it's in the classpath
-            try {
-                Class consoleC = Class.forName("jline.ConsoleReader");
-                Class completorC =
-                    Class.forName("org.apache.hedwig.admin.console.JLineHedwigCompletor");
-
-                System.out.println("JLine support is enabled");
-
-                Object console =
-                    consoleC.getConstructor().newInstance();
-
-                Object completor =
-                    completorC.getConstructor(HedwigAdmin.class).newInstance(admin);
-                Method addCompletor = consoleC.getMethod("addCompletor",
-                        Class.forName("jline.Completor"));
-                addCompletor.invoke(console, completor);
-
-                // load history file
-                boolean historyEnabled = false;
-                Object history = null;
-                Method addHistory = null;
-                // Method flushHistory = null;
-                try {
-                    Class historyC = Class.forName("jline.History");
-                    history = historyC.getConstructor().newInstance();
-
-                    File file = new File(System.getProperty("hw.history",
-                                         new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString()));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("History file is " + file.toString());
-                    }
-                    Method setHistoryFile = historyC.getMethod("setHistoryFile", File.class);
-                    setHistoryFile.invoke(history, file);
-
-                    // set history to console reader
-                    Method setHistory = consoleC.getMethod("setHistory", historyC);
-                    setHistory.invoke(console, history);
-
-                    // load history from history file
-                    Method moveToFirstEntry = historyC.getMethod("moveToFirstEntry");
-                    moveToFirstEntry.invoke(history);
-
-                    addHistory = historyC.getMethod("addToHistory", String.class);
-                    // flushHistory = historyC.getMethod("flushBuffer");
-
-                    Method nextEntry = historyC.getMethod("next");
-                    Method current = historyC.getMethod("current");
-                    while ((Boolean)(nextEntry.invoke(history))) {
-                        String entry = (String)current.invoke(history);
-                        if (!entry.equals("")) {
-                            addToHistory(commandCount, entry);
-                        }
-                        commandCount++;
-                    }
-
-                    historyEnabled = true;
-                    System.out.println("JLine history support is enabled");
-                } catch (ClassNotFoundException e) {
-                    System.out.println("JLine history support is disabled");
-                    LOG.debug("JLine history disabled with exception", e);
-                    historyEnabled = false;
-                } catch (NoSuchMethodException e) {
-                    System.out.println("JLine history support is disabled");
-                    LOG.debug("JLine history disabled with exception", e);
-                    historyEnabled = false;
-                } catch (InvocationTargetException e) {
-                    System.out.println("JLine history support is disabled");
-                    LOG.debug("JLine history disabled with exception", e);
-                    historyEnabled = false;
-                } catch (IllegalAccessException e) {
-                    System.out.println("JLine history support is disabled");
-                    LOG.debug("JLine history disabled with exception", e);
-                    historyEnabled = false;
-                } catch (InstantiationException e) {
-                    System.out.println("JLine history support is disabled");
-                    LOG.debug("JLine history disabled with exception", e);
-                    historyEnabled = false;
-                }
-
-                String line;
-                Method readLine = consoleC.getMethod("readLine", String.class);
-                while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
-                    executeLine(line);
-                    if (historyEnabled) {
-                        addHistory.invoke(history, line);
-                        // flushHistory.invoke(history);
-                    }
-                }
-            } catch (ClassNotFoundException e) {
-                LOG.debug("Unable to start jline", e);
-                jlinemissing = true;
-            } catch (NoSuchMethodException e) {
-                LOG.debug("Unable to start jline", e);
-                jlinemissing = true;
-            } catch (InvocationTargetException e) {
-                LOG.debug("Unable to start jline", e);
-                jlinemissing = true;
-            } catch (IllegalAccessException e) {
-                LOG.debug("Unable to start jline", e);
-                jlinemissing = true;
-            } catch (InstantiationException e) {
-                LOG.debug("Unable to start jline", e);
-                jlinemissing = true;
-            }
-
-            if (jlinemissing) {
-                System.out.println("JLine support is disabled");
-                BufferedReader br =
-                    new BufferedReader(new InputStreamReader(System.in));
-
-                String line;
-                while ((line = br.readLine()) != null) {
-                    executeLine(line);
-                }
+            console = new ConsoleReader();
+            JLineHedwigCompletor completor = new JLineHedwigCompletor(admin);
+            console.addCompletor(completor);
+
+            // load history file
+            History history = new History();
+            File file = new File(System.getProperty("hw.history",
+                                 new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString()));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("History file is " + file.toString());
+            }
+            history.setHistoryFile(file);
+            // set history to console reader
+            console.setHistory(history);
+            // load history from history file
+            history.moveToFirstEntry();
+
+            while (history.next()) {
+                String entry = history.current();
+                if (!entry.equals("")) {
+                    addToHistory(commandCount, entry);
+                }
+                commandCount++;
+            }
+            System.out.println("JLine history support is enabled");
+
+            String line;
+            while ((line = console.readLine(getPrompt())) != null) {
+                executeLine(line);
+                history.addToHistory(line);
             }
         }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
Fri Aug 17 13:56:32 2012
@@ -18,11 +18,14 @@
 
 package org.apache.hedwig.admin.console;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.zookeeper.KeeperException;
 import org.apache.hedwig.admin.HedwigAdmin;
 
+import com.google.protobuf.ByteString;
+
 import jline.Completor;
 
 import static org.apache.hedwig.admin.console.HedwigCommands.*;
@@ -31,6 +34,9 @@ import static org.apache.hedwig.admin.co
  * A jline completor for hedwig console
  */
 public class JLineHedwigCompletor implements Completor {
+    // for topic completion
+    static final int MAX_TOPICS_TO_SEARCH = 1000;
+
     private HedwigAdmin admin;
 
     public JLineHedwigCompletor(HedwigAdmin admin) {
@@ -78,11 +84,14 @@ public class JLineHedwigCompletor implem
 
     private int completeTopic(String buffer, String token, List<String> candidates)
{
         try {
-            List<String> children = admin.getTopics();
-            for (String child : children) {
+            Iterator<ByteString> children = admin.getTopics();
+            int i = 0;
+            while (children.hasNext() && i <= MAX_TOPICS_TO_SEARCH) {
+                String child = children.next().toStringUtf8();
                 if (child.startsWith(token)) {
                     candidates.add(child);
                 }
+                ++i;
             }
         } catch (Exception e) {
             return buffer.length();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java
Fri Aug 17 13:56:32 2012
@@ -18,6 +18,7 @@
 package org.apache.hedwig.server.meta;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,8 @@ import org.apache.hedwig.server.common.S
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 
+import com.google.protobuf.ByteString;
+
 /**
  * Metadata Manager used to manage metadata used by hedwig.
  */
@@ -67,6 +70,15 @@ public abstract class MetadataManagerFac
     public abstract void shutdown() throws IOException;
 
     /**
+     * Iterate over the topics list.
+     * Used by HedwigConsole to list available topics.
+     *
+     * @return iterator of the topics list.
+     * @throws IOException
+     */
+    public abstract Iterator<ByteString> getTopics() throws IOException;
+
+    /**
      * Create topic persistence manager.
      *
      * @return topic persistence manager

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
Fri Aug 17 13:56:32 2012
@@ -20,6 +20,7 @@ package org.apache.hedwig.server.meta;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +87,34 @@ public class ZkMetadataManagerFactory ex
     }
 
     @Override
+    public Iterator<ByteString> getTopics() throws IOException {
+        List<String> topics;
+        try {
+            topics = zk.getChildren(cfg.getZkTopicsPrefix(new StringBuilder()).toString(),
false);
+        } catch (KeeperException ke) {
+            throw new IOException("Failed to get topics list : ", ke);
+        } catch (InterruptedException ie) {
+            throw new IOException("Interrupted on getting topics list : ", ie);
+        }
+        final Iterator<String> iter = topics.iterator();
+        return new Iterator<ByteString>() {
+            @Override
+            public boolean hasNext() {
+                return iter.hasNext();
+            }
+            @Override
+            public ByteString next() {
+                String t = iter.next();
+                return ByteString.copyFromUtf8(t);
+            }
+            @Override
+            public void remove() {
+                iter.remove();
+            }
+        };
+    }
+
+    @Override
     public TopicPersistenceManager newTopicPersistenceManager() {
         return new ZkTopicPersistenceManagerImpl(cfg, zk);
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
Fri Aug 17 13:56:32 2012
@@ -32,7 +32,7 @@ import com.google.protobuf.TextFormat;
 /**
  * This class encapsulates metrics for determining the load on a hub server.
  */
-class HubLoad implements Comparable<HubLoad> {
+public class HubLoad implements Comparable<HubLoad> {
 
     public static final HubLoad MAX_LOAD = new HubLoad(Long.MAX_VALUE);
     public static final HubLoad MIN_LOAD = new HubLoad(0);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java?rev=1374265&r1=1374264&r2=1374265&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
(original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManagerFactory.java
Fri Aug 17 13:56:32 2012
@@ -29,9 +29,12 @@ import java.io.IOException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.CountDownLatch;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.protobuf.ByteString;
+
 import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
@@ -78,6 +81,10 @@ public class TestMetadataManagerFactory 
 
         public void shutdown() {}
 
+        public Iterator<ByteString> getTopics() {
+            return null;
+        }
+
         public TopicPersistenceManager newTopicPersistenceManager() {
             return null;
         }



Mime
View raw message