zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1241858 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/ hedwig-server/bin/ hedwig-server/src/main/java/org/apache/hedwig/admin/ hedwig-server/src/main/java/org/apache/hedwig/admin/console/
Date Wed, 08 Feb 2012 10:49:11 GMT
Author: ivank
Date: Wed Feb  8 10:49:10 2012
New Revision: 1241858

URL: http://svn.apache.org/viewvc?rev=1241858&view=rev
Log:
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig
    zookeeper/bookkeeper/trunk/hedwig-server/pom.xml

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1241858&r1=1241857&r2=1241858&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb  8 10:49:10 2012
@@ -37,7 +37,6 @@ Trunk (unreleased changes)
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
 
         BOOKKEEPER-133: Hub server should update subscription state to zookeeper when losing topic or shutting down (Sijie Gou via ivank)
-
     IMPROVEMENTS:
 
       bookkeeper-server/
@@ -48,6 +47,9 @@ Trunk (unreleased changes)
 
 	BOOKKEEPER-157:	For small packets, increasing number of bookies actually degrades performance. (ivank via fpj)
 
+      hedwig-server/
+
+        BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
 
 Release 4.0.0 - 2011-11-30
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig?rev=1241858&r1=1241857&r2=1241858&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig Wed Feb  8 10:49:10 2012
@@ -34,6 +34,8 @@ HW_HOME=`cd $BINDIR/..;pwd`
 
 DEFAULT_CONF=$HW_HOME/conf/hw_server.conf
 
+. $HW_HOME/conf/hwenv.sh
+
 RELEASE_JAR=`ls $HW_HOME/hedwig-server-*.jar 2> /dev/null | tail -1`
 if [ $? == 0 ]; then
     HEDWIG_JAR=$RELEASE_JAR
@@ -78,6 +80,7 @@ hedwig_help() {
 Usage: hedwig <command>
 where command is one of:
     server           Run the hedwig server
+    console          Run the hedwig admin console
     help             This help message
 
 or command is the full name of a class with a defined main() method.
@@ -118,6 +121,8 @@ OPTS="$OPTS -Djava.net.preferIPv4Stack=t
 
 if [ $COMMAND == "server" ]; then
     exec java $OPTS org.apache.hedwig.server.netty.PubSubServer $HEDWIG_SERVER_CONF $@
+elif [ $COMMAND == "console" ]; then
+    exec java $OPTS org.apache.hedwig.admin.console.HedwigConsole -server-cfg $HEDWIG_SERVER_CONF $@
 elif [ $COMMAND == "help" ]; then
     hedwig_help;
 else

Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1241858&r1=1241857&r2=1241858&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Wed Feb  8 10:49:10 2012
@@ -110,6 +110,11 @@
 	</exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+      <version>0.9.94</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Wed Feb  8 10:49:10 2012
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Hedwig Admin
+ */
+public class HedwigAdmin {
+    static final Logger LOG = LoggerFactory.getLogger(HedwigAdmin.class);
+
+    // NOTE: now it is fixed passwd used in hedwig
+    static byte[] passwd = "sillysecret".getBytes();
+
+    protected ZooKeeper zk;
+    protected BookKeeper bk;
+    // hub configurations
+    protected ServerConfiguration serverConf;
+    // bookkeeper configurations
+    protected ClientConfiguration bkClientConf;
+
+    // Empty watcher
+    private class MyWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+        }
+    }
+
+    /**
+     * Hedwig Admin Constructor
+     *
+     * @param bkConf
+     *          BookKeeper Client Configuration.
+     * @param hubConf
+     *          Hub Server Configuration.
+     * @throws Exception
+     */
+    public HedwigAdmin(ClientConfiguration bkConf, ServerConfiguration hubConf) throws Exception {
+        this.serverConf = hubConf;
+        this.bkClientConf = bkConf;
+
+        // connect to zookeeper
+        zk = new ZooKeeper(bkClientConf.getZkServers(), bkClientConf.getZkTimeout(), new MyWatcher());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Connecting to zookeeper " + bkClientConf.getZkServers() + ", timeout = "
+                    + bkClientConf.getZkTimeout());
+        }
+
+        // connect to bookkeeper
+        bk = new BookKeeper(bkClientConf, zk);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Connecting to bookkeeper");
+        }
+    }
+
+    /**
+     * Close the hedwig admin.
+     *
+     * @throws Exception
+     */
+    public void close() throws Exception {
+        bk.close();
+        zk.close();
+    }
+
+    /**
+     * Return zookeeper handle used in hedwig admin.
+     *
+     * @return zookeeper handle
+     */
+    public ZooKeeper getZkHandle() {
+        return zk;
+    }
+
+    /**
+     * Return bookkeeper handle used in hedwig admin.
+     *
+     * @return bookkeeper handle
+     */
+    public BookKeeper getBkHandle() {
+        return bk;
+    }
+
+    /**
+     * Return hub server configuration used in hedwig admin
+     *
+     * @return hub server configuration
+     */
+    public ServerConfiguration getHubServerConf() {
+        return serverConf;
+    }
+
+    /**
+     * Return bookeeper passwd used in hedwig admin
+     *
+     * @return bookeeper passwd
+     */
+    public byte[] getBkPasswd() {
+        return passwd;
+    }
+
+    /**
+     * Return digest type used in hedwig admin
+     *
+     * @return bookeeper digest type
+     */
+    public DigestType getBkDigestType() {
+        return DigestType.CRC32;
+    }
+
+    /**
+     * Dose topic exist?
+     *
+     * @param topic
+     *            Topic name
+     * @return whether topic exists or not?
+     * @throws Exception
+     */
+    public boolean hasTopic(ByteString topic) throws Exception {
+        String topicPath = serverConf.getZkTopicPath(new StringBuilder(), topic).toString();
+        Stat stat = zk.exists(topicPath, false);
+        return null != stat;
+    }
+
+    /**
+     * Get available hubs.
+     *
+     * @return available hubs and their loads
+     * @throws Exception
+     */
+    public Map<HedwigSocketAddress, Integer> getAvailableHubs() throws Exception {
+        String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
+        Map<HedwigSocketAddress, Integer> hubs =
+            new HashMap<HedwigSocketAddress, Integer>();
+        List<String> hosts = zk.getChildren(zkHubsPath, false);
+        for (String host : hosts) {
+            String zkHubPath = serverConf.getZkHostsPrefix(new StringBuilder())
+                                         .append("/").append(host).toString();
+            int load = 0;
+            try {
+                Stat stat = new Stat();
+                byte[] data = zk.getData(zkHubPath, false, stat);
+                if (data != null) {
+                    load = Integer.parseInt(new String(data));
+                }
+            } catch (Exception e) {
+                // igore
+            }
+            hubs.put(new HedwigSocketAddress(host), load);
+        }
+        return hubs;
+    }
+
+    /**
+     * Get list of topics
+     *
+     * @return list of topics
+     * @throws Exception
+     */
+    public List<String> getTopics() throws Exception {
+        return zk.getChildren(serverConf.getZkTopicsPrefix(new StringBuilder()).toString(), false);
+    }
+
+    /**
+     * Return the znode path of owner of a topic
+     *
+     * @param topic
+     *            Topic name
+     * @return znode path of owner of a topic
+     */
+    String hubPath(ByteString topic) {
+        return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
+    }
+
+    /**
+     * Return the topic owner of a topic
+     *
+     * @param topic
+     *            Topic name
+     * @return the address of the owner of a topic
+     * @throws Exception
+     */
+    public HedwigSocketAddress getTopicOwner(ByteString topic) throws Exception {
+        Stat stat = new Stat();
+        byte[] owner = null;
+        try {
+            owner = zk.getData(hubPath(topic), false, stat);
+        } catch (KeeperException.NoNodeException nne) {
+        }
+        if (null == owner) {
+            return null;
+        }
+        return new HedwigSocketAddress(new String(owner));
+    }
+
+    /**
+     * Return the znode path to store ledgers info of a topic
+     *
+     * @param topic
+     *          Topic name
+     * @return znode path to store ledgers info of a topic
+     */
+    String ledgersPath(ByteString topic) {
+        return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
+    }
+
+    /**
+     * Return the ledger range forming the topic
+     *
+     * @param topic
+     *          Topic name
+     * @return ledger ranges forming the topic
+     * @throws Exception
+     */
+    public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
+        LedgerRanges ranges = null;
+        try {
+            Stat stat = new Stat();
+            byte[] ledgersData = zk.getData(ledgersPath(topic), false, stat);
+            if (null != ledgersData) {
+                ranges = LedgerRanges.parseFrom(ledgersData);
+            }
+        } catch (KeeperException.NoNodeException nne) {
+        }
+        if (null == ranges) {
+            return null;
+        }
+        List<LedgerRange> lrs = ranges.getRangesList();
+        if (lrs.isEmpty()) {
+            return lrs;
+        }
+        // try to check last ledger (it may still open)
+        LedgerRange lastRange = lrs.get(lrs.size() - 1);
+        if (lastRange.hasEndSeqIdIncluded()) {
+            return lrs;
+        }
+        // read last confirmed of the opened ledger
+        try {
+            List<LedgerRange> newLrs = new ArrayList<LedgerRange>();
+            newLrs.addAll(lrs);
+            lrs = newLrs;
+            MessageSeqId lastSeqId;
+            if (lrs.size() == 1) {
+                lastSeqId = MessageSeqId.newBuilder().setLocalComponent(1).build();
+            } else {
+                lastSeqId = lrs.get(lrs.size() - 2).getEndSeqIdIncluded();
+            }
+            LedgerRange newLastRange = refreshLastLedgerRange(lastSeqId, lastRange);
+            lrs.set(lrs.size() - 1, newLastRange);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return lrs;
+    }
+
+    /**
+     * Refresh last ledger range to get lastConfirmed entry, which make it available to read
+     *
+     * @param lastSeqId
+     *            Last sequence id of previous ledger
+     * @param oldRange
+     *            Ledger range to set lastConfirmed entry
+     */
+    LedgerRange refreshLastLedgerRange(MessageSeqId lastSeqId, LedgerRange oldRange)
+        throws BKException, KeeperException, InterruptedException {
+        LedgerHandle lh = bk.openLedgerNoRecovery(oldRange.getLedgerId(), DigestType.CRC32, passwd);
+        long lastConfirmed = lh.readLastConfirmed();
+        MessageSeqId newSeqId = MessageSeqId.newBuilder().mergeFrom(lastSeqId)
+                                .setLocalComponent(lastSeqId.getLocalComponent() + lastConfirmed).build();
+        return LedgerRange.newBuilder().mergeFrom(oldRange).setEndSeqIdIncluded(newSeqId).build();
+    }
+
+    /**
+     * Return the znode path store all the subscribers of a topic.
+     *
+     * @param sb
+     *          String builder to hold the znode path
+     * @param topic
+     *          Topic name
+     */
+    private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
+        return serverConf.getZkTopicPath(sb, topic).append("/subscribers");
+    }
+
+    /**
+     * Return the znode path of a subscriber of a topic.
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriber
+     *          Subscriber name
+     */
+
+    private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
+        return topicSubscribersPath(new StringBuilder(), topic).append("/")
+               .append(subscriber.toStringUtf8()).toString();
+    }
+
+    /**
+     * Return subscriptions of a topic
+     *
+     * @param topic
+     *          Topic name
+     * @return subscriptions of a topic
+     * @throws Exception
+     */
+    public Map<ByteString, SubscriptionState> getTopicSubscriptions(ByteString topic)
+        throws Exception {
+        Map<ByteString, SubscriptionState> states =
+            new HashMap<ByteString, SubscriptionState>();
+        try {
+            String subsPath = topicSubscribersPath(new StringBuilder(), topic).toString();
+            List<String> children = zk.getChildren(subsPath, false);
+            for (String child : children) {
+                ByteString subscriberId = ByteString.copyFromUtf8(child);
+                String subPath = topicSubscriberPath(topic, subscriberId);
+                Stat stat = new Stat();
+                byte[] subData = zk.getData(subPath, false, stat);
+                if (null == subData) {
+                    continue;
+                }
+                SubscriptionState state = SubscriptionState.parseFrom(subData);
+                states.put(subscriberId, state);
+            }
+        } catch (KeeperException.NoNodeException nne) {
+        }
+        return states;
+    }
+
+    /**
+     * Return subscription state of a subscriber of topic
+     *
+     * @param topic
+     *          Topic name
+     * @param subscriber
+     *          Subscriber name
+     * @return subscription state
+     * @throws Exception
+     */
+    public SubscriptionState getSubscription(ByteString topic, ByteString subscriber) throws Exception {
+        String subPath = topicSubscriberPath(topic, subscriber);
+        Stat stat = new Stat();
+        byte[] subData = null;
+        try {
+            subData = zk.getData(subPath, false, stat);
+        } catch (KeeperException.NoNodeException nne) {
+        }
+        if (null == subData) {
+            return null;
+        }
+        return SubscriptionState.parseFrom(subData);
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java Wed Feb  8 10:49:10 2012
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin.console;
+
+import java.util.Map;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.LinkedHashMap;
+
+/**
+ * List all the available commands
+ */
+public final class HedwigCommands {
+
+    static final String[] EMPTY_ARRAY = new String[0];
+
+    //
+    // List all commands used to play with hedwig
+    //
+
+    /* PUB : publish a message to hedwig */
+    static final String PUB = "pub";
+    static final String PUB_DESC = "Publish a message to a topic in Hedwig";
+    static final String[] PUB_USAGE = new String[] {
+        "usage: pub {topic} {message}",
+        "",
+        "  {topic}   : topic name.",
+        "              any printable string without spaces.",
+        "  {message} : message body.",
+        "              remaining arguments are used as message body to publish.",
+    };
+
+    /* SUB : subscriber a topic in hedwig for a specified subscriber */
+    static final String SUB = "sub";
+    static final String SUB_DESC = "Subscribe a topic for a specified subscriber";
+    static final String[] SUB_USAGE = new String[] {
+        "usage: sub {topic} {subscriber} [mode]",
+        "",
+        "  {topic}      : topic name.",
+        "                 any printable string without spaces.",
+        "  {subscriber} : subscriber id.",
+        "                 any printable string without spaces.",
+        "  [mode]       : mode to create subscription.",
+        "  [receive]    : bool. whether to start delivery to receive messages.",
+        "",
+        "  available modes: (default value is 1)",
+        "    0 = CREATE: create subscription.",
+        "                if the subscription is exsited, it will fail.",
+        "    1 = ATTACH: attach to exsited subscription.",
+        "                if the subscription is not existed, it will faile.",
+        "    2 = CREATE_OR_ATTACH:",
+        "                attach to subscription, if not existed create one."
+    };
+
+    /* CLOSESUB : close the subscription of a subscriber for a topic */
+    static final String CLOSESUB = "closesub";
+    static final String CLOSESUB_DESC = "Close subscription of a subscriber to a specified topic";
+    static final String[] CLOSESUB_USAGE = new String[] {
+        "usage: closesub {topic} {subscriber}",
+        "",
+        "  {topic}      : topic name.",
+        "                 any printable string without spaces.",
+        "  {subscriber} : subscriber id.",
+        "                 any printable string without spaces.",
+        "",
+        " NOTE: this command just cleanup subscription states on client side.",
+        "       You can try UNSUB to clean subscription states on server side.",
+    };
+
+    /* UNSUB: unsubscribe of a subscriber to a topic */
+    static final String UNSUB = "unsub";
+    static final String UNSUB_DESC = "Unsubscribe a topic for a subscriber";
+    static final String[] UNSUB_USAGE = new String[] {
+        "usage: unsub {topic} {subscriber}",
+        "",
+        "  {topic}      : topic name.",
+        "                 any printable string without spaces.",
+        "  {subscriber} : subscriber id.",
+        "                 any printable string without spaces.",
+        "",
+        " NOTE: this command will cleanup subscription states on server side.",
+        "       You can try CLOSESUB to just clean subscription states on client side.",
+    };
+
+    static final String RMSUB = "rmsub";
+    static final String RMSUB_DESC = "Remove subscriptions for topics";
+    static final String[] RMSUB_USAGE = new String[] {
+        "usage: rmsub {topic_prefix} {start_topic} {end_topic} {subscriber_prefix} {start_sub} {end_sub}",
+        "",
+        "  {topic_prefix}       : topic prefix.",
+        "  {start_topic}        : start topic id.",
+        "  {end_topic}          : end topic id.",
+        "  {subscriber_prefix}  : subscriber prefix.",
+        "  {start_sub}          : start subscriber id.",
+        "  {end_sub}            : end subscriber id.",
+    };
+
+    /* CONSUME: move consume ptr of a subscription with specified steps */
+    static final String CONSUME = "consume";
+    static final String CONSUME_DESC = "Move consume ptr of a subscription with sepcified steps";
+    static final String[] CONSUME_USAGE = new String[] {
+        "usage: consume {topic} {subscriber} {nmsgs}",
+        "",
+        "  {topic}      : topic name.",
+        "                 any printable string without spaces.",
+        "  {subscriber} : subscriber id.",
+        "                 any printable string without spaces.",
+        "  {nmsgs}      : how many messages to move consume ptr.",
+        "",
+        "  Example:",
+        "  suppose, from zk we know subscriber B consumed topic T to message 10",
+        "  [hedwig: (standalone) 1] consume T B 2",
+        "  after executed above command, a consume(10+2) request will be sent to hedwig.",
+        "",
+        "  NOTE:",
+        "  since Hedwig updates subscription consume ptr lazily, so you need to know that",
+        "    1) the consumption ptr read from zookeeper may be stable; ",
+        "    2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
+    };
+
+    /* CONSUMETO: move consume ptr of a subscription to a specified pos */
+    static final String CONSUMETO = "consumeto";
+    static final String CONSUMETO_DESC = "Move consume ptr of a subscription to a specified message id";
+    static final String[] CONSUMETO_USAGE = new String[] {
+        "usage: consumeto {topic} {subscriber} {msg_id}",
+        "",
+        "  {topic}      : topic name.",
+        "                 any printable string without spaces.",
+        "  {subscriber} : subscriber id.",
+        "                 any printable string without spaces.",
+        "  {msg_id}     : message id that consume ptr will be moved to.",
+        "                 if the message id is less than current consume ptr,",
+        "                 hedwig will do nothing.",
+        "",
+        "  Example:",
+        "  suppose, from zk we know subscriber B consumed topic T to message 10",
+        "  [hedwig: (standalone) 1] consumeto T B 12",
+        "  after executed above command, a consume(12) request will be sent to hedwig.",
+        "",
+        "  NOTE:",
+        "  since Hedwig updates subscription consume ptr lazily, so you need to know that",
+        "    1) the consumption ptr read from zookeeper may be stable; ",
+        "    2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
+    };
+
+    /* PUBSUB: a healthy checking command to ensure cluster is running */
+    static final String PUBSUB = "pubsub";
+    static final String PUBSUB_DESC = "A healthy checking command to ensure hedwig is in running state";
+    static final String[] PUBSUB_USAGE = new String[] {
+        "usage: pubsub {topic} {subscriber} {timeout_secs} {message}",
+        "",
+        "  {topic}        : topic name.",
+        "                   any printable string without spaces.",
+        "  {subscriber}   : subscriber id.",
+        "                   any printable string without spaces.",
+        "  {timeout_secs} : how long will the subscriber wait for published message.",
+        "  {message}      : message body.",
+        "                   remaining arguments are used as message body to publish.",
+        "",
+        "  Example:",
+        "  [hedwig: (standalone) 1] pubsub TOPIC SUBID 10 TEST_MESSAGS",
+        "",
+        "  1) hw will subscribe topic TOPIC as subscriber SUBID;",
+        "  2) subscriber SUBID will wait a message until 10 seconds;",
+        "  3) hw publishes TEST_MESSAGES to topic TOPIC;",
+        "  4) if subscriber recevied message in 10 secs, it checked that whether the message is published message.",
+        "     if true, it will return SUCCESS, otherwise return FAILED.",
+    };
+
+    //
+    // List all commands used to admin hedwig
+    //
+
+    /* SHOW: list all available hub servers or topics */
+    static final String SHOW = "show";
+    static final String SHOW_DESC = "list all available hub servers or topics";
+    static final String[] SHOW_USAGE = new String[] {
+        "usage: show [topics | hubs]",
+        "",
+        "  show topics :",
+        "    listing all available topics in hedwig.",
+        "",
+        "  show hubs :",
+        "    listing all available hubs in hedwig.",
+        "",
+        "  NOTES:",
+        "  'show topics' will not works when there are millions of topics in hedwig, since we have packetLen limitation fetching data from zookeeper.",
+    };
+
+    static final String SHOW_TOPICS = "topics";
+    static final String SHOW_HUBS   = "hubs";
+
+    /* DESCRIBE: show the metadata of a topic */
+    static final String DESCRIBE = "describe";
+    static final String DESCRIBE_DESC = "show metadata of a topic, including topic owner, persistence info, subscriptions info";
+    static final String[] DESCRIBE_USAGE = new String[] {
+        "usage: describe topic {topic}",
+        "",
+        "  {topic} : topic name.",
+        "            any printable string without spaces.",
+        "",
+        "  Example: describe topic ttttt",
+        "",
+        "  Output:",
+        "  ===== Topic Information : ttttt =====",
+        "",
+        "  Owner : 98.137.99.27:9875:9876",
+        "",
+        "  >>> Persistence Info <<<",
+        "  Ledger 54729 [ 1 ~ 59 ]",
+        "  Ledger 54731 [ 60 ~ 60 ]",
+        "  Ledger 54733 [ 61 ~ 61 ]",
+        "",
+        "  >>> Subscription Info <<<",
+        "  Subscriber mysub : consumeSeqId: local:50",
+    };
+
+    static final String DESCRIBE_TOPIC = "topic";
+
+    /* READTOPIC: read messages of a specified topic */
+    static final String READTOPIC = "readtopic";
+    static final String READTOPIC_DESC = "read messages of a specified topic";
+    static final String[] READTOPIC_USAGE = new String[] {
+        "usage: readtopic {topic} [start_msg_id]",
+        "",
+        "  {topic}        : topic name.",
+        "                   any printable string without spaces.",
+        "  [start_msg_id] : message id that start to read from.",
+        "",
+        "  no start_msg_id provided:",
+        "    it will start from least_consumed_message_id + 1.",
+        "    least_consume_message_id is computed from all its subscribers.",
+        "",
+        "  start_msg_id provided:",
+        "    it will start from MAX(start_msg_id, least_consumed_message_id).",
+        "",
+        "  MESSAGE FORMAT:",
+        "",
+        "  ---------- MSGID=LOCAL(51) ----------",
+        "  MsgId:     LOCAL(51)",
+        "  SrcRegion: standalone",
+        "  Message:",
+        "",
+        "  hello",
+    };
+
+    //
+    // List other useful commands
+    //
+
+    /* SET: set whether printing zk watches or not */
+    static final String SET = "set";
+    static final String SET_DESC = "set whether printing zk watches or not";
+    static final String[] SET_USAGE = EMPTY_ARRAY;
+
+    /* HISTORY: list history commands */
+    static final String HISTORY = "history";
+    static final String HISTORY_DESC = "list history commands";
+    static final String[] HISTORY_USAGE = EMPTY_ARRAY;
+
+    /* REDO: redo previous command */
+    static final String REDO = "redo";
+    static final String REDO_DESC = "redo history command";
+    static final String[] REDO_USAGE = new String[] {
+        "usage: redo [{cmdno} | !]",
+        "",
+        "  {cmdno} : history command no.",
+        "  !       : last command.",
+    };
+
+    /* HELP: print usage information of a specified command */
+    static final String HELP = "help";
+    static final String HELP_DESC = "print usage information of a specified command";
+    static final String[] HELP_USAGE = new String[] {
+        "usage: help {command}",
+        "",
+        "  {command} : command name",
+    };
+
+    static final String QUIT = "quit";
+    static final String QUIT_DESC = "exit console";
+    static final String[] QUIT_USAGE = EMPTY_ARRAY;
+
+    static final String EXIT = "exit";
+    static final String EXIT_DESC = QUIT_DESC;
+    static final String[] EXIT_USAGE = EMPTY_ARRAY;
+
+    public static enum COMMAND {
+
+        CMD_PUB (PUB, PUB_DESC, PUB_USAGE),
+        CMD_SUB (SUB, SUB_DESC, SUB_USAGE),
+        CMD_CLOSESUB (CLOSESUB, CLOSESUB_DESC, CLOSESUB_USAGE),
+        CMD_UNSUB (UNSUB, UNSUB_DESC, UNSUB_USAGE),
+        CMD_RMSUB (RMSUB, RMSUB_DESC, RMSUB_USAGE),
+        CMD_CONSUME (CONSUME, CONSUME_DESC, CONSUME_USAGE),
+        CMD_CONSUMETO (CONSUMETO, CONSUMETO_DESC, CONSUMETO_USAGE),
+        CMD_PUBSUB (PUBSUB, PUBSUB_DESC, PUBSUB_USAGE),
+        CMD_SHOW (SHOW, SHOW_DESC, SHOW_USAGE),
+        CMD_DESCRIBE (DESCRIBE, DESCRIBE_DESC, DESCRIBE_USAGE),
+        CMD_READTOPIC (READTOPIC, READTOPIC_DESC, READTOPIC_USAGE),
+        CMD_SET (SET, SET_DESC, SET_USAGE),
+        CMD_HISTORY (HISTORY, HISTORY_DESC, HISTORY_USAGE),
+        CMD_REDO (REDO, REDO_DESC, REDO_USAGE),
+        CMD_HELP (HELP, HELP_DESC, HELP_USAGE),
+        CMD_QUIT (QUIT, QUIT_DESC, QUIT_USAGE),
+        CMD_EXIT (EXIT, EXIT_DESC, EXIT_USAGE),
+        // sub commands
+        CMD_SHOW_TOPICS (SHOW_TOPICS, "", EMPTY_ARRAY),
+        CMD_SHOW_HUBS (SHOW_HUBS, "", EMPTY_ARRAY),
+        CMD_DESCRIBE_TOPIC (DESCRIBE_TOPIC, "", EMPTY_ARRAY);
+
+        COMMAND(String name, String desc, String[] usage) {
+            this.name = name;
+            this.desc = desc;
+            this.usage = usage;
+            this.subCmds = new LinkedHashMap<String, COMMAND>();
+        }
+
+        public String getName() { return name; }
+
+        public String getDescription() { return desc; }
+
+        public String[] getUsage() { return usage; }
+
+        public Map<String, COMMAND> getSubCommands() { return subCmds; }
+
+        public void addSubCommand(COMMAND c) {
+            this.subCmds.put(c.name, c);
+        };
+
+        public void printUsage() {
+            System.err.println(name + ": " + desc);
+            for(String line : usage) {
+                System.err.println(line);
+            }
+            System.err.println();
+        }
+
+        protected String name;
+        protected String desc;
+        protected String[] usage;
+        protected Map<String, COMMAND> subCmds;
+    }
+
+    static Map<String, COMMAND> commands = null;
+
+    private static void addCommand(COMMAND c) {
+        commands.put(c.getName(), c);
+    }
+
+    static {
+        commands = new LinkedHashMap<String, COMMAND>();
+
+        addCommand(COMMAND.CMD_PUB);
+        addCommand(COMMAND.CMD_SUB);
+        addCommand(COMMAND.CMD_CLOSESUB);
+        addCommand(COMMAND.CMD_UNSUB);
+        addCommand(COMMAND.CMD_RMSUB);
+        addCommand(COMMAND.CMD_CONSUME);
+        addCommand(COMMAND.CMD_CONSUMETO);
+        addCommand(COMMAND.CMD_PUBSUB);
+
+        // show
+        COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_TOPICS);
+        COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_HUBS);
+        addCommand(COMMAND.CMD_SHOW);
+
+        // describe
+        COMMAND.CMD_DESCRIBE.addSubCommand(COMMAND.CMD_DESCRIBE_TOPIC);
+        addCommand(COMMAND.CMD_DESCRIBE);
+
+        addCommand(COMMAND.CMD_READTOPIC);
+        addCommand(COMMAND.CMD_SET);
+        addCommand(COMMAND.CMD_HISTORY);
+        addCommand(COMMAND.CMD_REDO);
+        addCommand(COMMAND.CMD_HELP);
+        addCommand(COMMAND.CMD_QUIT);
+        addCommand(COMMAND.CMD_EXIT);
+    }
+
+    public static Map<String, COMMAND> getHedwigCommands() {
+        return commands;
+    }
+
+    /**
+     * Find candidate commands by the specified token list
+     *
+     * @param token token list
+     *
+     * @return list of candidate commands
+     */
+    public static List<String> findCandidateCommands(String[] tokens) {
+        List<String> cmds = new LinkedList<String>();
+
+        Map<String, COMMAND> cmdMap = commands;
+        for (int i=0; i<(tokens.length - 1); i++) {
+            COMMAND c = cmdMap.get(tokens[i]);
+            // no commands
+            if (c == null || c.getSubCommands().size() <= 0) {
+                return cmds;
+            } else {
+                cmdMap = c.getSubCommands();
+            }
+        }
+        cmds.addAll(cmdMap.keySet());
+        return cmds;
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Wed Feb  8 10:49:10 2012
@@ -0,0 +1,1029 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin.console;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.hedwig.admin.HedwigAdmin;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import static org.apache.hedwig.admin.console.HedwigCommands.*;
+import static org.apache.hedwig.admin.console.HedwigCommands.COMMAND.*;
+
+/**
+ * Console Client to Hedwig
+ */
+public class HedwigConsole {
+    private static final Logger LOG = LoggerFactory.getLogger(HedwigConsole.class);
+    // NOTE: now it is fixed passwd in bookkeeper
+    static byte[] passwd = "sillysecret".getBytes();
+
+    // history file name
+    static final String HW_HISTORY_FILE = ".hw_history";
+
+    protected MyCommandOptions cl = new MyCommandOptions();
+    protected HashMap<Integer, String> history = new LinkedHashMap<Integer, String>();
+    protected int commandCount = 0;
+    protected boolean printWatches = true;
+    protected Map<String, MyCommand> myCommands;
+
+    protected boolean inConsole = true;
+
+    protected HedwigAdmin admin;
+    protected HedwigClient hubClient;
+    protected Publisher publisher;
+    protected Subscriber subscriber;
+    protected ConsoleMessageHandler consoleHandler =
+            new ConsoleMessageHandler();
+
+    protected String myRegion;
+
+    interface MyCommand {
+        boolean runCmd(String[] args) throws Exception;
+    }
+
+    class HelpCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            boolean printUsage = true;
+            if (args.length >= 2) {
+                String command = args[1];
+                COMMAND c = getHedwigCommands().get(command);
+                if (c != null) {
+                    c.printUsage();
+                    printUsage = false;
+                }
+            }
+            if (printUsage) {
+                usage();
+            }
+            return true;
+        }
+    }
+
+    class ExitCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            printMessage("Quitting ...");
+            hubClient.close();
+            admin.close();
+            System.exit(0);
+            return true;
+        }
+    }
+
+    class RedoCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 2) {
+                return false;
+            }
+
+            int index;
+            if ("!".equals(args[1])) {
+                index = commandCount - 1;
+            } else {
+                index = Integer.decode(args[1]);
+                if (commandCount <= index) {
+                    System.err.println("Command index out of range");
+                    return false;
+                }
+            }
+            cl.parseCommand(history.get(index));
+            if (cl.getCommand().equals("redo")) {
+                System.err.println("No redoing redos");
+                return false;
+            }
+            history.put(commandCount, history.get(index));
+            processCmd(cl);
+            return true;
+        }
+        
+    }
+
+    class HistoryCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            for (int i=commandCount - 10; i<=commandCount; ++i) {
+                if (i < 0) {
+                    continue;
+                }
+                System.out.println(i + " - " + history.get(i));
+            }
+            return true;
+        }
+        
+    }
+
+    class SetCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 3 || !"printwatches".equals(args[1])) {
+                return false;
+            } else if (args.length == 2) {
+                System.out.println("printwatches is " + (printWatches ? "on" : "off"));
+            } else {
+                printWatches = args[2].equals("on");
+            }
+            return true;
+        }
+        
+    }
+
+    class PubCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 3) {
+                return false;
+            }
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+
+            StringBuilder sb = new StringBuilder();
+            for (int i=2; i<args.length; i++) {
+                sb.append(args[i]);
+                if (i != args.length - 1) {
+                    sb.append(' ');
+                }
+            }
+            ByteString msgBody = ByteString.copyFromUtf8(sb.toString());
+            Message msg = Message.newBuilder().setBody(msgBody).build();
+            try {
+                publisher.publish(topic, msg);
+                System.out.println("PUB DONE");
+            } catch (Exception e) {
+                System.err.println("PUB FAILED");
+                e.printStackTrace();
+            }
+            return true;
+        }
+        
+    }
+
+    class ConsoleMessageHandler implements MessageHandler {
+
+        @Override
+        public void deliver(ByteString topic, ByteString subscriberId,
+                Message msg, Callback<Void> callback, Object context) {
+            System.out.println("Received message from topic " + topic.toStringUtf8() + 
+                    " for subscriber " + subscriberId.toStringUtf8() + " : "
+                    + msg.getBody().toStringUtf8());
+            callback.operationFinished(context, null);
+        }
+        
+    }
+
+    class SubCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            CreateOrAttach mode;
+            boolean receive = true;
+            if (args.length < 3) {
+                return false;
+            } else if (args.length == 3) {
+                mode = CreateOrAttach.ATTACH;
+                receive = true;
+            } else {
+                try {
+                    mode = CreateOrAttach.valueOf(Integer.parseInt(args[3]));
+                } catch (Exception e) {
+                    System.err.println("Unknow mode : " + args[3]);
+                    return false;
+                }
+                if (args.length >= 5) {
+                    try {
+                        receive = Boolean.parseBoolean(args[4]);
+                    } catch (Exception e) {
+                        receive = false;
+                    }
+                }
+            }
+            if (mode == null) {
+                System.err.println("Unknow mode : " + args[3]);
+                return false;
+            }
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+            ByteString subId = ByteString.copyFromUtf8(args[2]);
+            try {
+                subscriber.subscribe(topic, subId, mode);
+                if (receive) {
+                    subscriber.startDelivery(topic, subId, consoleHandler);
+                    System.out.println("SUB DONE AND RECEIVE");
+                } else {
+                    System.out.println("SUB DONE BUT NOT RECEIVE");
+                }
+            } catch (Exception e) {
+                System.err.println("SUB FAILED");
+                e.printStackTrace();
+            }
+            return true;
+        }
+    }
+
+    class UnsubCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 3) {
+                return false;
+            }
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+            ByteString subId = ByteString.copyFromUtf8(args[2]);
+            try {
+                subscriber.stopDelivery(topic, subId);
+                subscriber.unsubscribe(topic, subId);
+                System.out.println("UNSUB DONE");
+            } catch (Exception e) {
+                System.err.println("UNSUB FAILED");
+                e.printStackTrace();
+            }
+            return true;
+        }
+        
+    }
+
+    class RmsubCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 7) {
+                return false;
+            }
+            String topicPrefix = args[1];
+            int startTopic = Integer.parseInt(args[2]);
+            int endTopic = Integer.parseInt(args[3]);
+            String subPrefix = args[4];
+            int startSub = Integer.parseInt(args[5]);
+            int endSub = Integer.parseInt(args[6]);
+            if (startTopic > endTopic || endSub < startSub) {
+                return false;
+            }
+            for (int i=startTopic; i<=endTopic; i++) {
+                ByteString topic = ByteString.copyFromUtf8(topicPrefix + i);
+                try {
+                    for (int j=startSub; j<=endSub; j++) {
+                        ByteString sub = ByteString.copyFromUtf8(subPrefix + j);
+                        subscriber.subscribe(topic, sub, CreateOrAttach.CREATE_OR_ATTACH);
+                        subscriber.unsubscribe(topic, sub);
+                    }
+                    System.out.println("RMSUB " + topic.toStringUtf8() + " DONE");
+                } catch (Exception e) {
+                    System.err.println("RMSUB " + topic.toStringUtf8() + " FAILED");
+                    e.printStackTrace();
+                }
+            }
+            return true;
+        }
+
+    }
+    
+    class CloseSubscriptionCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 3) {
+                return false;
+            }
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+            ByteString sudId = ByteString.copyFromUtf8(args[2]);
+            
+            try {
+                subscriber.stopDelivery(topic, sudId);
+                subscriber.closeSubscription(topic, sudId);
+            } catch (Exception e) {
+                System.err.println("CLOSESUB FAILED");
+            }
+            return true;
+        }
+        
+    }
+    
+    class ConsumeToCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 4) {
+                return false;
+            }
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+            ByteString subId = ByteString.copyFromUtf8(args[2]);
+            long msgId = Long.parseLong(args[3]);
+            MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(msgId).build();
+            try {
+                subscriber.consume(topic, subId, consumeId);
+            } catch (Exception e) {
+                System.err.println("CONSUMETO FAILED");
+            }
+            return true;
+        }
+        
+    }
+    
+    class ConsumeCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 4) {
+                return false;
+            }
+            long lastConsumedId = 0;
+            SubscriptionState state = admin.getSubscription(ByteString.copyFromUtf8(args[1]), ByteString.copyFromUtf8(args[2]));
+            if (null == state) {
+                System.err.println("Failed to read subscription for topic: " + args[1]
+                                 + " subscriber: " + args[2]);
+                return true;
+            }
+            long numMessagesToConsume = Long.parseLong(args[3]);
+            long idToConsumed = lastConsumedId + numMessagesToConsume;
+            System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1]
+                             + ") from " + lastConsumedId + " to " + idToConsumed);
+            MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(idToConsumed).build();
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+            ByteString subId = ByteString.copyFromUtf8(args[2]);
+            try {
+                subscriber.consume(topic, subId, consumeId);
+            } catch (Exception e) {
+                System.err.println("CONSUME FAILED");
+            }
+            return true;
+        }
+        
+    }
+
+    class PubSubCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 5) {
+                return false;
+            }
+            final long startTime = System.currentTimeMillis();
+
+            final ByteString topic = ByteString.copyFromUtf8(args[1]);
+            final ByteString subId = ByteString.copyFromUtf8(args[2] + "-" + startTime);
+            int timeoutSecs = 60;
+            try {
+                timeoutSecs = Integer.parseInt(args[3]);
+            } catch (NumberFormatException nfe) {
+            }
+
+            StringBuilder sb = new StringBuilder();
+            for (int i=4; i<args.length; i++) {
+                sb.append(args[i]);
+                if (i != args.length - 1) {
+                    sb.append(' ');
+                }
+            }
+            // append a timestamp tag
+            ByteString msgBody = ByteString.copyFromUtf8(sb.toString() + "-" + startTime);
+            final Message msg = Message.newBuilder().setBody(msgBody).build();
+
+            boolean subscribed = false;
+            boolean success = false;
+            final AtomicBoolean isDone = new AtomicBoolean(false);
+            long elapsedTime = 0L;
+
+            System.out.println("Starting PUBSUB test ...");
+            try {
+                // sub the topic
+                subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
+                subscribed = true;
+
+                System.out.println("Sub topic " + topic.toStringUtf8() + ", subscriber id " + subId.toStringUtf8());
+
+                
+
+                // pub topic
+                publisher.publish(topic, msg);
+                System.out.println("Pub topic " + topic.toStringUtf8() + " : " + msg.getBody().toStringUtf8());
+
+                // ensure subscriber first, publish next, then we start delivery to receive message
+                // if start delivery first before publish, isDone may notify before wait
+                subscriber.startDelivery(topic, subId, new MessageHandler() {
+
+                    @Override
+                    public void deliver(ByteString thisTopic, ByteString subscriberId,
+                            Message message, Callback<Void> callback, Object context) {
+                        if (thisTopic.equals(topic) && subscriberId.equals(subId) &&
+                            msg.getBody().equals(message.getBody())) {
+                            System.out.println("Received message : " + message.getBody().toStringUtf8());
+                            synchronized(isDone) {
+                                isDone.set(true);
+                                isDone.notify();
+                            }
+                        }
+                        callback.operationFinished(context, null);
+                    }
+
+                });
+
+                // wait for the message
+                synchronized (isDone) {
+                    isDone.wait(timeoutSecs * 1000L);
+                }
+                success = isDone.get();
+                elapsedTime = System.currentTimeMillis() - startTime;
+            } finally {
+                try {
+                    if (subscribed) {
+                        subscriber.stopDelivery(topic, subId);
+                        subscriber.unsubscribe(topic, subId);
+                    }
+                } finally {
+                    if (success) {
+                        System.out.println("PUBSUB SUCCESS. TIME: " + elapsedTime + " MS");
+                    } else {
+                        System.out.println("PUBSUB FAILED. ");
+                    }
+                    return success;
+                }
+            }
+        }
+
+    }
+    
+    class ReadTopicCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 2) {
+                return false;
+            }
+            ReadTopic rt;
+            ByteString topic = ByteString.copyFromUtf8(args[1]);
+            if (args.length == 2) {
+                rt = new ReadTopic(admin, topic, inConsole);
+            } else {
+                rt = new ReadTopic(admin, topic, Long.parseLong(args[2]), inConsole);
+            }
+            rt.readTopic();
+            return true;
+        }
+        
+    }
+
+    class ShowCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 2) {
+                return false;
+            }
+            String errorMsg = null;
+            try {
+                if (HedwigCommands.SHOW_HUBS.equals(args[1])) {
+                    errorMsg = "Unable to fetch the list of hub servers";
+                    showHubs();
+                } else if (HedwigCommands.SHOW_TOPICS.equals(args[1])) {
+                    errorMsg = "Unable to fetch the list of topics";
+                    showTopics();
+                } else {
+                    System.err.println("ERROR: Unknown show command '" + args[1] + "'");
+                    return false;
+                }
+            } catch (Exception e) {
+                if (null != errorMsg) {
+                    System.err.println(errorMsg);
+                }
+                e.printStackTrace();
+            }
+            return true;
+        }
+
+        protected void showHubs() throws Exception {
+            Map<HedwigSocketAddress, Integer> hubs = admin.getAvailableHubs();
+            System.out.println("Available Hub Servers:");
+            for (Map.Entry<HedwigSocketAddress, Integer> entry : hubs.entrySet()) {
+                System.out.println("\t" + entry.getKey() + " :\t" + entry.getValue());
+            }
+        }
+
+        protected void showTopics() throws Exception {
+            List<String> topics = admin.getTopics();
+            System.out.println("Topic List:");
+            System.out.println(topics);
+        }
+        
+    }
+
+    class DescribeCmd implements MyCommand {
+
+        @Override
+        public boolean runCmd(String[] args) throws Exception {
+            if (args.length < 3) {
+                return false;
+            }
+            if (HedwigCommands.DESCRIBE_TOPIC.equals(args[1])) {
+                return describeTopic(args[2]);
+            } else {
+                return false;
+            }
+        }
+
+        protected boolean describeTopic(String topic) throws Exception {
+            ByteString btopic = ByteString.copyFromUtf8(topic);
+            HedwigSocketAddress owner = admin.getTopicOwner(btopic);
+            List<LedgerRange> ranges = admin.getTopicLedgers(btopic);
+            Map<ByteString, SubscriptionState> states = admin.getTopicSubscriptions(btopic);
+
+            System.out.println("===== Topic Information : " + topic + " =====");
+            System.out.println();
+            System.out.println("Owner : " + (owner == null ? "NULL" : owner));
+            System.out.println();
+
+            // print ledgers
+            printTopicLedgers(ranges);
+            // print subscriptions
+            printTopicSubscriptions(states);
+
+            return true;
+        }
+
+        private void printTopicLedgers(List<LedgerRange> lrs) {
+            System.out.println(">>> Persistence Info <<<");
+            if (null == lrs) {
+                System.out.println("N/A");
+                return;
+            }
+            if (lrs.isEmpty()) {
+                System.out.println("No Ledger used.");
+                return;
+            }
+            Iterator<LedgerRange> lrIterator = lrs.iterator();
+            long startOfLedger = 1;
+            while (lrIterator.hasNext()) {
+                LedgerRange range = lrIterator.next();
+                long endOfLedger = Long.MAX_VALUE;
+                if (range.hasEndSeqIdIncluded()) {
+                    endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+                }
+                System.out.println("Ledger " + range.getLedgerId() + " [ " + startOfLedger + " ~ " + (endOfLedger == Long.MAX_VALUE ? "" : endOfLedger) + " ]");
+
+                startOfLedger = endOfLedger + 1;
+            }
+            System.out.println();
+        }
+
+        private void printTopicSubscriptions(Map<ByteString, SubscriptionState> states) {
+            System.out.println(">>> Subscription Info <<<");
+            if (0 == states.size()) {
+                System.out.println("No subscriber.");
+                return;
+            }
+            for (Map.Entry<ByteString, SubscriptionState> entry : states.entrySet()) {
+                System.out.println("Subscriber " + entry.getKey().toStringUtf8() + " : "
+                                 + SubscriptionStateUtils.toString(entry.getValue()));
+            }
+            System.out.println();
+        }
+
+    }
+
+    protected Map<String, MyCommand> buildMyCommands() {
+        Map<String, MyCommand> cmds =
+                new HashMap<String, MyCommand>();
+
+        ExitCmd exitCmd = new ExitCmd();
+        cmds.put(EXIT, exitCmd);
+        cmds.put(QUIT, exitCmd);
+        cmds.put(HELP, new HelpCmd());
+        cmds.put(HISTORY, new HistoryCmd());
+        cmds.put(REDO, new RedoCmd());
+        cmds.put(SET, new SetCmd());
+        cmds.put(PUB, new PubCmd());
+        cmds.put(SUB, new SubCmd());
+        cmds.put(PUBSUB, new PubSubCmd());
+        cmds.put(CLOSESUB, new CloseSubscriptionCmd());
+        cmds.put(UNSUB, new UnsubCmd());
+        cmds.put(RMSUB, new RmsubCmd());
+        cmds.put(CONSUME, new ConsumeCmd());
+        cmds.put(CONSUMETO, new ConsumeToCmd());
+        cmds.put(SHOW, new ShowCmd());
+        cmds.put(DESCRIBE, new DescribeCmd());
+        cmds.put(READTOPIC, new ReadTopicCmd());
+
+        return cmds;
+    }
+
+    static void usage() {
+        System.err.println("HedwigConsole [options] [command] [args]");
+        System.err.println();
+        System.err.println("Avaiable commands:");
+        for (String cmd : getHedwigCommands().keySet()) {
+            System.err.println("\t" + cmd);
+        }
+        System.err.println();
+    }
+
+    /**
+     * A storage class for both command line options and shell commands.
+     */
+    static private class MyCommandOptions {
+
+        private Map<String,String> options = new HashMap<String,String>();
+        private List<String> cmdArgs = null;
+        private String command = null;
+
+        public MyCommandOptions() {
+        }
+
+        public String getOption(String opt) {
+            return options.get(opt);
+        }
+
+        public String getCommand( ) {
+            return command;
+        }
+
+        public String getCmdArgument( int index ) {
+            return cmdArgs.get(index);
+        }
+
+        public int getNumArguments( ) {
+            return cmdArgs.size();
+        }
+
+        public String[] getArgArray() {
+            return cmdArgs.toArray(new String[0]);
+        }
+
+        /**
+         * Parses a command line that may contain one or more flags
+         * before an optional command string
+         * @param args command line arguments
+         * @return true if parsing succeeded, false otherwise.
+         */
+        public boolean parseOptions(String[] args) {
+            List<String> argList = Arrays.asList(args);
+            Iterator<String> it = argList.iterator();
+
+            while (it.hasNext()) {
+                String opt = it.next();
+                if (!opt.startsWith("-")) {
+                    command = opt;
+                    cmdArgs = new ArrayList<String>( );
+                    cmdArgs.add( command );
+                    while (it.hasNext()) {
+                        cmdArgs.add(it.next());
+                    }
+                    return true;
+                } else {
+                    try {
+                        options.put(opt.substring(1), it.next());
+                    } catch (NoSuchElementException e) {
+                        System.err.println("Error: no argument found for option "
+                                + opt);
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Breaks a string into command + arguments.
+         * @param cmdstring string of form "cmd arg1 arg2..etc"
+         * @return true if parsing succeeded.
+         */
+        public boolean parseCommand( String cmdstring ) {
+            String[] args = cmdstring.split(" ");
+            if (args.length == 0){
+                return false;
+            }
+            command = args[0];
+            cmdArgs = Arrays.asList(args);
+            return true;
+        }
+    }
+
+    private class MyWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+            if (getPrintWatches()) {
+                printMessage("WATCHER::");
+                printMessage(event.toString());
+            }
+        }
+    }
+
+    public void printMessage(String msg) {
+        if (inConsole) {
+            System.out.println("\n"+msg);
+        }
+    }
+
+    /**
+     * Hedwig Console
+     *
+     * @param args arguments
+     * @throws IOException
+     * @throws InterruptedException 
+     */
+    public HedwigConsole(String[] args) throws IOException, InterruptedException {
+        cl.parseOptions(args);
+
+        if (cl.getCommand() == null) {
+            inConsole = true;
+        } else {
+            inConsole = false;
+        }
+
+        org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
+            new org.apache.bookkeeper.conf.ClientConfiguration();
+        ServerConfiguration hubServerConf = new ServerConfiguration();
+        String serverCfgFile = cl.getOption("server-cfg");
+        if (serverCfgFile != null) {
+            try {
+                hubServerConf.loadConf(new File(serverCfgFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new IOException(e);
+            }
+            try {
+                bkClientConf.loadConf(new File(serverCfgFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new IOException(e);
+            }
+        }
+
+        ClientConfiguration hubClientCfg = new ClientConfiguration();
+        String clientCfgFile = cl.getOption("client-cfg");
+        if (clientCfgFile != null) {
+            try {
+                hubClientCfg.loadConf(new File(clientCfgFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new IOException(e);
+            }
+        }
+
+
+
+        printMessage("Connecting to zookeeper/bookkeeper using HedwigAdmin");
+        try {
+            admin = new HedwigAdmin(bkClientConf, hubServerConf);
+            admin.getZkHandle().register(new MyWatcher());
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        
+        printMessage("Connecting to default hub server " + hubClientCfg.getDefaultServerHost());
+        hubClient = new HedwigClient(hubClientCfg);
+        publisher = hubClient.getPublisher();
+        subscriber = hubClient.getSubscriber();
+        
+        // other parameters
+        myRegion = hubServerConf.getMyRegion();
+    }
+
+    public boolean getPrintWatches() {
+        return printWatches;
+    }
+
+    protected String getPrompt() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[hedwig: (").append(myRegion).append(") ").append(commandCount).append("] ");
+        return sb.toString();
+    }
+
+    protected void addToHistory(int i, String cmd) {
+        history.put(i, cmd);
+    }
+
+    public void executeLine(String line) {
+        if (!line.equals("")) {
+            cl.parseCommand(line);
+            addToHistory(commandCount, line);
+            processCmd(cl);
+            commandCount++;
+        }
+    }
+
+    protected boolean processCmd(MyCommandOptions co) {
+        String[] args = co.getArgArray();
+        String cmd = co.getCommand();
+        if (args.length < 1) {
+            usage();
+            return false;
+        }
+        if (!getHedwigCommands().containsKey(cmd)) {
+            usage();
+            return false;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Processing " + cmd);
+        }
+
+        MyCommand myCommand = myCommands.get(cmd);
+        if (myCommand == null) {
+            System.err.println("No Command Processor found for command " + cmd);
+            usage();
+            return false;
+        }
+
+        long startTime = System.currentTimeMillis();
+        boolean success = false;
+        try {
+            success = myCommand.runCmd(args);
+        } catch (Exception e) {
+            e.printStackTrace();
+            success = false;
+        }
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        if (inConsole) {
+            if (success) {
+                System.out.println("Finished " + ((double)elapsedTime / 1000) + " s.");
+            } else {
+                COMMAND c = getHedwigCommands().get(cmd);
+                if (c != null) {
+                    c.printUsage();
+                }
+            }
+        }
+        return success;
+    }
+
+    @SuppressWarnings("unchecked")
+    void run() throws IOException {
+        inConsole = true;
+        myCommands = buildMyCommands();
+        if (cl.getCommand() == null) {
+            System.out.println("Welcome to Hedwig!");
+
+            boolean jlinemissing = false;
+            // only use jline if it's in the classpath
+            try {
+                Class consoleC = Class.forName("jline.ConsoleReader");
+                Class completorC =
+                    Class.forName("org.apache.hedwig.admin.console.JLineHedwigCompletor");
+
+                System.out.println("JLine support is enabled");
+
+                Object console =
+                    consoleC.getConstructor().newInstance();
+
+                Object completor =
+                    completorC.getConstructor(HedwigAdmin.class).newInstance(admin);
+                Method addCompletor = consoleC.getMethod("addCompletor",
+                        Class.forName("jline.Completor"));
+                addCompletor.invoke(console, completor);
+
+                // load history file
+                boolean historyEnabled = false;
+                Object history = null;
+                Method addHistory = null;
+                // Method flushHistory = null;
+                try {
+                    Class historyC = Class.forName("jline.History");
+                    history = historyC.getConstructor().newInstance();
+
+                    File file = new File(System.getProperty("hw.history",
+                                         new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString()));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("History file is " + file.toString());
+                    }
+                    Method setHistoryFile = historyC.getMethod("setHistoryFile", File.class);
+                    setHistoryFile.invoke(history, file);
+
+                    // set history to console reader
+                    Method setHistory = consoleC.getMethod("setHistory", historyC);
+                    setHistory.invoke(console, history);
+
+                    // load history from history file
+                    Method moveToFirstEntry = historyC.getMethod("moveToFirstEntry");
+                    moveToFirstEntry.invoke(history);
+
+                    addHistory = historyC.getMethod("addToHistory", String.class);
+                    // flushHistory = historyC.getMethod("flushBuffer");
+
+                    Method nextEntry = historyC.getMethod("next");
+                    Method current = historyC.getMethod("current");
+                    while ((Boolean)(nextEntry.invoke(history))) {
+                        String entry = (String)current.invoke(history);
+                        if (!entry.equals("")) {
+                            addToHistory(commandCount, entry);
+                        }
+                        commandCount++;
+                    }
+
+                    historyEnabled = true;
+                    System.out.println("JLine history support is enabled");
+                } catch (Exception e) {
+                    historyEnabled = false;
+                    e.printStackTrace();
+                    System.out.println("JLine history support is disabled");
+                }
+
+                String line;
+                Method readLine = consoleC.getMethod("readLine", String.class);
+                while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
+                    executeLine(line);
+                    if (historyEnabled) {
+                        addHistory.invoke(history, line);
+                        // flushHistory.invoke(history);
+                    }
+                }
+            } catch (ClassNotFoundException e) {
+                LOG.debug("Unable to start jline", e);
+                jlinemissing = true;
+            } catch (NoSuchMethodException e) {
+                LOG.debug("Unable to start jline", e);
+                jlinemissing = true;
+            } catch (InvocationTargetException e) {
+                LOG.debug("Unable to start jline", e);
+                jlinemissing = true;
+            } catch (IllegalAccessException e) {
+                LOG.debug("Unable to start jline", e);
+                jlinemissing = true;
+            } catch (InstantiationException e) {
+                LOG.debug("Unable to start jline", e);
+                jlinemissing = true;
+            }
+
+            if (jlinemissing) {
+                System.out.println("JLine support is disabled");
+                BufferedReader br =
+                    new BufferedReader(new InputStreamReader(System.in));
+
+                String line;
+                while ((line = br.readLine()) != null) {
+                    executeLine(line);
+                }
+            }
+        }
+
+        inConsole = false;
+        processCmd(cl);
+        try {
+            myCommands.get(EXIT).runCmd(new String[0]);
+        } catch (Exception e) {
+        }
+    }
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+        HedwigConsole console = new HedwigConsole(args);
+        console.run();
+    }
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java Wed Feb  8 10:49:10 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin.console;
+
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.hedwig.admin.HedwigAdmin;
+
+import jline.Completor;
+
+import static org.apache.hedwig.admin.console.HedwigCommands.*;
+
+/**
+ * A jline completor for hedwig console
+ */
+public class JLineHedwigCompletor implements Completor {
+    private HedwigAdmin admin;
+
+    public JLineHedwigCompletor(HedwigAdmin admin) {
+        this.admin = admin;
+    }
+
+    @Override
+    public int complete(String buffer, int cursor, List candidates) {
+        // Guarantee that the final token is the one we're expanding
+        buffer = buffer.substring(0,cursor);
+        String[] tokens = buffer.split(" ");
+        if (buffer.endsWith(" ")) {
+            String[] newTokens = new String[tokens.length + 1];
+            System.arraycopy(tokens, 0, newTokens, 0, tokens.length);
+            newTokens[newTokens.length - 1] = "";
+            tokens = newTokens;
+        }
+        
+        if (tokens.length > 2 &&
+            DESCRIBE.equalsIgnoreCase(tokens[0]) &&
+            DESCRIBE_TOPIC.equalsIgnoreCase(tokens[1])) {
+            return completeTopic(buffer, tokens[2], candidates);
+        } else if (tokens.length > 1 &&
+                   (SUB.equalsIgnoreCase(tokens[0]) ||
+                    PUB.equalsIgnoreCase(tokens[0]) ||
+                    CLOSESUB.equalsIgnoreCase(tokens[0]) ||
+                    CONSUME.equalsIgnoreCase(tokens[0]) ||
+                    CONSUMETO.equalsIgnoreCase(tokens[0]) ||
+                    READTOPIC.equalsIgnoreCase(tokens[0]))) {
+            return completeTopic(buffer, tokens[1], candidates);
+        }
+        List<String> cmds = HedwigCommands.findCandidateCommands(tokens);
+        return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates);
+    }
+
+    private int completeCommand(String buffer, String token,
+            List<String> commands, List<String> candidates) {
+        for (String cmd : commands) {
+            if (cmd.startsWith(token)) {
+                candidates.add(cmd);
+            }
+        }
+        return buffer.lastIndexOf(" ") + 1;
+    }
+
+    private int completeTopic(String buffer, String token, List<String> candidates) {
+        try {
+            List<String> children = admin.getTopics();
+            for (String child : children) {
+                if (child.startsWith(token)) {
+                    candidates.add(child);
+                }
+            }
+        } catch (Exception e) {
+            return buffer.length();
+        }
+        return candidates.size() == 0 ? buffer.length() : buffer.lastIndexOf(" ") + 1;
+    }
+}



Mime
View raw message