Author: ivank
Date: Wed Feb 8 10:49:10 2012
New Revision: 1241858
URL: http://svn.apache.org/viewvc?rev=1241858&view=rev
Log:
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig
zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1241858&r1=1241857&r2=1241858&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb 8 10:49:10 2012
@@ -37,7 +37,6 @@ Trunk (unreleased changes)
BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
BOOKKEEPER-133: Hub server should update subscription state to zookeeper when losing topic or shutting down (Sijie Gou via ivank)
-
IMPROVEMENTS:
bookkeeper-server/
@@ -48,6 +47,9 @@ Trunk (unreleased changes)
BOOKKEEPER-157: For small packets, increasing number of bookies actually degrades performance. (ivank via fpj)
+ hedwig-server/
+
+ BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
Release 4.0.0 - 2011-11-30
Modified: zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig?rev=1241858&r1=1241857&r2=1241858&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/bin/hedwig Wed Feb 8 10:49:10 2012
@@ -34,6 +34,8 @@ HW_HOME=`cd $BINDIR/..;pwd`
DEFAULT_CONF=$HW_HOME/conf/hw_server.conf
+. $HW_HOME/conf/hwenv.sh
+
RELEASE_JAR=`ls $HW_HOME/hedwig-server-*.jar 2> /dev/null | tail -1`
if [ $? == 0 ]; then
HEDWIG_JAR=$RELEASE_JAR
@@ -78,6 +80,7 @@ hedwig_help() {
Usage: hedwig <command>
where command is one of:
server Run the hedwig server
+ console Run the hedwig admin console
help This help message
or command is the full name of a class with a defined main() method.
@@ -118,6 +121,8 @@ OPTS="$OPTS -Djava.net.preferIPv4Stack=t
if [ $COMMAND == "server" ]; then
exec java $OPTS org.apache.hedwig.server.netty.PubSubServer $HEDWIG_SERVER_CONF $@
+elif [ $COMMAND == "console" ]; then
+ exec java $OPTS org.apache.hedwig.admin.console.HedwigConsole -server-cfg $HEDWIG_SERVER_CONF $@
elif [ $COMMAND == "help" ]; then
hedwig_help;
else
Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1241858&r1=1241857&r2=1241858&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Wed Feb 8 10:49:10 2012
@@ -110,6 +110,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ <version>0.9.94</version>
+ </dependency>
</dependencies>
<build>
<plugins>
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Wed Feb 8 10:49:10 2012
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Hedwig Admin
+ */
+public class HedwigAdmin {
+ static final Logger LOG = LoggerFactory.getLogger(HedwigAdmin.class);
+
+ // NOTE: now it is fixed passwd used in hedwig
+ static byte[] passwd = "sillysecret".getBytes();
+
+ protected ZooKeeper zk;
+ protected BookKeeper bk;
+ // hub configurations
+ protected ServerConfiguration serverConf;
+ // bookkeeper configurations
+ protected ClientConfiguration bkClientConf;
+
+ // Empty watcher
+ private class MyWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ }
+ }
+
+ /**
+ * Hedwig Admin Constructor
+ *
+ * @param bkConf
+ * BookKeeper Client Configuration.
+ * @param hubConf
+ * Hub Server Configuration.
+ * @throws Exception
+ */
+ public HedwigAdmin(ClientConfiguration bkConf, ServerConfiguration hubConf) throws Exception {
+ this.serverConf = hubConf;
+ this.bkClientConf = bkConf;
+
+ // connect to zookeeper
+ zk = new ZooKeeper(bkClientConf.getZkServers(), bkClientConf.getZkTimeout(), new MyWatcher());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to zookeeper " + bkClientConf.getZkServers() + ", timeout = "
+ + bkClientConf.getZkTimeout());
+ }
+
+ // connect to bookkeeper
+ bk = new BookKeeper(bkClientConf, zk);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to bookkeeper");
+ }
+ }
+
+ /**
+ * Close the hedwig admin.
+ *
+ * @throws Exception
+ */
+ public void close() throws Exception {
+ bk.close();
+ zk.close();
+ }
+
+ /**
+ * Return zookeeper handle used in hedwig admin.
+ *
+ * @return zookeeper handle
+ */
+ public ZooKeeper getZkHandle() {
+ return zk;
+ }
+
+ /**
+ * Return bookkeeper handle used in hedwig admin.
+ *
+ * @return bookkeeper handle
+ */
+ public BookKeeper getBkHandle() {
+ return bk;
+ }
+
+ /**
+ * Return hub server configuration used in hedwig admin
+ *
+ * @return hub server configuration
+ */
+ public ServerConfiguration getHubServerConf() {
+ return serverConf;
+ }
+
+ /**
+ * Return bookeeper passwd used in hedwig admin
+ *
+ * @return bookeeper passwd
+ */
+ public byte[] getBkPasswd() {
+ return passwd;
+ }
+
+ /**
+ * Return digest type used in hedwig admin
+ *
+ * @return bookeeper digest type
+ */
+ public DigestType getBkDigestType() {
+ return DigestType.CRC32;
+ }
+
+ /**
+ * Dose topic exist?
+ *
+ * @param topic
+ * Topic name
+ * @return whether topic exists or not?
+ * @throws Exception
+ */
+ public boolean hasTopic(ByteString topic) throws Exception {
+ String topicPath = serverConf.getZkTopicPath(new StringBuilder(), topic).toString();
+ Stat stat = zk.exists(topicPath, false);
+ return null != stat;
+ }
+
+ /**
+ * Get available hubs.
+ *
+ * @return available hubs and their loads
+ * @throws Exception
+ */
+ public Map<HedwigSocketAddress, Integer> getAvailableHubs() throws Exception {
+ String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
+ Map<HedwigSocketAddress, Integer> hubs =
+ new HashMap<HedwigSocketAddress, Integer>();
+ List<String> hosts = zk.getChildren(zkHubsPath, false);
+ for (String host : hosts) {
+ String zkHubPath = serverConf.getZkHostsPrefix(new StringBuilder())
+ .append("/").append(host).toString();
+ int load = 0;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zk.getData(zkHubPath, false, stat);
+ if (data != null) {
+ load = Integer.parseInt(new String(data));
+ }
+ } catch (Exception e) {
+ // igore
+ }
+ hubs.put(new HedwigSocketAddress(host), load);
+ }
+ return hubs;
+ }
+
+ /**
+ * Get list of topics
+ *
+ * @return list of topics
+ * @throws Exception
+ */
+ public List<String> getTopics() throws Exception {
+ return zk.getChildren(serverConf.getZkTopicsPrefix(new StringBuilder()).toString(), false);
+ }
+
+ /**
+ * Return the znode path of owner of a topic
+ *
+ * @param topic
+ * Topic name
+ * @return znode path of owner of a topic
+ */
+ String hubPath(ByteString topic) {
+ return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
+ }
+
+ /**
+ * Return the topic owner of a topic
+ *
+ * @param topic
+ * Topic name
+ * @return the address of the owner of a topic
+ * @throws Exception
+ */
+ public HedwigSocketAddress getTopicOwner(ByteString topic) throws Exception {
+ Stat stat = new Stat();
+ byte[] owner = null;
+ try {
+ owner = zk.getData(hubPath(topic), false, stat);
+ } catch (KeeperException.NoNodeException nne) {
+ }
+ if (null == owner) {
+ return null;
+ }
+ return new HedwigSocketAddress(new String(owner));
+ }
+
+ /**
+ * Return the znode path to store ledgers info of a topic
+ *
+ * @param topic
+ * Topic name
+ * @return znode path to store ledgers info of a topic
+ */
+ String ledgersPath(ByteString topic) {
+ return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
+ }
+
+ /**
+ * Return the ledger range forming the topic
+ *
+ * @param topic
+ * Topic name
+ * @return ledger ranges forming the topic
+ * @throws Exception
+ */
+ public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
+ LedgerRanges ranges = null;
+ try {
+ Stat stat = new Stat();
+ byte[] ledgersData = zk.getData(ledgersPath(topic), false, stat);
+ if (null != ledgersData) {
+ ranges = LedgerRanges.parseFrom(ledgersData);
+ }
+ } catch (KeeperException.NoNodeException nne) {
+ }
+ if (null == ranges) {
+ return null;
+ }
+ List<LedgerRange> lrs = ranges.getRangesList();
+ if (lrs.isEmpty()) {
+ return lrs;
+ }
+ // try to check last ledger (it may still open)
+ LedgerRange lastRange = lrs.get(lrs.size() - 1);
+ if (lastRange.hasEndSeqIdIncluded()) {
+ return lrs;
+ }
+ // read last confirmed of the opened ledger
+ try {
+ List<LedgerRange> newLrs = new ArrayList<LedgerRange>();
+ newLrs.addAll(lrs);
+ lrs = newLrs;
+ MessageSeqId lastSeqId;
+ if (lrs.size() == 1) {
+ lastSeqId = MessageSeqId.newBuilder().setLocalComponent(1).build();
+ } else {
+ lastSeqId = lrs.get(lrs.size() - 2).getEndSeqIdIncluded();
+ }
+ LedgerRange newLastRange = refreshLastLedgerRange(lastSeqId, lastRange);
+ lrs.set(lrs.size() - 1, newLastRange);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return lrs;
+ }
+
+ /**
+ * Refresh last ledger range to get lastConfirmed entry, which make it available to read
+ *
+ * @param lastSeqId
+ * Last sequence id of previous ledger
+ * @param oldRange
+ * Ledger range to set lastConfirmed entry
+ */
+ LedgerRange refreshLastLedgerRange(MessageSeqId lastSeqId, LedgerRange oldRange)
+ throws BKException, KeeperException, InterruptedException {
+ LedgerHandle lh = bk.openLedgerNoRecovery(oldRange.getLedgerId(), DigestType.CRC32, passwd);
+ long lastConfirmed = lh.readLastConfirmed();
+ MessageSeqId newSeqId = MessageSeqId.newBuilder().mergeFrom(lastSeqId)
+ .setLocalComponent(lastSeqId.getLocalComponent() + lastConfirmed).build();
+ return LedgerRange.newBuilder().mergeFrom(oldRange).setEndSeqIdIncluded(newSeqId).build();
+ }
+
+ /**
+ * Return the znode path store all the subscribers of a topic.
+ *
+ * @param sb
+ * String builder to hold the znode path
+ * @param topic
+ * Topic name
+ */
+ private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
+ return serverConf.getZkTopicPath(sb, topic).append("/subscribers");
+ }
+
+ /**
+ * Return the znode path of a subscriber of a topic.
+ *
+ * @param topic
+ * Topic name
+ * @param subscriber
+ * Subscriber name
+ */
+
+ private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
+ return topicSubscribersPath(new StringBuilder(), topic).append("/")
+ .append(subscriber.toStringUtf8()).toString();
+ }
+
+ /**
+ * Return subscriptions of a topic
+ *
+ * @param topic
+ * Topic name
+ * @return subscriptions of a topic
+ * @throws Exception
+ */
+ public Map<ByteString, SubscriptionState> getTopicSubscriptions(ByteString topic)
+ throws Exception {
+ Map<ByteString, SubscriptionState> states =
+ new HashMap<ByteString, SubscriptionState>();
+ try {
+ String subsPath = topicSubscribersPath(new StringBuilder(), topic).toString();
+ List<String> children = zk.getChildren(subsPath, false);
+ for (String child : children) {
+ ByteString subscriberId = ByteString.copyFromUtf8(child);
+ String subPath = topicSubscriberPath(topic, subscriberId);
+ Stat stat = new Stat();
+ byte[] subData = zk.getData(subPath, false, stat);
+ if (null == subData) {
+ continue;
+ }
+ SubscriptionState state = SubscriptionState.parseFrom(subData);
+ states.put(subscriberId, state);
+ }
+ } catch (KeeperException.NoNodeException nne) {
+ }
+ return states;
+ }
+
+ /**
+ * Return subscription state of a subscriber of topic
+ *
+ * @param topic
+ * Topic name
+ * @param subscriber
+ * Subscriber name
+ * @return subscription state
+ * @throws Exception
+ */
+ public SubscriptionState getSubscription(ByteString topic, ByteString subscriber) throws Exception {
+ String subPath = topicSubscriberPath(topic, subscriber);
+ Stat stat = new Stat();
+ byte[] subData = null;
+ try {
+ subData = zk.getData(subPath, false, stat);
+ } catch (KeeperException.NoNodeException nne) {
+ }
+ if (null == subData) {
+ return null;
+ }
+ return SubscriptionState.parseFrom(subData);
+ }
+}
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java Wed Feb 8 10:49:10 2012
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin.console;
+
+import java.util.Map;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.LinkedHashMap;
+
+/**
+ * List all the available commands
+ */
+public final class HedwigCommands {
+
+ static final String[] EMPTY_ARRAY = new String[0];
+
+ //
+ // List all commands used to play with hedwig
+ //
+
+ /* PUB : publish a message to hedwig */
+ static final String PUB = "pub";
+ static final String PUB_DESC = "Publish a message to a topic in Hedwig";
+ static final String[] PUB_USAGE = new String[] {
+ "usage: pub {topic} {message}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {message} : message body.",
+ " remaining arguments are used as message body to publish.",
+ };
+
+ /* SUB : subscriber a topic in hedwig for a specified subscriber */
+ static final String SUB = "sub";
+ static final String SUB_DESC = "Subscribe a topic for a specified subscriber";
+ static final String[] SUB_USAGE = new String[] {
+ "usage: sub {topic} {subscriber} [mode]",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {subscriber} : subscriber id.",
+ " any printable string without spaces.",
+ " [mode] : mode to create subscription.",
+ " [receive] : bool. whether to start delivery to receive messages.",
+ "",
+ " available modes: (default value is 1)",
+ " 0 = CREATE: create subscription.",
+ " if the subscription is exsited, it will fail.",
+ " 1 = ATTACH: attach to exsited subscription.",
+ " if the subscription is not existed, it will faile.",
+ " 2 = CREATE_OR_ATTACH:",
+ " attach to subscription, if not existed create one."
+ };
+
+ /* CLOSESUB : close the subscription of a subscriber for a topic */
+ static final String CLOSESUB = "closesub";
+ static final String CLOSESUB_DESC = "Close subscription of a subscriber to a specified topic";
+ static final String[] CLOSESUB_USAGE = new String[] {
+ "usage: closesub {topic} {subscriber}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {subscriber} : subscriber id.",
+ " any printable string without spaces.",
+ "",
+ " NOTE: this command just cleanup subscription states on client side.",
+ " You can try UNSUB to clean subscription states on server side.",
+ };
+
+ /* UNSUB: unsubscribe of a subscriber to a topic */
+ static final String UNSUB = "unsub";
+ static final String UNSUB_DESC = "Unsubscribe a topic for a subscriber";
+ static final String[] UNSUB_USAGE = new String[] {
+ "usage: unsub {topic} {subscriber}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {subscriber} : subscriber id.",
+ " any printable string without spaces.",
+ "",
+ " NOTE: this command will cleanup subscription states on server side.",
+ " You can try CLOSESUB to just clean subscription states on client side.",
+ };
+
+ static final String RMSUB = "rmsub";
+ static final String RMSUB_DESC = "Remove subscriptions for topics";
+ static final String[] RMSUB_USAGE = new String[] {
+ "usage: rmsub {topic_prefix} {start_topic} {end_topic} {subscriber_prefix} {start_sub} {end_sub}",
+ "",
+ " {topic_prefix} : topic prefix.",
+ " {start_topic} : start topic id.",
+ " {end_topic} : end topic id.",
+ " {subscriber_prefix} : subscriber prefix.",
+ " {start_sub} : start subscriber id.",
+ " {end_sub} : end subscriber id.",
+ };
+
+ /* CONSUME: move consume ptr of a subscription with specified steps */
+ static final String CONSUME = "consume";
+ static final String CONSUME_DESC = "Move consume ptr of a subscription with sepcified steps";
+ static final String[] CONSUME_USAGE = new String[] {
+ "usage: consume {topic} {subscriber} {nmsgs}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {subscriber} : subscriber id.",
+ " any printable string without spaces.",
+ " {nmsgs} : how many messages to move consume ptr.",
+ "",
+ " Example:",
+ " suppose, from zk we know subscriber B consumed topic T to message 10",
+ " [hedwig: (standalone) 1] consume T B 2",
+ " after executed above command, a consume(10+2) request will be sent to hedwig.",
+ "",
+ " NOTE:",
+ " since Hedwig updates subscription consume ptr lazily, so you need to know that",
+ " 1) the consumption ptr read from zookeeper may be stable; ",
+ " 2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
+ };
+
+ /* CONSUMETO: move consume ptr of a subscription to a specified pos */
+ static final String CONSUMETO = "consumeto";
+ static final String CONSUMETO_DESC = "Move consume ptr of a subscription to a specified message id";
+ static final String[] CONSUMETO_USAGE = new String[] {
+ "usage: consumeto {topic} {subscriber} {msg_id}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {subscriber} : subscriber id.",
+ " any printable string without spaces.",
+ " {msg_id} : message id that consume ptr will be moved to.",
+ " if the message id is less than current consume ptr,",
+ " hedwig will do nothing.",
+ "",
+ " Example:",
+ " suppose, from zk we know subscriber B consumed topic T to message 10",
+ " [hedwig: (standalone) 1] consumeto T B 12",
+ " after executed above command, a consume(12) request will be sent to hedwig.",
+ "",
+ " NOTE:",
+ " since Hedwig updates subscription consume ptr lazily, so you need to know that",
+ " 1) the consumption ptr read from zookeeper may be stable; ",
+ " 2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
+ };
+
+ /* PUBSUB: a healthy checking command to ensure cluster is running */
+ static final String PUBSUB = "pubsub";
+ static final String PUBSUB_DESC = "A healthy checking command to ensure hedwig is in running state";
+ static final String[] PUBSUB_USAGE = new String[] {
+ "usage: pubsub {topic} {subscriber} {timeout_secs} {message}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " {subscriber} : subscriber id.",
+ " any printable string without spaces.",
+ " {timeout_secs} : how long will the subscriber wait for published message.",
+ " {message} : message body.",
+ " remaining arguments are used as message body to publish.",
+ "",
+ " Example:",
+ " [hedwig: (standalone) 1] pubsub TOPIC SUBID 10 TEST_MESSAGS",
+ "",
+ " 1) hw will subscribe topic TOPIC as subscriber SUBID;",
+ " 2) subscriber SUBID will wait a message until 10 seconds;",
+ " 3) hw publishes TEST_MESSAGES to topic TOPIC;",
+ " 4) if subscriber recevied message in 10 secs, it checked that whether the message is published message.",
+ " if true, it will return SUCCESS, otherwise return FAILED.",
+ };
+
+ //
+ // List all commands used to admin hedwig
+ //
+
+ /* SHOW: list all available hub servers or topics */
+ static final String SHOW = "show";
+ static final String SHOW_DESC = "list all available hub servers or topics";
+ static final String[] SHOW_USAGE = new String[] {
+ "usage: show [topics | hubs]",
+ "",
+ " show topics :",
+ " listing all available topics in hedwig.",
+ "",
+ " show hubs :",
+ " listing all available hubs in hedwig.",
+ "",
+ " NOTES:",
+ " 'show topics' will not works when there are millions of topics in hedwig, since we have packetLen limitation fetching data from zookeeper.",
+ };
+
+ static final String SHOW_TOPICS = "topics";
+ static final String SHOW_HUBS = "hubs";
+
+ /* DESCRIBE: show the metadata of a topic */
+ static final String DESCRIBE = "describe";
+ static final String DESCRIBE_DESC = "show metadata of a topic, including topic owner, persistence info, subscriptions info";
+ static final String[] DESCRIBE_USAGE = new String[] {
+ "usage: describe topic {topic}",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ "",
+ " Example: describe topic ttttt",
+ "",
+ " Output:",
+ " ===== Topic Information : ttttt =====",
+ "",
+ " Owner : 98.137.99.27:9875:9876",
+ "",
+ " >>> Persistence Info <<<",
+ " Ledger 54729 [ 1 ~ 59 ]",
+ " Ledger 54731 [ 60 ~ 60 ]",
+ " Ledger 54733 [ 61 ~ 61 ]",
+ "",
+ " >>> Subscription Info <<<",
+ " Subscriber mysub : consumeSeqId: local:50",
+ };
+
+ static final String DESCRIBE_TOPIC = "topic";
+
+ /* READTOPIC: read messages of a specified topic */
+ static final String READTOPIC = "readtopic";
+ static final String READTOPIC_DESC = "read messages of a specified topic";
+ static final String[] READTOPIC_USAGE = new String[] {
+ "usage: readtopic {topic} [start_msg_id]",
+ "",
+ " {topic} : topic name.",
+ " any printable string without spaces.",
+ " [start_msg_id] : message id that start to read from.",
+ "",
+ " no start_msg_id provided:",
+ " it will start from least_consumed_message_id + 1.",
+ " least_consume_message_id is computed from all its subscribers.",
+ "",
+ " start_msg_id provided:",
+ " it will start from MAX(start_msg_id, least_consumed_message_id).",
+ "",
+ " MESSAGE FORMAT:",
+ "",
+ " ---------- MSGID=LOCAL(51) ----------",
+ " MsgId: LOCAL(51)",
+ " SrcRegion: standalone",
+ " Message:",
+ "",
+ " hello",
+ };
+
+ //
+ // List other useful commands
+ //
+
+ /* SET: set whether printing zk watches or not */
+ static final String SET = "set";
+ static final String SET_DESC = "set whether printing zk watches or not";
+ static final String[] SET_USAGE = EMPTY_ARRAY;
+
+ /* HISTORY: list history commands */
+ static final String HISTORY = "history";
+ static final String HISTORY_DESC = "list history commands";
+ static final String[] HISTORY_USAGE = EMPTY_ARRAY;
+
+ /* REDO: redo previous command */
+ static final String REDO = "redo";
+ static final String REDO_DESC = "redo history command";
+ static final String[] REDO_USAGE = new String[] {
+ "usage: redo [{cmdno} | !]",
+ "",
+ " {cmdno} : history command no.",
+ " ! : last command.",
+ };
+
+ /* HELP: print usage information of a specified command */
+ static final String HELP = "help";
+ static final String HELP_DESC = "print usage information of a specified command";
+ static final String[] HELP_USAGE = new String[] {
+ "usage: help {command}",
+ "",
+ " {command} : command name",
+ };
+
+ static final String QUIT = "quit";
+ static final String QUIT_DESC = "exit console";
+ static final String[] QUIT_USAGE = EMPTY_ARRAY;
+
+ static final String EXIT = "exit";
+ static final String EXIT_DESC = QUIT_DESC;
+ static final String[] EXIT_USAGE = EMPTY_ARRAY;
+
+ public static enum COMMAND {
+
+ CMD_PUB (PUB, PUB_DESC, PUB_USAGE),
+ CMD_SUB (SUB, SUB_DESC, SUB_USAGE),
+ CMD_CLOSESUB (CLOSESUB, CLOSESUB_DESC, CLOSESUB_USAGE),
+ CMD_UNSUB (UNSUB, UNSUB_DESC, UNSUB_USAGE),
+ CMD_RMSUB (RMSUB, RMSUB_DESC, RMSUB_USAGE),
+ CMD_CONSUME (CONSUME, CONSUME_DESC, CONSUME_USAGE),
+ CMD_CONSUMETO (CONSUMETO, CONSUMETO_DESC, CONSUMETO_USAGE),
+ CMD_PUBSUB (PUBSUB, PUBSUB_DESC, PUBSUB_USAGE),
+ CMD_SHOW (SHOW, SHOW_DESC, SHOW_USAGE),
+ CMD_DESCRIBE (DESCRIBE, DESCRIBE_DESC, DESCRIBE_USAGE),
+ CMD_READTOPIC (READTOPIC, READTOPIC_DESC, READTOPIC_USAGE),
+ CMD_SET (SET, SET_DESC, SET_USAGE),
+ CMD_HISTORY (HISTORY, HISTORY_DESC, HISTORY_USAGE),
+ CMD_REDO (REDO, REDO_DESC, REDO_USAGE),
+ CMD_HELP (HELP, HELP_DESC, HELP_USAGE),
+ CMD_QUIT (QUIT, QUIT_DESC, QUIT_USAGE),
+ CMD_EXIT (EXIT, EXIT_DESC, EXIT_USAGE),
+ // sub commands
+ CMD_SHOW_TOPICS (SHOW_TOPICS, "", EMPTY_ARRAY),
+ CMD_SHOW_HUBS (SHOW_HUBS, "", EMPTY_ARRAY),
+ CMD_DESCRIBE_TOPIC (DESCRIBE_TOPIC, "", EMPTY_ARRAY);
+
+ COMMAND(String name, String desc, String[] usage) {
+ this.name = name;
+ this.desc = desc;
+ this.usage = usage;
+ this.subCmds = new LinkedHashMap<String, COMMAND>();
+ }
+
+ public String getName() { return name; }
+
+ public String getDescription() { return desc; }
+
+ public String[] getUsage() { return usage; }
+
+ public Map<String, COMMAND> getSubCommands() { return subCmds; }
+
+ public void addSubCommand(COMMAND c) {
+ this.subCmds.put(c.name, c);
+ };
+
+ public void printUsage() {
+ System.err.println(name + ": " + desc);
+ for(String line : usage) {
+ System.err.println(line);
+ }
+ System.err.println();
+ }
+
+ protected String name;
+ protected String desc;
+ protected String[] usage;
+ protected Map<String, COMMAND> subCmds;
+ }
+
+ static Map<String, COMMAND> commands = null;
+
+ private static void addCommand(COMMAND c) {
+ commands.put(c.getName(), c);
+ }
+
+ static {
+ commands = new LinkedHashMap<String, COMMAND>();
+
+ addCommand(COMMAND.CMD_PUB);
+ addCommand(COMMAND.CMD_SUB);
+ addCommand(COMMAND.CMD_CLOSESUB);
+ addCommand(COMMAND.CMD_UNSUB);
+ addCommand(COMMAND.CMD_RMSUB);
+ addCommand(COMMAND.CMD_CONSUME);
+ addCommand(COMMAND.CMD_CONSUMETO);
+ addCommand(COMMAND.CMD_PUBSUB);
+
+ // show
+ COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_TOPICS);
+ COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_HUBS);
+ addCommand(COMMAND.CMD_SHOW);
+
+ // describe
+ COMMAND.CMD_DESCRIBE.addSubCommand(COMMAND.CMD_DESCRIBE_TOPIC);
+ addCommand(COMMAND.CMD_DESCRIBE);
+
+ addCommand(COMMAND.CMD_READTOPIC);
+ addCommand(COMMAND.CMD_SET);
+ addCommand(COMMAND.CMD_HISTORY);
+ addCommand(COMMAND.CMD_REDO);
+ addCommand(COMMAND.CMD_HELP);
+ addCommand(COMMAND.CMD_QUIT);
+ addCommand(COMMAND.CMD_EXIT);
+ }
+
+ public static Map<String, COMMAND> getHedwigCommands() {
+ return commands;
+ }
+
+ /**
+ * Find candidate commands by the specified token list
+ *
+ * @param token token list
+ *
+ * @return list of candidate commands
+ */
+ public static List<String> findCandidateCommands(String[] tokens) {
+ List<String> cmds = new LinkedList<String>();
+
+ Map<String, COMMAND> cmdMap = commands;
+ for (int i=0; i<(tokens.length - 1); i++) {
+ COMMAND c = cmdMap.get(tokens[i]);
+ // no commands
+ if (c == null || c.getSubCommands().size() <= 0) {
+ return cmds;
+ } else {
+ cmdMap = c.getSubCommands();
+ }
+ }
+ cmds.addAll(cmdMap.keySet());
+ return cmds;
+ }
+}
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Wed Feb 8 10:49:10 2012
@@ -0,0 +1,1029 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin.console;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.hedwig.admin.HedwigAdmin;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
+import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import static org.apache.hedwig.admin.console.HedwigCommands.*;
+import static org.apache.hedwig.admin.console.HedwigCommands.COMMAND.*;
+
+/**
+ * Console Client to Hedwig
+ */
+public class HedwigConsole {
+ private static final Logger LOG = LoggerFactory.getLogger(HedwigConsole.class);
+ // NOTE: now it is fixed passwd in bookkeeper
+ static byte[] passwd = "sillysecret".getBytes();
+
+ // history file name
+ static final String HW_HISTORY_FILE = ".hw_history";
+
+ protected MyCommandOptions cl = new MyCommandOptions();
+ protected HashMap<Integer, String> history = new LinkedHashMap<Integer, String>();
+ protected int commandCount = 0;
+ protected boolean printWatches = true;
+ protected Map<String, MyCommand> myCommands;
+
+ protected boolean inConsole = true;
+
+ protected HedwigAdmin admin;
+ protected HedwigClient hubClient;
+ protected Publisher publisher;
+ protected Subscriber subscriber;
+ protected ConsoleMessageHandler consoleHandler =
+ new ConsoleMessageHandler();
+
+ protected String myRegion;
+
+ interface MyCommand {
+ boolean runCmd(String[] args) throws Exception;
+ }
+
+ class HelpCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ boolean printUsage = true;
+ if (args.length >= 2) {
+ String command = args[1];
+ COMMAND c = getHedwigCommands().get(command);
+ if (c != null) {
+ c.printUsage();
+ printUsage = false;
+ }
+ }
+ if (printUsage) {
+ usage();
+ }
+ return true;
+ }
+ }
+
+ class ExitCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ printMessage("Quitting ...");
+ hubClient.close();
+ admin.close();
+ System.exit(0);
+ return true;
+ }
+ }
+
+ class RedoCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 2) {
+ return false;
+ }
+
+ int index;
+ if ("!".equals(args[1])) {
+ index = commandCount - 1;
+ } else {
+ index = Integer.decode(args[1]);
+ if (commandCount <= index) {
+ System.err.println("Command index out of range");
+ return false;
+ }
+ }
+ cl.parseCommand(history.get(index));
+ if (cl.getCommand().equals("redo")) {
+ System.err.println("No redoing redos");
+ return false;
+ }
+ history.put(commandCount, history.get(index));
+ processCmd(cl);
+ return true;
+ }
+
+ }
+
+ class HistoryCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ for (int i=commandCount - 10; i<=commandCount; ++i) {
+ if (i < 0) {
+ continue;
+ }
+ System.out.println(i + " - " + history.get(i));
+ }
+ return true;
+ }
+
+ }
+
+ class SetCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 3 || !"printwatches".equals(args[1])) {
+ return false;
+ } else if (args.length == 2) {
+ System.out.println("printwatches is " + (printWatches ? "on" : "off"));
+ } else {
+ printWatches = args[2].equals("on");
+ }
+ return true;
+ }
+
+ }
+
+ class PubCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 3) {
+ return false;
+ }
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+
+ StringBuilder sb = new StringBuilder();
+ for (int i=2; i<args.length; i++) {
+ sb.append(args[i]);
+ if (i != args.length - 1) {
+ sb.append(' ');
+ }
+ }
+ ByteString msgBody = ByteString.copyFromUtf8(sb.toString());
+ Message msg = Message.newBuilder().setBody(msgBody).build();
+ try {
+ publisher.publish(topic, msg);
+ System.out.println("PUB DONE");
+ } catch (Exception e) {
+ System.err.println("PUB FAILED");
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+ }
+
+ class ConsoleMessageHandler implements MessageHandler {
+
+ @Override
+ public void deliver(ByteString topic, ByteString subscriberId,
+ Message msg, Callback<Void> callback, Object context) {
+ System.out.println("Received message from topic " + topic.toStringUtf8() +
+ " for subscriber " + subscriberId.toStringUtf8() + " : "
+ + msg.getBody().toStringUtf8());
+ callback.operationFinished(context, null);
+ }
+
+ }
+
+ class SubCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ CreateOrAttach mode;
+ boolean receive = true;
+ if (args.length < 3) {
+ return false;
+ } else if (args.length == 3) {
+ mode = CreateOrAttach.ATTACH;
+ receive = true;
+ } else {
+ try {
+ mode = CreateOrAttach.valueOf(Integer.parseInt(args[3]));
+ } catch (Exception e) {
+ System.err.println("Unknow mode : " + args[3]);
+ return false;
+ }
+ if (args.length >= 5) {
+ try {
+ receive = Boolean.parseBoolean(args[4]);
+ } catch (Exception e) {
+ receive = false;
+ }
+ }
+ }
+ if (mode == null) {
+ System.err.println("Unknow mode : " + args[3]);
+ return false;
+ }
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+ ByteString subId = ByteString.copyFromUtf8(args[2]);
+ try {
+ subscriber.subscribe(topic, subId, mode);
+ if (receive) {
+ subscriber.startDelivery(topic, subId, consoleHandler);
+ System.out.println("SUB DONE AND RECEIVE");
+ } else {
+ System.out.println("SUB DONE BUT NOT RECEIVE");
+ }
+ } catch (Exception e) {
+ System.err.println("SUB FAILED");
+ e.printStackTrace();
+ }
+ return true;
+ }
+ }
+
+ class UnsubCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 3) {
+ return false;
+ }
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+ ByteString subId = ByteString.copyFromUtf8(args[2]);
+ try {
+ subscriber.stopDelivery(topic, subId);
+ subscriber.unsubscribe(topic, subId);
+ System.out.println("UNSUB DONE");
+ } catch (Exception e) {
+ System.err.println("UNSUB FAILED");
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+ }
+
+ class RmsubCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 7) {
+ return false;
+ }
+ String topicPrefix = args[1];
+ int startTopic = Integer.parseInt(args[2]);
+ int endTopic = Integer.parseInt(args[3]);
+ String subPrefix = args[4];
+ int startSub = Integer.parseInt(args[5]);
+ int endSub = Integer.parseInt(args[6]);
+ if (startTopic > endTopic || endSub < startSub) {
+ return false;
+ }
+ for (int i=startTopic; i<=endTopic; i++) {
+ ByteString topic = ByteString.copyFromUtf8(topicPrefix + i);
+ try {
+ for (int j=startSub; j<=endSub; j++) {
+ ByteString sub = ByteString.copyFromUtf8(subPrefix + j);
+ subscriber.subscribe(topic, sub, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.unsubscribe(topic, sub);
+ }
+ System.out.println("RMSUB " + topic.toStringUtf8() + " DONE");
+ } catch (Exception e) {
+ System.err.println("RMSUB " + topic.toStringUtf8() + " FAILED");
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+
+ }
+
+ class CloseSubscriptionCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 3) {
+ return false;
+ }
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+ ByteString sudId = ByteString.copyFromUtf8(args[2]);
+
+ try {
+ subscriber.stopDelivery(topic, sudId);
+ subscriber.closeSubscription(topic, sudId);
+ } catch (Exception e) {
+ System.err.println("CLOSESUB FAILED");
+ }
+ return true;
+ }
+
+ }
+
+ class ConsumeToCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 4) {
+ return false;
+ }
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+ ByteString subId = ByteString.copyFromUtf8(args[2]);
+ long msgId = Long.parseLong(args[3]);
+ MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(msgId).build();
+ try {
+ subscriber.consume(topic, subId, consumeId);
+ } catch (Exception e) {
+ System.err.println("CONSUMETO FAILED");
+ }
+ return true;
+ }
+
+ }
+
+ class ConsumeCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 4) {
+ return false;
+ }
+ long lastConsumedId = 0;
+ SubscriptionState state = admin.getSubscription(ByteString.copyFromUtf8(args[1]), ByteString.copyFromUtf8(args[2]));
+ if (null == state) {
+ System.err.println("Failed to read subscription for topic: " + args[1]
+ + " subscriber: " + args[2]);
+ return true;
+ }
+ long numMessagesToConsume = Long.parseLong(args[3]);
+ long idToConsumed = lastConsumedId + numMessagesToConsume;
+ System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1]
+ + ") from " + lastConsumedId + " to " + idToConsumed);
+ MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(idToConsumed).build();
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+ ByteString subId = ByteString.copyFromUtf8(args[2]);
+ try {
+ subscriber.consume(topic, subId, consumeId);
+ } catch (Exception e) {
+ System.err.println("CONSUME FAILED");
+ }
+ return true;
+ }
+
+ }
+
+ class PubSubCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 5) {
+ return false;
+ }
+ final long startTime = System.currentTimeMillis();
+
+ final ByteString topic = ByteString.copyFromUtf8(args[1]);
+ final ByteString subId = ByteString.copyFromUtf8(args[2] + "-" + startTime);
+ int timeoutSecs = 60;
+ try {
+ timeoutSecs = Integer.parseInt(args[3]);
+ } catch (NumberFormatException nfe) {
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (int i=4; i<args.length; i++) {
+ sb.append(args[i]);
+ if (i != args.length - 1) {
+ sb.append(' ');
+ }
+ }
+ // append a timestamp tag
+ ByteString msgBody = ByteString.copyFromUtf8(sb.toString() + "-" + startTime);
+ final Message msg = Message.newBuilder().setBody(msgBody).build();
+
+ boolean subscribed = false;
+ boolean success = false;
+ final AtomicBoolean isDone = new AtomicBoolean(false);
+ long elapsedTime = 0L;
+
+ System.out.println("Starting PUBSUB test ...");
+ try {
+ // sub the topic
+ subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
+ subscribed = true;
+
+ System.out.println("Sub topic " + topic.toStringUtf8() + ", subscriber id " + subId.toStringUtf8());
+
+
+
+ // pub topic
+ publisher.publish(topic, msg);
+ System.out.println("Pub topic " + topic.toStringUtf8() + " : " + msg.getBody().toStringUtf8());
+
+ // ensure subscriber first, publish next, then we start delivery to receive message
+ // if start delivery first before publish, isDone may notify before wait
+ subscriber.startDelivery(topic, subId, new MessageHandler() {
+
+ @Override
+ public void deliver(ByteString thisTopic, ByteString subscriberId,
+ Message message, Callback<Void> callback, Object context) {
+ if (thisTopic.equals(topic) && subscriberId.equals(subId) &&
+ msg.getBody().equals(message.getBody())) {
+ System.out.println("Received message : " + message.getBody().toStringUtf8());
+ synchronized(isDone) {
+ isDone.set(true);
+ isDone.notify();
+ }
+ }
+ callback.operationFinished(context, null);
+ }
+
+ });
+
+ // wait for the message
+ synchronized (isDone) {
+ isDone.wait(timeoutSecs * 1000L);
+ }
+ success = isDone.get();
+ elapsedTime = System.currentTimeMillis() - startTime;
+ } finally {
+ try {
+ if (subscribed) {
+ subscriber.stopDelivery(topic, subId);
+ subscriber.unsubscribe(topic, subId);
+ }
+ } finally {
+ if (success) {
+ System.out.println("PUBSUB SUCCESS. TIME: " + elapsedTime + " MS");
+ } else {
+ System.out.println("PUBSUB FAILED. ");
+ }
+ return success;
+ }
+ }
+ }
+
+ }
+
+ class ReadTopicCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 2) {
+ return false;
+ }
+ ReadTopic rt;
+ ByteString topic = ByteString.copyFromUtf8(args[1]);
+ if (args.length == 2) {
+ rt = new ReadTopic(admin, topic, inConsole);
+ } else {
+ rt = new ReadTopic(admin, topic, Long.parseLong(args[2]), inConsole);
+ }
+ rt.readTopic();
+ return true;
+ }
+
+ }
+
+ class ShowCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 2) {
+ return false;
+ }
+ String errorMsg = null;
+ try {
+ if (HedwigCommands.SHOW_HUBS.equals(args[1])) {
+ errorMsg = "Unable to fetch the list of hub servers";
+ showHubs();
+ } else if (HedwigCommands.SHOW_TOPICS.equals(args[1])) {
+ errorMsg = "Unable to fetch the list of topics";
+ showTopics();
+ } else {
+ System.err.println("ERROR: Unknown show command '" + args[1] + "'");
+ return false;
+ }
+ } catch (Exception e) {
+ if (null != errorMsg) {
+ System.err.println(errorMsg);
+ }
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+ protected void showHubs() throws Exception {
+ Map<HedwigSocketAddress, Integer> hubs = admin.getAvailableHubs();
+ System.out.println("Available Hub Servers:");
+ for (Map.Entry<HedwigSocketAddress, Integer> entry : hubs.entrySet()) {
+ System.out.println("\t" + entry.getKey() + " :\t" + entry.getValue());
+ }
+ }
+
+ protected void showTopics() throws Exception {
+ List<String> topics = admin.getTopics();
+ System.out.println("Topic List:");
+ System.out.println(topics);
+ }
+
+ }
+
+ class DescribeCmd implements MyCommand {
+
+ @Override
+ public boolean runCmd(String[] args) throws Exception {
+ if (args.length < 3) {
+ return false;
+ }
+ if (HedwigCommands.DESCRIBE_TOPIC.equals(args[1])) {
+ return describeTopic(args[2]);
+ } else {
+ return false;
+ }
+ }
+
+ protected boolean describeTopic(String topic) throws Exception {
+ ByteString btopic = ByteString.copyFromUtf8(topic);
+ HedwigSocketAddress owner = admin.getTopicOwner(btopic);
+ List<LedgerRange> ranges = admin.getTopicLedgers(btopic);
+ Map<ByteString, SubscriptionState> states = admin.getTopicSubscriptions(btopic);
+
+ System.out.println("===== Topic Information : " + topic + " =====");
+ System.out.println();
+ System.out.println("Owner : " + (owner == null ? "NULL" : owner));
+ System.out.println();
+
+ // print ledgers
+ printTopicLedgers(ranges);
+ // print subscriptions
+ printTopicSubscriptions(states);
+
+ return true;
+ }
+
+ private void printTopicLedgers(List<LedgerRange> lrs) {
+ System.out.println(">>> Persistence Info <<<");
+ if (null == lrs) {
+ System.out.println("N/A");
+ return;
+ }
+ if (lrs.isEmpty()) {
+ System.out.println("No Ledger used.");
+ return;
+ }
+ Iterator<LedgerRange> lrIterator = lrs.iterator();
+ long startOfLedger = 1;
+ while (lrIterator.hasNext()) {
+ LedgerRange range = lrIterator.next();
+ long endOfLedger = Long.MAX_VALUE;
+ if (range.hasEndSeqIdIncluded()) {
+ endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
+ }
+ System.out.println("Ledger " + range.getLedgerId() + " [ " + startOfLedger + " ~ " + (endOfLedger == Long.MAX_VALUE ? "" : endOfLedger) + " ]");
+
+ startOfLedger = endOfLedger + 1;
+ }
+ System.out.println();
+ }
+
+ private void printTopicSubscriptions(Map<ByteString, SubscriptionState> states) {
+ System.out.println(">>> Subscription Info <<<");
+ if (0 == states.size()) {
+ System.out.println("No subscriber.");
+ return;
+ }
+ for (Map.Entry<ByteString, SubscriptionState> entry : states.entrySet()) {
+ System.out.println("Subscriber " + entry.getKey().toStringUtf8() + " : "
+ + SubscriptionStateUtils.toString(entry.getValue()));
+ }
+ System.out.println();
+ }
+
+ }
+
+ protected Map<String, MyCommand> buildMyCommands() {
+ Map<String, MyCommand> cmds =
+ new HashMap<String, MyCommand>();
+
+ ExitCmd exitCmd = new ExitCmd();
+ cmds.put(EXIT, exitCmd);
+ cmds.put(QUIT, exitCmd);
+ cmds.put(HELP, new HelpCmd());
+ cmds.put(HISTORY, new HistoryCmd());
+ cmds.put(REDO, new RedoCmd());
+ cmds.put(SET, new SetCmd());
+ cmds.put(PUB, new PubCmd());
+ cmds.put(SUB, new SubCmd());
+ cmds.put(PUBSUB, new PubSubCmd());
+ cmds.put(CLOSESUB, new CloseSubscriptionCmd());
+ cmds.put(UNSUB, new UnsubCmd());
+ cmds.put(RMSUB, new RmsubCmd());
+ cmds.put(CONSUME, new ConsumeCmd());
+ cmds.put(CONSUMETO, new ConsumeToCmd());
+ cmds.put(SHOW, new ShowCmd());
+ cmds.put(DESCRIBE, new DescribeCmd());
+ cmds.put(READTOPIC, new ReadTopicCmd());
+
+ return cmds;
+ }
+
+ static void usage() {
+ System.err.println("HedwigConsole [options] [command] [args]");
+ System.err.println();
+ System.err.println("Avaiable commands:");
+ for (String cmd : getHedwigCommands().keySet()) {
+ System.err.println("\t" + cmd);
+ }
+ System.err.println();
+ }
+
+ /**
+ * A storage class for both command line options and shell commands.
+ */
+ static private class MyCommandOptions {
+
+ private Map<String,String> options = new HashMap<String,String>();
+ private List<String> cmdArgs = null;
+ private String command = null;
+
+ public MyCommandOptions() {
+ }
+
+ public String getOption(String opt) {
+ return options.get(opt);
+ }
+
+ public String getCommand( ) {
+ return command;
+ }
+
+ public String getCmdArgument( int index ) {
+ return cmdArgs.get(index);
+ }
+
+ public int getNumArguments( ) {
+ return cmdArgs.size();
+ }
+
+ public String[] getArgArray() {
+ return cmdArgs.toArray(new String[0]);
+ }
+
+ /**
+ * Parses a command line that may contain one or more flags
+ * before an optional command string
+ * @param args command line arguments
+ * @return true if parsing succeeded, false otherwise.
+ */
+ public boolean parseOptions(String[] args) {
+ List<String> argList = Arrays.asList(args);
+ Iterator<String> it = argList.iterator();
+
+ while (it.hasNext()) {
+ String opt = it.next();
+ if (!opt.startsWith("-")) {
+ command = opt;
+ cmdArgs = new ArrayList<String>( );
+ cmdArgs.add( command );
+ while (it.hasNext()) {
+ cmdArgs.add(it.next());
+ }
+ return true;
+ } else {
+ try {
+ options.put(opt.substring(1), it.next());
+ } catch (NoSuchElementException e) {
+ System.err.println("Error: no argument found for option "
+ + opt);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Breaks a string into command + arguments.
+ * @param cmdstring string of form "cmd arg1 arg2..etc"
+ * @return true if parsing succeeded.
+ */
+ public boolean parseCommand( String cmdstring ) {
+ String[] args = cmdstring.split(" ");
+ if (args.length == 0){
+ return false;
+ }
+ command = args[0];
+ cmdArgs = Arrays.asList(args);
+ return true;
+ }
+ }
+
+ private class MyWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ if (getPrintWatches()) {
+ printMessage("WATCHER::");
+ printMessage(event.toString());
+ }
+ }
+ }
+
+ public void printMessage(String msg) {
+ if (inConsole) {
+ System.out.println("\n"+msg);
+ }
+ }
+
+ /**
+ * Hedwig Console
+ *
+ * @param args arguments
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public HedwigConsole(String[] args) throws IOException, InterruptedException {
+ cl.parseOptions(args);
+
+ if (cl.getCommand() == null) {
+ inConsole = true;
+ } else {
+ inConsole = false;
+ }
+
+ org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
+ new org.apache.bookkeeper.conf.ClientConfiguration();
+ ServerConfiguration hubServerConf = new ServerConfiguration();
+ String serverCfgFile = cl.getOption("server-cfg");
+ if (serverCfgFile != null) {
+ try {
+ hubServerConf.loadConf(new File(serverCfgFile).toURI().toURL());
+ } catch (ConfigurationException e) {
+ throw new IOException(e);
+ }
+ try {
+ bkClientConf.loadConf(new File(serverCfgFile).toURI().toURL());
+ } catch (ConfigurationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ ClientConfiguration hubClientCfg = new ClientConfiguration();
+ String clientCfgFile = cl.getOption("client-cfg");
+ if (clientCfgFile != null) {
+ try {
+ hubClientCfg.loadConf(new File(clientCfgFile).toURI().toURL());
+ } catch (ConfigurationException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+
+ printMessage("Connecting to zookeeper/bookkeeper using HedwigAdmin");
+ try {
+ admin = new HedwigAdmin(bkClientConf, hubServerConf);
+ admin.getZkHandle().register(new MyWatcher());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ printMessage("Connecting to default hub server " + hubClientCfg.getDefaultServerHost());
+ hubClient = new HedwigClient(hubClientCfg);
+ publisher = hubClient.getPublisher();
+ subscriber = hubClient.getSubscriber();
+
+ // other parameters
+ myRegion = hubServerConf.getMyRegion();
+ }
+
+ public boolean getPrintWatches() {
+ return printWatches;
+ }
+
+ protected String getPrompt() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[hedwig: (").append(myRegion).append(") ").append(commandCount).append("] ");
+ return sb.toString();
+ }
+
+ protected void addToHistory(int i, String cmd) {
+ history.put(i, cmd);
+ }
+
+ public void executeLine(String line) {
+ if (!line.equals("")) {
+ cl.parseCommand(line);
+ addToHistory(commandCount, line);
+ processCmd(cl);
+ commandCount++;
+ }
+ }
+
+ protected boolean processCmd(MyCommandOptions co) {
+ String[] args = co.getArgArray();
+ String cmd = co.getCommand();
+ if (args.length < 1) {
+ usage();
+ return false;
+ }
+ if (!getHedwigCommands().containsKey(cmd)) {
+ usage();
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + cmd);
+ }
+
+ MyCommand myCommand = myCommands.get(cmd);
+ if (myCommand == null) {
+ System.err.println("No Command Processor found for command " + cmd);
+ usage();
+ return false;
+ }
+
+ long startTime = System.currentTimeMillis();
+ boolean success = false;
+ try {
+ success = myCommand.runCmd(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ success = false;
+ }
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (inConsole) {
+ if (success) {
+ System.out.println("Finished " + ((double)elapsedTime / 1000) + " s.");
+ } else {
+ COMMAND c = getHedwigCommands().get(cmd);
+ if (c != null) {
+ c.printUsage();
+ }
+ }
+ }
+ return success;
+ }
+
+ @SuppressWarnings("unchecked")
+ void run() throws IOException {
+ inConsole = true;
+ myCommands = buildMyCommands();
+ if (cl.getCommand() == null) {
+ System.out.println("Welcome to Hedwig!");
+
+ boolean jlinemissing = false;
+ // only use jline if it's in the classpath
+ try {
+ Class consoleC = Class.forName("jline.ConsoleReader");
+ Class completorC =
+ Class.forName("org.apache.hedwig.admin.console.JLineHedwigCompletor");
+
+ System.out.println("JLine support is enabled");
+
+ Object console =
+ consoleC.getConstructor().newInstance();
+
+ Object completor =
+ completorC.getConstructor(HedwigAdmin.class).newInstance(admin);
+ Method addCompletor = consoleC.getMethod("addCompletor",
+ Class.forName("jline.Completor"));
+ addCompletor.invoke(console, completor);
+
+ // load history file
+ boolean historyEnabled = false;
+ Object history = null;
+ Method addHistory = null;
+ // Method flushHistory = null;
+ try {
+ Class historyC = Class.forName("jline.History");
+ history = historyC.getConstructor().newInstance();
+
+ File file = new File(System.getProperty("hw.history",
+ new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("History file is " + file.toString());
+ }
+ Method setHistoryFile = historyC.getMethod("setHistoryFile", File.class);
+ setHistoryFile.invoke(history, file);
+
+ // set history to console reader
+ Method setHistory = consoleC.getMethod("setHistory", historyC);
+ setHistory.invoke(console, history);
+
+ // load history from history file
+ Method moveToFirstEntry = historyC.getMethod("moveToFirstEntry");
+ moveToFirstEntry.invoke(history);
+
+ addHistory = historyC.getMethod("addToHistory", String.class);
+ // flushHistory = historyC.getMethod("flushBuffer");
+
+ Method nextEntry = historyC.getMethod("next");
+ Method current = historyC.getMethod("current");
+ while ((Boolean)(nextEntry.invoke(history))) {
+ String entry = (String)current.invoke(history);
+ if (!entry.equals("")) {
+ addToHistory(commandCount, entry);
+ }
+ commandCount++;
+ }
+
+ historyEnabled = true;
+ System.out.println("JLine history support is enabled");
+ } catch (Exception e) {
+ historyEnabled = false;
+ e.printStackTrace();
+ System.out.println("JLine history support is disabled");
+ }
+
+ String line;
+ Method readLine = consoleC.getMethod("readLine", String.class);
+ while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
+ executeLine(line);
+ if (historyEnabled) {
+ addHistory.invoke(history, line);
+ // flushHistory.invoke(history);
+ }
+ }
+ } catch (ClassNotFoundException e) {
+ LOG.debug("Unable to start jline", e);
+ jlinemissing = true;
+ } catch (NoSuchMethodException e) {
+ LOG.debug("Unable to start jline", e);
+ jlinemissing = true;
+ } catch (InvocationTargetException e) {
+ LOG.debug("Unable to start jline", e);
+ jlinemissing = true;
+ } catch (IllegalAccessException e) {
+ LOG.debug("Unable to start jline", e);
+ jlinemissing = true;
+ } catch (InstantiationException e) {
+ LOG.debug("Unable to start jline", e);
+ jlinemissing = true;
+ }
+
+ if (jlinemissing) {
+ System.out.println("JLine support is disabled");
+ BufferedReader br =
+ new BufferedReader(new InputStreamReader(System.in));
+
+ String line;
+ while ((line = br.readLine()) != null) {
+ executeLine(line);
+ }
+ }
+ }
+
+ inConsole = false;
+ processCmd(cl);
+ try {
+ myCommands.get(EXIT).runCmd(new String[0]);
+ } catch (Exception e) {
+ }
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ HedwigConsole console = new HedwigConsole(args);
+ console.run();
+ }
+}
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java?rev=1241858&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java Wed Feb 8 10:49:10 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.admin.console;
+
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.hedwig.admin.HedwigAdmin;
+
+import jline.Completor;
+
+import static org.apache.hedwig.admin.console.HedwigCommands.*;
+
+/**
+ * A jline completor for hedwig console
+ */
+public class JLineHedwigCompletor implements Completor {
+ private HedwigAdmin admin;
+
+ public JLineHedwigCompletor(HedwigAdmin admin) {
+ this.admin = admin;
+ }
+
+ @Override
+ public int complete(String buffer, int cursor, List candidates) {
+ // Guarantee that the final token is the one we're expanding
+ buffer = buffer.substring(0,cursor);
+ String[] tokens = buffer.split(" ");
+ if (buffer.endsWith(" ")) {
+ String[] newTokens = new String[tokens.length + 1];
+ System.arraycopy(tokens, 0, newTokens, 0, tokens.length);
+ newTokens[newTokens.length - 1] = "";
+ tokens = newTokens;
+ }
+
+ if (tokens.length > 2 &&
+ DESCRIBE.equalsIgnoreCase(tokens[0]) &&
+ DESCRIBE_TOPIC.equalsIgnoreCase(tokens[1])) {
+ return completeTopic(buffer, tokens[2], candidates);
+ } else if (tokens.length > 1 &&
+ (SUB.equalsIgnoreCase(tokens[0]) ||
+ PUB.equalsIgnoreCase(tokens[0]) ||
+ CLOSESUB.equalsIgnoreCase(tokens[0]) ||
+ CONSUME.equalsIgnoreCase(tokens[0]) ||
+ CONSUMETO.equalsIgnoreCase(tokens[0]) ||
+ READTOPIC.equalsIgnoreCase(tokens[0]))) {
+ return completeTopic(buffer, tokens[1], candidates);
+ }
+ List<String> cmds = HedwigCommands.findCandidateCommands(tokens);
+ return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates);
+ }
+
+ private int completeCommand(String buffer, String token,
+ List<String> commands, List<String> candidates) {
+ for (String cmd : commands) {
+ if (cmd.startsWith(token)) {
+ candidates.add(cmd);
+ }
+ }
+ return buffer.lastIndexOf(" ") + 1;
+ }
+
+ private int completeTopic(String buffer, String token, List<String> candidates) {
+ try {
+ List<String> children = admin.getTopics();
+ for (String child : children) {
+ if (child.startsWith(token)) {
+ candidates.add(child);
+ }
+ }
+ } catch (Exception e) {
+ return buffer.length();
+ }
+ return candidates.size() == 0 ? buffer.length() : buffer.lastIndexOf(" ") + 1;
+ }
+}
|