Return-Path: X-Original-To: apmail-bookkeeper-commits-archive@www.apache.org Delivered-To: apmail-bookkeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BA61B19866 for ; Wed, 16 Mar 2016 03:44:15 +0000 (UTC) Received: (qmail 51025 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Delivered-To: apmail-bookkeeper-commits-archive@bookkeeper.apache.org Received: (qmail 50947 invoked by uid 500); 16 Mar 2016 03:44:15 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 49933 invoked by uid 99); 16 Mar 2016 03:44:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 03:44:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 58A7ADFCDF; Wed, 16 Mar 2016 03:44:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sijie@apache.org To: commits@bookkeeper.apache.org Date: Wed, 16 Mar 2016 03:44:28 -0000 Message-Id: <0522229c46de453bacd7535d691fe266@git.apache.org> In-Reply-To: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> References: <5d2e3d025bc84b8f8013a1ae9dbb9498@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java deleted file mode 100644 index 407769b..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java +++ /dev/null @@ -1,437 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.admin.console; - -import java.util.Map; -import java.util.List; -import java.util.LinkedList; -import java.util.LinkedHashMap; - -/** - * List all the available commands - */ -public final class HedwigCommands { - - static final String[] EMPTY_ARRAY = new String[0]; - - // - // List all commands used to play with hedwig - // - - /* PUB : publish a message to hedwig */ - static final String PUB = "pub"; - static final String PUB_DESC = "Publish a message to a topic in Hedwig"; - static final String[] PUB_USAGE = new String[] { - "usage: pub {topic} {message}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {message} : message body.", - " remaining arguments are used as message body to publish.", - }; - - /* SUB : subscriber a topic in hedwig for a specified subscriber */ - static final String SUB = "sub"; - static final String SUB_DESC = "Subscribe a topic for a specified subscriber"; - static final String[] SUB_USAGE = new String[] { - "usage: sub {topic} {subscriber} [mode]", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {subscriber} : subscriber id.", - " any printable string without spaces.", - " [mode] : mode to create subscription.", - " [receive] : bool. whether to start delivery to receive messages.", - "", - " available modes: (default value is 1)", - " 0 = CREATE: create subscription.", - " if the subscription is exsited, it will fail.", - " 1 = ATTACH: attach to exsited subscription.", - " if the subscription is not existed, it will faile.", - " 2 = CREATE_OR_ATTACH:", - " attach to subscription, if not existed create one." - }; - - /* CLOSESUB : close the subscription of a subscriber for a topic */ - static final String CLOSESUB = "closesub"; - static final String CLOSESUB_DESC = "Close subscription of a subscriber to a specified topic"; - static final String[] CLOSESUB_USAGE = new String[] { - "usage: closesub {topic} {subscriber}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {subscriber} : subscriber id.", - " any printable string without spaces.", - "", - " NOTE: this command just cleanup subscription states on client side.", - " You can try UNSUB to clean subscription states on server side.", - }; - - /* UNSUB: unsubscribe of a subscriber to a topic */ - static final String UNSUB = "unsub"; - static final String UNSUB_DESC = "Unsubscribe a topic for a subscriber"; - static final String[] UNSUB_USAGE = new String[] { - "usage: unsub {topic} {subscriber}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {subscriber} : subscriber id.", - " any printable string without spaces.", - "", - " NOTE: this command will cleanup subscription states on server side.", - " You can try CLOSESUB to just clean subscription states on client side.", - }; - - static final String RMSUB = "rmsub"; - static final String RMSUB_DESC = "Remove subscriptions for topics"; - static final String[] RMSUB_USAGE = new String[] { - "usage: rmsub {topic_prefix} {start_topic} {end_topic} {subscriber_prefix} {start_sub} {end_sub}", - "", - " {topic_prefix} : topic prefix.", - " {start_topic} : start topic id.", - " {end_topic} : end topic id.", - " {subscriber_prefix} : subscriber prefix.", - " {start_sub} : start subscriber id.", - " {end_sub} : end subscriber id.", - }; - - /* CONSUME: move consume ptr of a subscription with specified steps */ - static final String CONSUME = "consume"; - static final String CONSUME_DESC = "Move consume ptr of a subscription with sepcified steps"; - static final String[] CONSUME_USAGE = new String[] { - "usage: consume {topic} {subscriber} {nmsgs}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {subscriber} : subscriber id.", - " any printable string without spaces.", - " {nmsgs} : how many messages to move consume ptr.", - "", - " Example:", - " suppose, from zk we know subscriber B consumed topic T to message 10", - " [hedwig: (standalone) 1] consume T B 2", - " after executed above command, a consume(10+2) request will be sent to hedwig.", - "", - " NOTE:", - " since Hedwig updates subscription consume ptr lazily, so you need to know that", - " 1) the consumption ptr read from zookeeper may be stable; ", - " 2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.", - }; - - /* CONSUMETO: move consume ptr of a subscription to a specified pos */ - static final String CONSUMETO = "consumeto"; - static final String CONSUMETO_DESC = "Move consume ptr of a subscription to a specified message id"; - static final String[] CONSUMETO_USAGE = new String[] { - "usage: consumeto {topic} {subscriber} {msg_id}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {subscriber} : subscriber id.", - " any printable string without spaces.", - " {msg_id} : message id that consume ptr will be moved to.", - " if the message id is less than current consume ptr,", - " hedwig will do nothing.", - "", - " Example:", - " suppose, from zk we know subscriber B consumed topic T to message 10", - " [hedwig: (standalone) 1] consumeto T B 12", - " after executed above command, a consume(12) request will be sent to hedwig.", - "", - " NOTE:", - " since Hedwig updates subscription consume ptr lazily, so you need to know that", - " 1) the consumption ptr read from zookeeper may be stable; ", - " 2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.", - }; - - /* PUBSUB: a healthy checking command to ensure cluster is running */ - static final String PUBSUB = "pubsub"; - static final String PUBSUB_DESC = "A healthy checking command to ensure hedwig is in running state"; - static final String[] PUBSUB_USAGE = new String[] { - "usage: pubsub {topic} {subscriber} {timeout_secs} {message}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " {subscriber} : subscriber id.", - " any printable string without spaces.", - " {timeout_secs} : how long will the subscriber wait for published message.", - " {message} : message body.", - " remaining arguments are used as message body to publish.", - "", - " Example:", - " [hedwig: (standalone) 1] pubsub TOPIC SUBID 10 TEST_MESSAGS", - "", - " 1) hw will subscribe topic TOPIC as subscriber SUBID;", - " 2) subscriber SUBID will wait a message until 10 seconds;", - " 3) hw publishes TEST_MESSAGES to topic TOPIC;", - " 4) if subscriber recevied message in 10 secs, it checked that whether the message is published message.", - " if true, it will return SUCCESS, otherwise return FAILED.", - }; - - // - // List all commands used to admin hedwig - // - - /* SHOW: list all available hub servers or topics */ - static final String SHOW = "show"; - static final String SHOW_DESC = "list all available hub servers or topics"; - static final String[] SHOW_USAGE = new String[] { - "usage: show [topics | hubs]", - "", - " show topics :", - " listing all available topics in hedwig.", - "", - " show hubs :", - " listing all available hubs in hedwig.", - "", - " NOTES:", - " 'show topics' will not works when there are millions of topics in hedwig, since we have packetLen limitation fetching data from zookeeper.", - }; - - static final String SHOW_TOPICS = "topics"; - static final String SHOW_HUBS = "hubs"; - - /* DESCRIBE: show the metadata of a topic */ - static final String DESCRIBE = "describe"; - static final String DESCRIBE_DESC = "show metadata of a topic, including topic owner, persistence info, subscriptions info"; - static final String[] DESCRIBE_USAGE = new String[] { - "usage: describe topic {topic}", - "", - " {topic} : topic name.", - " any printable string without spaces.", - "", - " Example: describe topic ttttt", - "", - " Output:", - " ===== Topic Information : ttttt =====", - "", - " Owner : 98.137.99.27:9875:9876", - "", - " >>> Persistence Info <<<", - " Ledger 54729 [ 1 ~ 59 ]", - " Ledger 54731 [ 60 ~ 60 ]", - " Ledger 54733 [ 61 ~ 61 ]", - "", - " >>> Subscription Info <<<", - " Subscriber mysub : consumeSeqId: local:50", - }; - - static final String DESCRIBE_TOPIC = "topic"; - - /* READTOPIC: read messages of a specified topic */ - static final String READTOPIC = "readtopic"; - static final String READTOPIC_DESC = "read messages of a specified topic"; - static final String[] READTOPIC_USAGE = new String[] { - "usage: readtopic {topic} [start_msg_id]", - "", - " {topic} : topic name.", - " any printable string without spaces.", - " [start_msg_id] : message id that start to read from.", - "", - " no start_msg_id provided:", - " it will start from least_consumed_message_id + 1.", - " least_consume_message_id is computed from all its subscribers.", - "", - " start_msg_id provided:", - " it will start from MAX(start_msg_id, least_consumed_message_id).", - "", - " MESSAGE FORMAT:", - "", - " ---------- MSGID=LOCAL(51) ----------", - " MsgId: LOCAL(51)", - " SrcRegion: standalone", - " Message:", - "", - " hello", - }; - - /* FORMAT: format metadata for Hedwig */ - static final String FORMAT = "format"; - static final String FORMAT_DESC = "format metadata for Hedwig"; - static final String[] FORMAT_USAGE = new String[] { - "usage: format [-force]", - "", - " [-force] : Format metadata for Hedwig w/o confirmation.", - }; - - - // - // List other useful commands - // - - /* SET: set whether printing zk watches or not */ - static final String SET = "set"; - static final String SET_DESC = "set whether printing zk watches or not"; - static final String[] SET_USAGE = EMPTY_ARRAY; - - /* HISTORY: list history commands */ - static final String HISTORY = "history"; - static final String HISTORY_DESC = "list history commands"; - static final String[] HISTORY_USAGE = EMPTY_ARRAY; - - /* REDO: redo previous command */ - static final String REDO = "redo"; - static final String REDO_DESC = "redo history command"; - static final String[] REDO_USAGE = new String[] { - "usage: redo [{cmdno} | !]", - "", - " {cmdno} : history command no.", - " ! : last command.", - }; - - /* HELP: print usage information of a specified command */ - static final String HELP = "help"; - static final String HELP_DESC = "print usage information of a specified command"; - static final String[] HELP_USAGE = new String[] { - "usage: help {command}", - "", - " {command} : command name", - }; - - static final String QUIT = "quit"; - static final String QUIT_DESC = "exit console"; - static final String[] QUIT_USAGE = EMPTY_ARRAY; - - static final String EXIT = "exit"; - static final String EXIT_DESC = QUIT_DESC; - static final String[] EXIT_USAGE = EMPTY_ARRAY; - - public static enum COMMAND { - - CMD_PUB (PUB, PUB_DESC, PUB_USAGE), - CMD_SUB (SUB, SUB_DESC, SUB_USAGE), - CMD_CLOSESUB (CLOSESUB, CLOSESUB_DESC, CLOSESUB_USAGE), - CMD_UNSUB (UNSUB, UNSUB_DESC, UNSUB_USAGE), - CMD_RMSUB (RMSUB, RMSUB_DESC, RMSUB_USAGE), - CMD_CONSUME (CONSUME, CONSUME_DESC, CONSUME_USAGE), - CMD_CONSUMETO (CONSUMETO, CONSUMETO_DESC, CONSUMETO_USAGE), - CMD_PUBSUB (PUBSUB, PUBSUB_DESC, PUBSUB_USAGE), - CMD_SHOW (SHOW, SHOW_DESC, SHOW_USAGE), - CMD_DESCRIBE (DESCRIBE, DESCRIBE_DESC, DESCRIBE_USAGE), - CMD_READTOPIC (READTOPIC, READTOPIC_DESC, READTOPIC_USAGE), - CMD_FORMAT (FORMAT, FORMAT_DESC, FORMAT_USAGE), - CMD_SET (SET, SET_DESC, SET_USAGE), - CMD_HISTORY (HISTORY, HISTORY_DESC, HISTORY_USAGE), - CMD_REDO (REDO, REDO_DESC, REDO_USAGE), - CMD_HELP (HELP, HELP_DESC, HELP_USAGE), - CMD_QUIT (QUIT, QUIT_DESC, QUIT_USAGE), - CMD_EXIT (EXIT, EXIT_DESC, EXIT_USAGE), - // sub commands - CMD_SHOW_TOPICS (SHOW_TOPICS, "", EMPTY_ARRAY), - CMD_SHOW_HUBS (SHOW_HUBS, "", EMPTY_ARRAY), - CMD_DESCRIBE_TOPIC (DESCRIBE_TOPIC, "", EMPTY_ARRAY); - - COMMAND(String name, String desc, String[] usage) { - this.name = name; - this.desc = desc; - this.usage = usage; - this.subCmds = new LinkedHashMap(); - } - - public String getName() { return name; } - - public String getDescription() { return desc; } - - public Map getSubCommands() { return subCmds; } - - public void addSubCommand(COMMAND c) { - this.subCmds.put(c.name, c); - }; - - public void printUsage() { - System.err.println(name + ": " + desc); - for(String line : usage) { - System.err.println(line); - } - System.err.println(); - } - - protected String name; - protected String desc; - protected String[] usage; - protected Map subCmds; - } - - static Map commands = null; - - private static void addCommand(COMMAND c) { - commands.put(c.getName(), c); - } - - static synchronized void init() { - if (commands != null) { - return; - } - commands = new LinkedHashMap(); - - addCommand(COMMAND.CMD_PUB); - addCommand(COMMAND.CMD_SUB); - addCommand(COMMAND.CMD_CLOSESUB); - addCommand(COMMAND.CMD_UNSUB); - addCommand(COMMAND.CMD_RMSUB); - addCommand(COMMAND.CMD_CONSUME); - addCommand(COMMAND.CMD_CONSUMETO); - addCommand(COMMAND.CMD_PUBSUB); - - // show - COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_TOPICS); - COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_HUBS); - addCommand(COMMAND.CMD_SHOW); - - // describe - COMMAND.CMD_DESCRIBE.addSubCommand(COMMAND.CMD_DESCRIBE_TOPIC); - addCommand(COMMAND.CMD_DESCRIBE); - - addCommand(COMMAND.CMD_READTOPIC); - addCommand(COMMAND.CMD_FORMAT); - addCommand(COMMAND.CMD_SET); - addCommand(COMMAND.CMD_HISTORY); - addCommand(COMMAND.CMD_REDO); - addCommand(COMMAND.CMD_HELP); - addCommand(COMMAND.CMD_QUIT); - addCommand(COMMAND.CMD_EXIT); - } - - public static Map getHedwigCommands() { - return commands; - } - - /** - * Find candidate commands by the specified token list - * - * @param token token list - * - * @return list of candidate commands - */ - public static List findCandidateCommands(String[] tokens) { - List cmds = new LinkedList(); - - Map cmdMap = commands; - for (int i=0; i<(tokens.length - 1); i++) { - COMMAND c = cmdMap.get(tokens[i]); - // no commands - if (c == null || c.getSubCommands().size() <= 0) { - return cmds; - } else { - cmdMap = c.getSubCommands(); - } - } - cmds.addAll(cmdMap.keySet()); - return cmds; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java deleted file mode 100644 index 3a5ef5a..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java +++ /dev/null @@ -1,1038 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.admin.console; - -import jline.ConsoleReader; -import jline.History; -import jline.Terminal; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.hedwig.admin.HedwigAdmin; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.topics.HubInfo; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.util.SubscriptionListener; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; -import static com.google.common.base.Charsets.UTF_8; - -import static org.apache.hedwig.admin.console.HedwigCommands.*; -import static org.apache.hedwig.admin.console.HedwigCommands.COMMAND.*; - -/** - * Console Client to Hedwig - */ -public class HedwigConsole { - private static final Logger LOG = LoggerFactory.getLogger(HedwigConsole.class); - // NOTE: now it is fixed passwd in bookkeeper - static byte[] passwd = "sillysecret".getBytes(UTF_8); - - // history file name - static final String HW_HISTORY_FILE = ".hw_history"; - - static final char[] CONTINUE_OR_QUIT = new char[] { 'Q', 'q', '\n' }; - - protected MyCommandOptions cl = new MyCommandOptions(); - protected HashMap history = new LinkedHashMap(); - protected int commandCount = 0; - protected boolean printWatches = true; - protected Map myCommands; - - protected boolean inConsole = true; - protected ConsoleReader console = null; - - protected HedwigAdmin admin; - protected HedwigClient hubClient; - protected Publisher publisher; - protected Subscriber subscriber; - protected ConsoleMessageHandler consoleHandler = - new ConsoleMessageHandler(); - protected Terminal terminal; - - protected String myRegion; - - interface MyCommand { - boolean runCmd(String[] args) throws Exception; - } - - static class HelpCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - boolean printUsage = true; - if (args.length >= 2) { - String command = args[1]; - COMMAND c = getHedwigCommands().get(command); - if (c != null) { - c.printUsage(); - printUsage = false; - } - } - if (printUsage) { - usage(); - } - return true; - } - } - - class ExitCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - printMessage("Quitting ..."); - hubClient.close(); - admin.close(); - Runtime.getRuntime().exit(0); - return true; - } - } - - class RedoCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 2) { - return false; - } - - int index; - if ("!".equals(args[1])) { - index = commandCount - 1; - } else { - index = Integer.decode(args[1]); - if (commandCount <= index) { - System.err.println("Command index out of range"); - return false; - } - } - cl.parseCommand(history.get(index)); - if (cl.getCommand().equals("redo")) { - System.err.println("No redoing redos"); - return false; - } - history.put(commandCount, history.get(index)); - processCmd(cl); - return true; - } - - } - - class HistoryCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - for (int i=commandCount - 10; i<=commandCount; ++i) { - if (i < 0) { - continue; - } - System.out.println(i + " - " + history.get(i)); - } - return true; - } - - } - - class SetCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 3 || !"printwatches".equals(args[1])) { - return false; - } else if (args.length == 2) { - System.out.println("printwatches is " + (printWatches ? "on" : "off")); - } else { - printWatches = args[2].equals("on"); - } - return true; - } - - } - - class PubCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 3) { - return false; - } - ByteString topic = ByteString.copyFromUtf8(args[1]); - - StringBuilder sb = new StringBuilder(); - for (int i=2; i callback, Object context) { - System.out.println("Received message from topic " + topic.toStringUtf8() + - " for subscriber " + subscriberId.toStringUtf8() + " : " - + msg.getBody().toStringUtf8()); - callback.operationFinished(context, null); - } - - } - - static class ConsoleSubscriptionListener implements SubscriptionListener { - - @Override - public void processEvent(ByteString t, ByteString s, SubscriptionEvent event) { - System.out.println("Subscription Channel for (topic:" + t.toStringUtf8() + ", subscriber:" - + s.toStringUtf8() + ") received event : " + event); - } - } - - class SubCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - CreateOrAttach mode; - boolean receive = true; - if (args.length < 3) { - return false; - } else if (args.length == 3) { - mode = CreateOrAttach.ATTACH; - receive = true; - } else { - try { - mode = CreateOrAttach.valueOf(Integer.parseInt(args[3])); - } catch (Exception e) { - System.err.println("Unknow mode : " + args[3]); - return false; - } - if (args.length >= 5) { - try { - receive = Boolean.parseBoolean(args[4]); - } catch (Exception e) { - receive = false; - } - } - } - if (mode == null) { - System.err.println("Unknow mode : " + args[3]); - return false; - } - ByteString topic = ByteString.copyFromUtf8(args[1]); - ByteString subId = ByteString.copyFromUtf8(args[2]); - try { - SubscriptionOptions options = - SubscriptionOptions.newBuilder().setCreateOrAttach(mode) - .setForceAttach(false).build(); - subscriber.subscribe(topic, subId, options); - if (receive) { - subscriber.startDelivery(topic, subId, consoleHandler); - System.out.println("SUB DONE AND RECEIVE"); - } else { - System.out.println("SUB DONE BUT NOT RECEIVE"); - } - } catch (Exception e) { - System.err.println("SUB FAILED"); - e.printStackTrace(); - } - return true; - } - } - - class UnsubCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 3) { - return false; - } - ByteString topic = ByteString.copyFromUtf8(args[1]); - ByteString subId = ByteString.copyFromUtf8(args[2]); - try { - subscriber.stopDelivery(topic, subId); - subscriber.unsubscribe(topic, subId); - System.out.println("UNSUB DONE"); - } catch (Exception e) { - System.err.println("UNSUB FAILED"); - e.printStackTrace(); - } - return true; - } - - } - - class RmsubCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 7) { - return false; - } - String topicPrefix = args[1]; - int startTopic = Integer.parseInt(args[2]); - int endTopic = Integer.parseInt(args[3]); - String subPrefix = args[4]; - int startSub = Integer.parseInt(args[5]); - int endSub = Integer.parseInt(args[6]); - if (startTopic > endTopic || endSub < startSub) { - return false; - } - for (int i=startTopic; i<=endTopic; i++) { - ByteString topic = ByteString.copyFromUtf8(topicPrefix + i); - try { - for (int j=startSub; j<=endSub; j++) { - ByteString sub = ByteString.copyFromUtf8(subPrefix + j); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, sub, opts); - subscriber.unsubscribe(topic, sub); - } - System.out.println("RMSUB " + topic.toStringUtf8() + " DONE"); - } catch (Exception e) { - System.err.println("RMSUB " + topic.toStringUtf8() + " FAILED"); - e.printStackTrace(); - } - } - return true; - } - - } - - class CloseSubscriptionCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 3) { - return false; - } - ByteString topic = ByteString.copyFromUtf8(args[1]); - ByteString sudId = ByteString.copyFromUtf8(args[2]); - - try { - subscriber.stopDelivery(topic, sudId); - subscriber.closeSubscription(topic, sudId); - } catch (Exception e) { - System.err.println("CLOSESUB FAILED"); - } - return true; - } - - } - - class ConsumeToCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 4) { - return false; - } - ByteString topic = ByteString.copyFromUtf8(args[1]); - ByteString subId = ByteString.copyFromUtf8(args[2]); - long msgId = Long.parseLong(args[3]); - MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(msgId).build(); - try { - subscriber.consume(topic, subId, consumeId); - } catch (Exception e) { - System.err.println("CONSUMETO FAILED"); - } - return true; - } - - } - - class ConsumeCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 4) { - return false; - } - long lastConsumedId = 0; - SubscriptionData subData = admin.getSubscription(ByteString.copyFromUtf8(args[1]), - ByteString.copyFromUtf8(args[2])); - if (null == subData) { - System.err.println("Failed to read subscription for topic: " + args[1] - + " subscriber: " + args[2]); - return true; - } - lastConsumedId = subData.getState().getMsgId().getLocalComponent(); - long numMessagesToConsume = Long.parseLong(args[3]); - long idToConsumed = lastConsumedId + numMessagesToConsume; - System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1] - + ") from " + lastConsumedId + " to " + idToConsumed); - MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(idToConsumed).build(); - ByteString topic = ByteString.copyFromUtf8(args[1]); - ByteString subId = ByteString.copyFromUtf8(args[2]); - try { - subscriber.consume(topic, subId, consumeId); - } catch (Exception e) { - System.err.println("CONSUME FAILED"); - } - return true; - } - - } - - class PubSubCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 5) { - return false; - } - final long startTime = MathUtils.now(); - - final ByteString topic = ByteString.copyFromUtf8(args[1]); - final ByteString subId = ByteString.copyFromUtf8(args[2] + "-" + startTime); - int timeoutSecs = 60; - try { - timeoutSecs = Integer.parseInt(args[3]); - } catch (NumberFormatException nfe) { - } - - StringBuilder sb = new StringBuilder(); - for (int i=4; i callback, Object context) { - if (thisTopic.equals(topic) && subscriberId.equals(subId) && - msg.getBody().equals(message.getBody())) { - System.out.println("Received message : " + message.getBody().toStringUtf8()); - isDone.countDown(); - } - callback.operationFinished(context, null); - } - - }); - - // wait for the message - success = isDone.await(timeoutSecs, TimeUnit.SECONDS); - elapsedTime = MathUtils.now() - startTime; - } finally { - try { - if (subscribed) { - subscriber.stopDelivery(topic, subId); - subscriber.unsubscribe(topic, subId); - } - } finally { - if (success) { - System.out.println("PUBSUB SUCCESS. TIME: " + elapsedTime + " MS"); - } else { - System.out.println("PUBSUB FAILED. "); - } - return success; - } - } - } - - } - - class ReadTopicCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 2) { - return false; - } - ReadTopic rt; - ByteString topic = ByteString.copyFromUtf8(args[1]); - if (args.length == 2) { - rt = new ReadTopic(admin, topic, inConsole); - } else { - rt = new ReadTopic(admin, topic, Long.parseLong(args[2]), inConsole); - } - rt.readTopic(); - return true; - } - - } - - class ShowCmd implements MyCommand { - - static final int MAX_TOPICS_PER_SHOW = 100; - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 2) { - return false; - } - String errorMsg = null; - try { - if (HedwigCommands.SHOW_HUBS.equals(args[1])) { - errorMsg = "Unable to fetch the list of hub servers"; - showHubs(); - } else if (HedwigCommands.SHOW_TOPICS.equals(args[1])) { - errorMsg = "Unable to fetch the list of topics"; - showTopics(); - } else { - System.err.println("ERROR: Unknown show command '" + args[1] + "'"); - return false; - } - } catch (Exception e) { - if (null != errorMsg) { - System.err.println(errorMsg); - } - e.printStackTrace(); - } - return true; - } - - protected void showHubs() throws Exception { - Map hubs = admin.getAvailableHubs(); - System.out.println("Available Hub Servers:"); - for (Map.Entry entry : hubs.entrySet()) { - System.out.println("\t" + entry.getKey() + " :\t" + entry.getValue()); - } - } - - protected void showTopics() throws Exception { - List topics = new ArrayList(); - Iterator iter = admin.getTopics(); - - System.out.println("Topic List:"); - boolean stop = false; - while (iter.hasNext()) { - if (topics.size() >= MAX_TOPICS_PER_SHOW) { - System.out.println(topics); - topics.clear(); - stop = !continueOrQuit(); - if (stop) { - break; - } - } - ByteString t = iter.next(); - topics.add(t.toStringUtf8()); - } - if (!stop) { - System.out.println(topics); - } - } - - - - } - - class DescribeCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - if (args.length < 3) { - return false; - } - if (HedwigCommands.DESCRIBE_TOPIC.equals(args[1])) { - return describeTopic(args[2]); - } else { - return false; - } - } - - protected boolean describeTopic(String topic) throws Exception { - ByteString btopic = ByteString.copyFromUtf8(topic); - HubInfo owner = admin.getTopicOwner(btopic); - List ranges = admin.getTopicLedgers(btopic); - Map states = admin.getTopicSubscriptions(btopic); - - System.out.println("===== Topic Information : " + topic + " ====="); - System.out.println(); - System.out.println("Owner : " + (owner == null ? "NULL" : - owner.toString().trim().replaceAll("\n", ", "))); - System.out.println(); - - // print ledgers - printTopicLedgers(ranges); - // print subscriptions - printTopicSubscriptions(states); - - return true; - } - - private void printTopicLedgers(List ranges) { - System.out.println(">>> Persistence Info <<<"); - if (null == ranges) { - System.out.println("N/A"); - return; - } - if (ranges.isEmpty()) { - System.out.println("No Ledger used."); - return; - } - for (LedgerRange range : ranges) { - System.out.println("Ledger " + range.getLedgerId() + " [ " - + range.getStartSeqIdIncluded() + " ~ " - + range.getEndSeqIdIncluded().getLocalComponent() + " ]"); - } - System.out.println(); - } - - private void printTopicSubscriptions(Map states) { - System.out.println(">>> Subscription Info <<<"); - if (0 == states.size()) { - System.out.println("No subscriber."); - return; - } - for (Map.Entry entry : states.entrySet()) { - System.out.println("Subscriber " + entry.getKey().toStringUtf8() + " : " - + SubscriptionStateUtils.toString(entry.getValue())); - } - System.out.println(); - } - - } - - class FormatCmd implements MyCommand { - - @Override - public boolean runCmd(String[] args) throws Exception { - boolean force = false; - if (args.length >= 2 && "-force".equals(args[1])) { - force = true; - } - boolean doFormat = true; - System.out.println("You ask to format hedwig metadata stored in " - + admin.getMetadataManagerFactory().getClass().getName() + "."); - if (!force) { - doFormat = continueOrQuit(); - } - if (doFormat) { - admin.format(); - System.out.println("Formatted hedwig metadata successfully."); - } else { - System.out.println("Given up formatting hedwig metadata."); - } - return true; - } - - } - - protected Map buildMyCommands() { - Map cmds = - new HashMap(); - - ExitCmd exitCmd = new ExitCmd(); - cmds.put(EXIT, exitCmd); - cmds.put(QUIT, exitCmd); - cmds.put(HELP, new HelpCmd()); - cmds.put(HISTORY, new HistoryCmd()); - cmds.put(REDO, new RedoCmd()); - cmds.put(SET, new SetCmd()); - cmds.put(PUB, new PubCmd()); - cmds.put(SUB, new SubCmd()); - cmds.put(PUBSUB, new PubSubCmd()); - cmds.put(CLOSESUB, new CloseSubscriptionCmd()); - cmds.put(UNSUB, new UnsubCmd()); - cmds.put(RMSUB, new RmsubCmd()); - cmds.put(CONSUME, new ConsumeCmd()); - cmds.put(CONSUMETO, new ConsumeToCmd()); - cmds.put(SHOW, new ShowCmd()); - cmds.put(DESCRIBE, new DescribeCmd()); - cmds.put(READTOPIC, new ReadTopicCmd()); - cmds.put(FORMAT, new FormatCmd()); - - return cmds; - } - - static void usage() { - System.err.println("HedwigConsole [options] [command] [args]"); - System.err.println(); - System.err.println("Avaiable commands:"); - for (String cmd : getHedwigCommands().keySet()) { - System.err.println("\t" + cmd); - } - System.err.println(); - } - - /** - * A storage class for both command line options and shell commands. - */ - static private class MyCommandOptions { - - private Map options = new HashMap(); - private List cmdArgs = null; - private String command = null; - - public MyCommandOptions() { - } - - public String getOption(String opt) { - return options.get(opt); - } - - public String getCommand( ) { - return command; - } - - public String getCmdArgument( int index ) { - return cmdArgs.get(index); - } - - public int getNumArguments( ) { - return cmdArgs.size(); - } - - public String[] getArgArray() { - return cmdArgs.toArray(new String[0]); - } - - /** - * Parses a command line that may contain one or more flags - * before an optional command string - * @param args command line arguments - * @return true if parsing succeeded, false otherwise. - */ - public boolean parseOptions(String[] args) { - List argList = Arrays.asList(args); - Iterator it = argList.iterator(); - - while (it.hasNext()) { - String opt = it.next(); - if (!opt.startsWith("-")) { - command = opt; - cmdArgs = new ArrayList( ); - cmdArgs.add( command ); - while (it.hasNext()) { - cmdArgs.add(it.next()); - } - return true; - } else { - try { - options.put(opt.substring(1), it.next()); - } catch (NoSuchElementException e) { - System.err.println("Error: no argument found for option " - + opt); - return false; - } - } - } - return true; - } - - /** - * Breaks a string into command + arguments. - * @param cmdstring string of form "cmd arg1 arg2..etc" - * @return true if parsing succeeded. - */ - public boolean parseCommand( String cmdstring ) { - String[] args = cmdstring.split(" "); - if (args.length == 0){ - return false; - } - command = args[0]; - cmdArgs = Arrays.asList(args); - return true; - } - } - - private class MyWatcher implements Watcher { - public void process(WatchedEvent event) { - if (getPrintWatches()) { - printMessage("WATCHER::"); - printMessage(event.toString()); - } - } - } - - public void printMessage(String msg) { - if (inConsole) { - System.out.println("\n"+msg); - } - } - - /** - * Hedwig Console - * - * @param args arguments - * @throws IOException - * @throws InterruptedException - */ - public HedwigConsole(String[] args) throws IOException, InterruptedException { - // Setup Terminal - terminal = Terminal.setupTerminal(); - HedwigCommands.init(); - cl.parseOptions(args); - - if (cl.getCommand() == null) { - inConsole = true; - } else { - inConsole = false; - } - - org.apache.bookkeeper.conf.ClientConfiguration bkClientConf = - new org.apache.bookkeeper.conf.ClientConfiguration(); - ServerConfiguration hubServerConf = new ServerConfiguration(); - String serverCfgFile = cl.getOption("server-cfg"); - if (serverCfgFile != null) { - try { - hubServerConf.loadConf(new File(serverCfgFile).toURI().toURL()); - } catch (ConfigurationException e) { - throw new IOException(e); - } - try { - bkClientConf.loadConf(new File(serverCfgFile).toURI().toURL()); - } catch (ConfigurationException e) { - throw new IOException(e); - } - } - - ClientConfiguration hubClientCfg = new ClientConfiguration(); - String clientCfgFile = cl.getOption("client-cfg"); - if (clientCfgFile != null) { - try { - hubClientCfg.loadConf(new File(clientCfgFile).toURI().toURL()); - } catch (ConfigurationException e) { - throw new IOException(e); - } - } - - printMessage("Connecting to zookeeper/bookkeeper using HedwigAdmin"); - try { - admin = new HedwigAdmin(bkClientConf, hubServerConf); - admin.getZkHandle().register(new MyWatcher()); - } catch (Exception e) { - throw new IOException(e); - } - - printMessage("Connecting to default hub server " + hubClientCfg.getDefaultServerHost()); - hubClient = new HedwigClient(hubClientCfg); - publisher = hubClient.getPublisher(); - subscriber = hubClient.getSubscriber(); - subscriber.addSubscriptionListener(new ConsoleSubscriptionListener()); - - // other parameters - myRegion = hubServerConf.getMyRegion(); - } - - public boolean getPrintWatches() { - return printWatches; - } - - protected String getPrompt() { - StringBuilder sb = new StringBuilder(); - sb.append("[hedwig: (").append(myRegion).append(") ").append(commandCount).append("] "); - return sb.toString(); - } - - protected boolean continueOrQuit() throws IOException { - System.out.println("Press to continue, or Q to cancel ..."); - int ch; - if (null != console) { - ch = console.readCharacter(CONTINUE_OR_QUIT); - } else { - do { - ch = terminal.readCharacter(System.in); - } while (ch != 'q' && ch != 'Q' && ch != '\n'); - } - if (ch == 'q' || - ch == 'Q') { - return false; - } - return true; - } - - protected void addToHistory(int i, String cmd) { - history.put(i, cmd); - } - - public void executeLine(String line) { - if (!line.equals("")) { - cl.parseCommand(line); - addToHistory(commandCount, line); - processCmd(cl); - commandCount++; - } - } - - protected boolean processCmd(MyCommandOptions co) { - String[] args = co.getArgArray(); - String cmd = co.getCommand(); - if (args.length < 1) { - usage(); - return false; - } - if (!getHedwigCommands().containsKey(cmd)) { - usage(); - return false; - } - - LOG.debug("Processing {}", cmd); - - MyCommand myCommand = myCommands.get(cmd); - if (myCommand == null) { - System.err.println("No Command Processor found for command " + cmd); - usage(); - return false; - } - - long startTime = MathUtils.now(); - boolean success = false; - try { - success = myCommand.runCmd(args); - } catch (Exception e) { - e.printStackTrace(); - success = false; - } - long elapsedTime = MathUtils.now() - startTime; - if (inConsole) { - if (success) { - System.out.println("Finished " + ((double)elapsedTime / 1000) + " s."); - } else { - COMMAND c = getHedwigCommands().get(cmd); - if (c != null) { - c.printUsage(); - } - } - } - return success; - } - - @SuppressWarnings("unchecked") - void run() throws IOException { - inConsole = true; - myCommands = buildMyCommands(); - if (cl.getCommand() == null) { - System.out.println("Welcome to Hedwig!"); - System.out.println("JLine support is enabled"); - - console = new ConsoleReader(); - JLineHedwigCompletor completor = new JLineHedwigCompletor(admin); - console.addCompletor(completor); - - // load history file - History history = new History(); - File file = new File(System.getProperty("hw.history", - new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString())); - if (LOG.isDebugEnabled()) { - LOG.debug("History file is " + file.toString()); - } - history.setHistoryFile(file); - // set history to console reader - console.setHistory(history); - // load history from history file - history.moveToFirstEntry(); - - while (history.next()) { - String entry = history.current(); - if (!entry.equals("")) { - addToHistory(commandCount, entry); - } - commandCount++; - } - System.out.println("JLine history support is enabled"); - - String line; - while ((line = console.readLine(getPrompt())) != null) { - executeLine(line); - history.addToHistory(line); - } - } - - inConsole = false; - processCmd(cl); - try { - myCommands.get(EXIT).runCmd(new String[0]); - } catch (Exception e) { - } - } - - public static void main(String[] args) throws IOException, InterruptedException { - HedwigConsole console = new HedwigConsole(args); - console.run(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java deleted file mode 100644 index cdc0c33..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.admin.console; - -import java.util.Iterator; -import java.util.List; - -import org.apache.zookeeper.KeeperException; -import org.apache.hedwig.admin.HedwigAdmin; - -import com.google.protobuf.ByteString; - -import jline.Completor; - -import static org.apache.hedwig.admin.console.HedwigCommands.*; - -/** - * A jline completor for hedwig console - */ -public class JLineHedwigCompletor implements Completor { - // for topic completion - static final int MAX_TOPICS_TO_SEARCH = 1000; - - private HedwigAdmin admin; - - public JLineHedwigCompletor(HedwigAdmin admin) { - this.admin = admin; - } - - @Override - public int complete(String buffer, int cursor, List candidates) { - // Guarantee that the final token is the one we're expanding - buffer = buffer.substring(0,cursor); - String[] tokens = buffer.split(" "); - if (buffer.endsWith(" ")) { - String[] newTokens = new String[tokens.length + 1]; - System.arraycopy(tokens, 0, newTokens, 0, tokens.length); - newTokens[newTokens.length - 1] = ""; - tokens = newTokens; - } - - if (tokens.length > 2 && - DESCRIBE.equalsIgnoreCase(tokens[0]) && - DESCRIBE_TOPIC.equalsIgnoreCase(tokens[1])) { - return completeTopic(buffer, tokens[2], candidates); - } else if (tokens.length > 1 && - (SUB.equalsIgnoreCase(tokens[0]) || - PUB.equalsIgnoreCase(tokens[0]) || - CLOSESUB.equalsIgnoreCase(tokens[0]) || - CONSUME.equalsIgnoreCase(tokens[0]) || - CONSUMETO.equalsIgnoreCase(tokens[0]) || - READTOPIC.equalsIgnoreCase(tokens[0]))) { - return completeTopic(buffer, tokens[1], candidates); - } - List cmds = HedwigCommands.findCandidateCommands(tokens); - return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates); - } - - @SuppressWarnings("unchecked") - private int completeCommand(String buffer, String token, - List commands, List candidates) { - for (Object cmdo : commands) { - assert (cmdo instanceof String); - if (((String)cmdo).startsWith(token)) { - candidates.add(cmdo); - } - } - return buffer.lastIndexOf(" ") + 1; - } - - @SuppressWarnings("unchecked") - private int completeTopic(String buffer, String token, List candidates) { - try { - Iterator children = admin.getTopics(); - int i = 0; - while (children.hasNext() && i <= MAX_TOPICS_TO_SEARCH) { - String child = children.next().toStringUtf8(); - if (child.startsWith(token)) { - candidates.add(child); - } - ++i; - } - } catch (Exception e) { - return buffer.length(); - } - return candidates.size() == 0 ? buffer.length() : buffer.lastIndexOf(" ") + 1; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java deleted file mode 100644 index edad190..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.admin.console; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.hedwig.admin.HedwigAdmin; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -import static com.google.common.base.Charsets.UTF_8; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -/** - * A tool to read topic messages. - * - * This tool : - * 1) read persistence info from zookeeper: ledger ranges - * 2) read subscription infor from zookeeper: we can know the least message id (ledger id) - * 3) use bk client to read message starting from least message id - */ -public class ReadTopic { - - final HedwigAdmin admin; - final ByteString topic; - long startSeqId; - long leastConsumedSeqId = Long.MAX_VALUE; - final boolean inConsole; - - static final int RC_OK = 0; - static final int RC_ERROR = -1; - static final int RC_NOTOPIC = -2; - static final int RC_NOLEDGERS = -3; - static final int RC_NOSUBSCRIBERS = -4; - - static final int NUM_MESSAGES_TO_PRINT = 15; - - List ledgers = new ArrayList(); - - /** - * Constructor - */ - public ReadTopic(HedwigAdmin admin, ByteString topic, boolean inConsole) { - this(admin, topic, 1, inConsole); - } - - /** - * Constructor - */ - public ReadTopic(HedwigAdmin admin, ByteString topic, long msgSeqId, boolean inConsole) { - this.admin = admin; - this.topic = topic; - this.startSeqId = msgSeqId; - this.inConsole = inConsole; - } - - /** - * Check whether the topic existed or not - * - * @return RC_OK if topic is existed; RC_NOTOPIC if not. - * @throws Exception - */ - protected int checkTopic() throws Exception { - return admin.hasTopic(topic) ? RC_OK : RC_NOTOPIC; - } - - /** - * Get the ledgers used by this topic to store messages - * - * @return RC_OK if topic has messages; RC_NOLEDGERS if not. - * @throws Exception - */ - protected int getTopicLedgers() throws Exception { - List ranges = admin.getTopicLedgers(topic); - if (null == ranges || ranges.isEmpty()) { - return RC_NOLEDGERS; - } - ledgers.addAll(ranges); - return RC_OK; - } - - protected int getLeastSubscription() throws Exception { - Map states = admin.getTopicSubscriptions(topic); - if (states.isEmpty()) { - return RC_NOSUBSCRIBERS; - } - for (Map.Entry entry : states.entrySet()) { - SubscriptionData state = entry.getValue(); - long localMsgId = state.getState().getMsgId().getLocalComponent(); - if (localMsgId < leastConsumedSeqId) { - leastConsumedSeqId = localMsgId; - } - } - if (leastConsumedSeqId == Long.MAX_VALUE) { - leastConsumedSeqId = 0; - } - return RC_OK; - } - - public void readTopic() { - try { - int rc = _readTopic(); - switch (rc) { - case RC_NOTOPIC: - System.err.println("No topic " + topic + " found."); - break; - case RC_NOLEDGERS: - System.err.println("No message is published to topic " + topic); - break; - default: - break; - } - } catch (Exception e) { - System.err.println("ERROR: read messages of topic " + topic + " failed."); - e.printStackTrace(); - } - } - - protected int _readTopic() throws Exception { - int rc; - // check topic - rc = checkTopic(); - if (RC_OK != rc) { - return rc; - } - // get topic ledgers - rc = getTopicLedgers(); - if (RC_OK != rc) { - return rc; - } - // get topic subscription to find the least one - rc = getLeastSubscription(); - if (RC_NOSUBSCRIBERS == rc) { - startSeqId = 1; - } else if (RC_OK == rc) { - if (leastConsumedSeqId > startSeqId) { - startSeqId = leastConsumedSeqId + 1; - } - } else { - return rc; - } - - for (LedgerRange range : ledgers) { - long endSeqId = range.getEndSeqIdIncluded().getLocalComponent(); - if (endSeqId < startSeqId) { - continue; - } - boolean toContinue = readLedger(range); - startSeqId = endSeqId + 1; - if (!toContinue) { - break; - } - } - - return RC_OK; - } - - /** - * Read a specific ledger - * - * @param ledger in memory ledger range - * @param endSeqId end seq id - * @return true if continue, otherwise false - * @throws BKException - * @throws IOException - * @throws InterruptedException - */ - protected boolean readLedger(LedgerRange ledger) - throws BKException, IOException, InterruptedException { - long tEndSeqId = ledger.getEndSeqIdIncluded().getLocalComponent(); - - if (tEndSeqId < this.startSeqId) { - return true; - } - // Open Ledger Handle - long ledgerId = ledger.getLedgerId(); - System.out.println("\n>>>>> " + ledger + " <<<<<\n"); - LedgerHandle lh = null; - try { - lh = admin.getBkHandle().openLedgerNoRecovery(ledgerId, admin.getBkDigestType(), admin.getBkPasswd()); - } catch (BKException e) { - System.err.println("ERROR: No ledger " + ledgerId + " found. maybe garbage collected due to the messages are consumed."); - } - if (null == lh) { - return true; - } - long expectedEntryId = startSeqId - ledger.getStartSeqIdIncluded(); - - long correctedEndSeqId = tEndSeqId; - try { - while (startSeqId <= tEndSeqId) { - correctedEndSeqId = Math.min(startSeqId + NUM_MESSAGES_TO_PRINT - 1, tEndSeqId); - - try { - Enumeration seq = - lh.readEntries(startSeqId - ledger.getStartSeqIdIncluded(), - correctedEndSeqId - ledger.getStartSeqIdIncluded()); - LedgerEntry entry = null; - while (seq.hasMoreElements()) { - entry = seq.nextElement(); - Message message; - try { - message = Message.parseFrom(entry.getEntryInputStream()); - } catch (IOException e) { - System.out.println("WARN: Unreadable message found\n"); - expectedEntryId++; - continue; - } - if (expectedEntryId != entry.getEntryId() - || (message.getMsgId().getLocalComponent() - ledger.getStartSeqIdIncluded()) != expectedEntryId) { - throw new IOException("ERROR: Message ids are out of order : expected entry id " + expectedEntryId - + ", current entry id " + entry.getEntryId() + ", msg seq id " + message.getMsgId().getLocalComponent()); - } - expectedEntryId++; - formatMessage(message); - - } - startSeqId = correctedEndSeqId + 1; - if (inConsole) { - if (!pressKeyToContinue()) { - return false; - } - } - } catch (BKException.BKReadException be) { - throw be; - } - } - } catch (BKException bke) { - if (tEndSeqId != Long.MAX_VALUE) { - System.err.println("ERROR: ledger " + ledgerId + " may be corrupted, since read messages [" - + startSeqId + " ~ " + correctedEndSeqId + " ] failed :"); - throw bke; - } - } - System.out.println("\n"); - return true; - } - - protected void formatMessage(Message message) { - // print msg id - String msgId; - if (!message.hasMsgId()) { - msgId = "N/A"; - } else { - MessageSeqId seqId = message.getMsgId(); - StringBuilder idBuilder = new StringBuilder(); - if (seqId.hasLocalComponent()) { - idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")"); - } else { - List remoteIds = seqId.getRemoteComponentsList(); - int i = 0, numRegions = remoteIds.size(); - idBuilder.append("REMOTE("); - for (RegionSpecificSeqId rssid : remoteIds) { - idBuilder.append(rssid.getRegion().toStringUtf8()); - idBuilder.append("["); - idBuilder.append(rssid.getSeqId()); - idBuilder.append("]"); - ++i; - if (i < numRegions) { - idBuilder.append(","); - } - } - idBuilder.append(")"); - } - msgId = idBuilder.toString(); - } - System.out.println("---------- MSGID=" + msgId + " ----------"); - System.out.println("MsgId: " + msgId); - // print source region - if (message.hasSrcRegion()) { - System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8()); - } else { - System.out.println("SrcRegion: N/A"); - } - // print message body - System.out.println("Message:"); - System.out.println(); - if (message.hasBody()) { - System.out.println(message.getBody().toStringUtf8()); - } else { - System.out.println("N/A"); - } - System.out.println(); - } - - boolean pressKeyToContinue() throws IOException { - System.out.println("Press Y to continue..."); - BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, UTF_8)); - int ch = stdin.read(); - if (ch == 'y' || - ch == 'Y') { - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java b/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java deleted file mode 100644 index 6ac6879..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.hedwig.data; - -import java.io.IOException; -import java.util.List; - -import org.apache.bookkeeper.util.EntryFormatter; -import org.apache.commons.configuration.Configuration; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Format a pub sub message into a readable format. - */ -public class MessageFormatter extends EntryFormatter { - private static final Logger logger = LoggerFactory.getLogger(MessageFormatter.class); - - static final String MESSAGE_PAYLOAD_FORMATTER_CLASS = "message_payload_formatter_class"; - - EntryFormatter dataFormatter = EntryFormatter.STRING_FORMATTER; - - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - dataFormatter = EntryFormatter.newEntryFormatter(conf, MESSAGE_PAYLOAD_FORMATTER_CLASS); - } - - @Override - public void formatEntry(java.io.InputStream input) { - Message message; - try { - message = Message.parseFrom(input); - } catch (IOException e) { - System.out.println("WARN: Unreadable message found\n"); - EntryFormatter.STRING_FORMATTER.formatEntry(input); - return; - } - formatMessage(message); - } - - @Override - public void formatEntry(byte[] data) { - Message message; - try { - message = Message.parseFrom(data); - } catch (IOException e) { - System.out.println("WARN: Unreadable message found\n"); - EntryFormatter.STRING_FORMATTER.formatEntry(data); - return; - } - formatMessage(message); - } - - void formatMessage(Message message) { - // print msg id - String msgId; - if (!message.hasMsgId()) { - msgId = "N/A"; - } else { - MessageSeqId seqId = message.getMsgId(); - StringBuilder idBuilder = new StringBuilder(); - if (seqId.hasLocalComponent()) { - idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")"); - } else { - List remoteIds = seqId.getRemoteComponentsList(); - int i = 0, numRegions = remoteIds.size(); - idBuilder.append("REMOTE("); - for (RegionSpecificSeqId rssid : remoteIds) { - idBuilder.append(rssid.getRegion().toStringUtf8()); - idBuilder.append("["); - idBuilder.append(rssid.getSeqId()); - idBuilder.append("]"); - ++i; - if (i < numRegions) { - idBuilder.append(","); - } - } - idBuilder.append(")"); - } - msgId = idBuilder.toString(); - } - System.out.println("****** MSGID=" + msgId + " ******"); - System.out.println("MessageId: " + msgId); - // print source region - if (message.hasSrcRegion()) { - System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8()); - } else { - System.out.println("SrcRegion: N/A"); - } - // print message body - if (message.hasBody()) { - System.out.println("Body:"); - dataFormatter.formatEntry(message.getBody().toByteArray()); - } else { - System.out.println("Body: N/A"); - } - System.out.println(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java deleted file mode 100644 index 21687eb..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.benchmark; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.util.ConcurrencyUtils; - -public abstract class AbstractBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(AbstractBenchmark.class); - - AtomicLong totalLatency = new AtomicLong(); - LinkedBlockingQueue doneSignalQueue = new LinkedBlockingQueue(); - - abstract void doOps(int numOps) throws Exception; - abstract void tearDown() throws Exception; - - protected class AbstractCallback { - AtomicInteger numDone = new AtomicInteger(0); - Semaphore outstanding; - int numOps; - boolean logging; - - public AbstractCallback(Semaphore outstanding, int numOps) { - this.outstanding = outstanding; - this.numOps = numOps; - logging = Boolean.getBoolean("progress"); - } - - public void handle(boolean success, Object ctx) { - outstanding.release(); - - if (!success) { - ConcurrencyUtils.put(doneSignalQueue, false); - return; - } - - totalLatency.addAndGet(MathUtils.now() - (Long)ctx); - int numDoneInt = numDone.incrementAndGet(); - - if (logging && numDoneInt % 10000 == 0) { - logger.info("Finished " + numDoneInt + " ops"); - } - - if (numOps == numDoneInt) { - ConcurrencyUtils.put(doneSignalQueue, true); - } - } - } - - public void runPhase(String phase, int numOps) throws Exception { - long startTime = MathUtils.now(); - - doOps(numOps); - - if (!doneSignalQueue.take()) { - logger.error("One or more operations failed in phase: " + phase); - throw new RuntimeException(); - } else { - logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + (numOps * 1000/ (MathUtils.now() - startTime))); - } - } - - - - - - public void run() throws Exception { - - int numWarmup = Integer.getInteger("nWarmup", 50000); - runPhase("warmup", numWarmup); - - logger.info("Sleeping for 10 seconds"); - Thread.sleep(10000); - //reset latency - totalLatency.set(0); - - int numOps = Integer.getInteger("nOps", 400000); - runPhase("real", numOps); - - tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java deleted file mode 100644 index d58883d..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.benchmark; - -import java.nio.ByteBuffer; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.BookieClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; - -public class BookieBenchmark extends AbstractBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(BookkeeperBenchmark.class); - - BookieClient bkc; - BookieSocketAddress addr; - ClientSocketChannelFactory channelFactory; - OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() - .name("BookieBenchmarkScheduler") - .numThreads(1) - .build(); - - public BookieBenchmark(String bookieHostPort) throws Exception { - channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - bkc = new BookieClient(new ClientConfiguration(), channelFactory, executor); - String[] hostPort = bookieHostPort.split(":"); - addr = new BookieSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])); - } - - - @Override - void doOps(final int numOps) throws Exception { - int numOutstanding = Integer.getInteger("nPars",1000); - final Semaphore outstanding = new Semaphore(numOutstanding); - - - WriteCallback callback = new WriteCallback() { - AbstractCallback handler = new AbstractCallback(outstanding, numOps); - - @Override - public void writeComplete(int rc, long ledgerId, long entryId, - BookieSocketAddress addr, Object ctx) { - handler.handle(rc == BKException.Code.OK, ctx); - } - }; - - byte[] passwd = new byte[20]; - int size = Integer.getInteger("size", 1024); - byte[] data = new byte[size]; - - for (int i=0; i