activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5740
Date Fri, 19 Jun 2015 21:56:46 GMT
Repository: activemq
Updated Branches:
  refs/heads/master d919db5e3 -> c8b604314


https://issues.apache.org/jira/browse/AMQ-5740

added reset option to clear destination statistics 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c8b60431
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c8b60431
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c8b60431

Branch: refs/heads/master
Commit: c8b604314ac78498584edc861ca8f5e49820656a
Parents: d919db5
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Jun 19 17:56:22 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Jun 19 17:56:22 2015 -0400

----------------------------------------------------------------------
 .../activemq/console/command/PurgeCommand.java  |  99 +++++-----
 .../activemq/console/PurgeCommandTest.java      | 188 +++++++++++++++++++
 2 files changed, 240 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c8b60431/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java
b/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java
index 803c30c..0b58c59 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/PurgeCommand.java
@@ -16,48 +16,41 @@
  */
 package org.apache.activemq.console.command;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.StringTokenizer;
 
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.management.MBeanServerConnection;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.remote.JMXConnector;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.console.util.AmqMessagesUtil;
 import org.apache.activemq.console.util.JmxMBeansUtil;
 
 public class PurgeCommand extends AbstractJmxCommand {
 
     protected String[] helpFile = new String[] {
         "Task Usage: Main purge [browse-options] <destinations>",
-        "Description: Delete selected destination's messages that matches the message selector.",

-        "", 
+        "Description: Delete selected destination's messages that matches the message selector.",
+        "",
         "Purge Options:",
         "    --msgsel <msgsel1,msglsel2>   Add to the search list messages matched
by the query similar to",
         "                                  the messages selector format.",
+        "    --reset                       After the purge operation, reset the destination
statistics.",
         "    --jmxurl <url>                Set the JMX URL to connect to.",
-        "    --pid <pid>                   Set the pid to connect to (only on Sun JVM).",
           
+        "    --pid <pid>                   Set the pid to connect to (only on Sun JVM).",
         "    --jmxuser <user>              Set the JMX user used for authenticating.",
         "    --jmxpassword <password>      Set the JMX password used for authenticating.",
         "    --jmxlocal                    Use the local JMX server instead of a remote one.",
         "    --version                     Display the version information.",
-        "    -h,-?,--help                  Display the browse broker help information.",

-        "", 
+        "    -h,-?,--help                  Display the browse broker help information.",
+        "",
         "Examples:",
-        "    Main purge FOO.BAR", 
+        "    Main purge FOO.BAR",
         "        - Delete all the messages in queue FOO.BAR",
 
-        "    Main purge --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.*", 
+        "    Main purge --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.*",
         "        - Delete all the messages in the destinations that matches FOO.* and has
a JMSMessageID in",
         "          the header field that matches the wildcard *:10, and has a JMSPriority
field > 5 in the",
         "          queue FOO.BAR.",
@@ -69,6 +62,7 @@ public class PurgeCommand extends AbstractJmxCommand {
 
     private final List<String> queryAddObjects = new ArrayList<String>(10);
     private final List<String> querySubObjects = new ArrayList<String>(10);
+    private boolean resetStatistics;
 
     @Override
     public String getName() {
@@ -83,10 +77,11 @@ public class PurgeCommand extends AbstractJmxCommand {
     /**
      * Execute the purge command, which allows you to purge the messages in a
      * given JMS destination
-     * 
+     *
      * @param tokens - command arguments
      * @throws Exception
      */
+    @Override
     protected void runTask(List<String> tokens) throws Exception {
         try {
             // If there is no queue name specified, let's select all
@@ -103,36 +98,40 @@ public class PurgeCommand extends AbstractJmxCommand {
                     if (queryAddObjects.isEmpty()) {
                         purgeQueue(queueName);
                     } else {
-                        
-                    	QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.
-                    			newProxyInstance(createJmxConnection(), 
-                    					queueName, 
-                    					QueueViewMBean.class, 
-                    					true);
+
+                        QueueViewMBean proxy = MBeanServerInvocationHandler.
+                                newProxyInstance(createJmxConnection(),
+                                        queueName,
+                                        QueueViewMBean.class,
+                                        true);
                         int removed = 0;
-                        
-                        // AMQ-3404: We support two syntaxes for the message 
+
+                        // AMQ-3404: We support two syntaxes for the message
                         // selector query:
-                        // 1) AMQ specific: 
+                        // 1) AMQ specific:
                         //    "JMSPriority>2,MyHeader='Foo'"
                         //
                         // 2) SQL-92 syntax:
                         //    "(JMSPriority>2) AND (MyHeader='Foo')"
                         //
                         // If syntax style 1) is used, the comma separated
-                        // criterias are broken into List<String> elements. 
-                        // We then need to construct the SQL-92 query out of 
+                        // criterias are broken into List<String> elements.
+                        // We then need to construct the SQL-92 query out of
                         // this list.
-                        
+
                         String sqlQuery = null;
                         if (queryAddObjects.size() > 1) {
-                        	 sqlQuery = convertToSQL92(queryAddObjects);
+                             sqlQuery = convertToSQL92(queryAddObjects);
                         } else {
-                        	sqlQuery = queryAddObjects.get(0);
+                            sqlQuery = queryAddObjects.get(0);
                         }
                         removed = proxy.removeMatchingMessages(sqlQuery);
                         context.printInfo("Removed: " + removed
                                 + " messages for message selector " + sqlQuery.toString());
+
+                        if (resetStatistics) {
+                            proxy.resetStatistics();
+                        }
                     }
                 }
             }
@@ -141,38 +140,42 @@ public class PurgeCommand extends AbstractJmxCommand {
             throw new Exception(e);
         }
     }
-    
-    
+
+
     /**
      * Purge all the messages in the queue
-     * 
+     *
      * @param queue - ObjectName of the queue to purge
      * @throws Exception
      */
     public void purgeQueue(ObjectName queue) throws Exception {
         context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("destinationName"));
         createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {});
+        if (resetStatistics) {
+            createJmxConnection().invoke(queue, "resetStatistics", new Object[] {}, new String[]
{});
+        }
     }
 
     /**
      * Handle the --msgsel, --xmsgsel.
-     * 
+     *
      * @param token - option token to handle
      * @param tokens - succeeding command arguments
      * @throws Exception
      */
+    @Override
     protected void handleOption(String token, List<String> tokens) throws Exception
{
         // If token is an additive message selector option
         if (token.startsWith("--msgsel")) {
 
             // If no message selector is specified, or next token is a new
             // option
-            if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
+            if (tokens.isEmpty() || tokens.get(0).startsWith("-")) {
                 context.printException(new IllegalArgumentException("Message selector not
specified"));
                 return;
             }
 
-            StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
+            StringTokenizer queryTokens = new StringTokenizer(tokens.remove(0), COMMAND_OPTION_DELIMETER);
             while (queryTokens.hasMoreTokens()) {
                 queryAddObjects.add(queryTokens.nextToken());
             }
@@ -181,35 +184,36 @@ public class PurgeCommand extends AbstractJmxCommand {
 
             // If no message selector is specified, or next token is a new
             // option
-            if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
+            if (tokens.isEmpty() || tokens.get(0).startsWith("-")) {
                 context.printException(new IllegalArgumentException("Message selector not
specified"));
                 return;
             }
 
-            StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
+            StringTokenizer queryTokens = new StringTokenizer(tokens.remove(0), COMMAND_OPTION_DELIMETER);
             while (queryTokens.hasMoreTokens()) {
                 querySubObjects.add(queryTokens.nextToken());
             }
-
+        } else if (token.startsWith("--reset")) {
+            resetStatistics = true;
         } else {
             // Let super class handle unknown option
             super.handleOption(token, tokens);
         }
     }
-    
+
     /**
      * Converts the message selector as provided on command line
-     * argument to activem-admin into an SQL-92 conform string. 
+     * argument to activem-admin into an SQL-92 conform string.
      * E.g.
      *   "JMSMessageID='*:10',JMSPriority>5"
-     * gets converted into 
+     * gets converted into
      *   "(JMSMessageID='%:10') AND (JMSPriority>5)"
-     * 
-     * @param tokens - List of message selector query parameters 
-     * @return SQL-92 string of that query. 
+     *
+     * @param tokens - List of message selector query parameters
+     * @return SQL-92 string of that query.
      */
     public String convertToSQL92(List<String> tokens) {
-    	String selector = "";
+        String selector = "";
 
         // Convert to message selector
         for (Iterator i = tokens.iterator(); i.hasNext(); ) {
@@ -223,11 +227,12 @@ public class PurgeCommand extends AbstractJmxCommand {
         }
         return selector;
     }
-    
+
 
     /**
      * Print the help messages for the browse command
      */
+    @Override
     protected void printHelp() {
         context.printHelp(helpFile);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/c8b60431/activemq-console/src/test/java/org/apache/activemq/console/PurgeCommandTest.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/test/java/org/apache/activemq/console/PurgeCommandTest.java
b/activemq-console/src/test/java/org/apache/activemq/console/PurgeCommandTest.java
new file mode 100644
index 0000000..1ece787
--- /dev/null
+++ b/activemq-console/src/test/java/org/apache/activemq/console/PurgeCommandTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.console.command.PurgeCommand;
+import org.apache.activemq.console.formatter.CommandShellOutputFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Tests for the purge command.
+ */
+public class PurgeCommandTest {
+
+    private BrokerService brokerService;
+    private ConnectionFactory factory;
+
+    @Rule public TestName name = new TestName();
+
+    @Before
+    public void createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.getManagementContext().setCreateConnector(false);
+        brokerService.setPersistent(false);
+        TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString());
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test(timeout = 30000)
+    public void testPurge() throws Exception {
+        produce(10);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertEquals(10, queueView.getQueueSize());
+
+        executePurge(getDestinationName());
+
+        assertEquals(0, queueView.getQueueSize());
+    }
+
+    @Test(timeout = 30000)
+    public void testPurgeWithReset() throws Exception {
+        produce(20);
+        consume(10);
+
+        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+        assertEquals(10, queueView.getQueueSize());
+        assertEquals(20, queueView.getEnqueueCount());
+        assertEquals(10, queueView.getDequeueCount());
+
+        // Normal purge doesn't change stats.
+        executePurge(getDestinationName());
+
+        assertEquals(0, queueView.getQueueSize());
+        assertEquals(20, queueView.getEnqueueCount());
+        assertEquals(20, queueView.getDequeueCount());
+
+        // Purge on empty leaves stats alone.
+        executePurge(getDestinationName());
+
+        assertEquals(0, queueView.getQueueSize());
+        assertEquals(20, queueView.getEnqueueCount());
+        assertEquals(20, queueView.getDequeueCount());
+
+        executePurge("--reset " + getDestinationName());
+
+        // Purge on empty with reset clears stats.
+        assertEquals(0, queueView.getQueueSize());
+        assertEquals(0, queueView.getEnqueueCount());
+        assertEquals(0, queueView.getDequeueCount());
+
+        produce(20);
+        consume(10);
+
+        assertEquals(10, queueView.getQueueSize());
+        assertEquals(20, queueView.getEnqueueCount());
+        assertEquals(10, queueView.getDequeueCount());
+
+        executePurge("--reset " + getDestinationName());
+
+        // Purge on non-empty with reset clears stats.
+        assertEquals(0, queueView.getQueueSize());
+        assertEquals(0, queueView.getEnqueueCount());
+        assertEquals(0, queueView.getDequeueCount());
+    }
+
+    private String getDestinationName() {
+        return name.getMethodName();
+    }
+
+    private String executePurge(String options) throws Exception {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024);
+        CommandContext context = new CommandContext();
+        context.setFormatter(new CommandShellOutputFormatter(byteArrayOutputStream));
+
+        PurgeCommand purgeCommand = new PurgeCommand();
+        purgeCommand.setJmxUseLocal(true);
+        purgeCommand.setCommandContext(context);
+
+        LinkedList<String> args = new LinkedList<>();
+        args.addAll(Arrays.asList(options.split(" ")));
+        purgeCommand.execute(args);
+
+        return byteArrayOutputStream.toString();
+    }
+
+    private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException,
JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    private void produce(int count) throws Exception {
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageProducer producer = session.createProducer(queue);
+
+        for (int i = 0; i < count; ++i) {
+            producer.send(session.createMessage());
+        }
+
+        connection.close();
+    }
+
+    private void consume(int count) throws Exception {
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+
+        for (int i = 0; i < count; ++i) {
+            assertNotNull(consumer.receive(1000));
+        }
+
+        connection.close();
+    }
+}


Mime
View raw message