activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6102
Date Mon, 21 Dec 2015 22:04:53 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 7dd233001 -> 871f0a600


https://issues.apache.org/jira/browse/AMQ-6102

Call to the statistics instance of the subscription to reset the
counters.
(cherry picked from commit 564d55023ec7386c31277db054ecc63d966e2b29)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/871f0a60
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/871f0a60
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/871f0a60

Branch: refs/heads/activemq-5.13.x
Commit: 871f0a60051192b36ea79936a4d020d8bd6eb8bd
Parents: 7dd2330
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Dec 21 17:03:24 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Dec 21 17:04:47 2015 -0500

----------------------------------------------------------------------
 .../activemq/broker/jmx/SubscriptionView.java   |   4 +-
 .../apache/activemq/broker/jmx/MBeanTest.java   | 117 ++++++++++++++++---
 2 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/871f0a60/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
index deefdb4..41495ea 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
@@ -420,8 +420,8 @@ public class SubscriptionView implements SubscriptionViewMBean {
 
     @Override
     public void resetStatistics() {
-        if (subscription != null){
-            subscription.resetConsumedCount();
+        if (subscription != null && subscription.getSubscriptionStatistics() != null){
+            subscription.getSubscriptionStatistics().reset();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/871f0a60/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index b9b8e2f..b2d690e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -26,7 +26,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
@@ -51,11 +62,9 @@ import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.memory.list.MessageList;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.activemq.util.Wait;
-import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +74,7 @@ import org.slf4j.LoggerFactory;
  * command line application.
  */
 public class MBeanTest extends EmbeddedBrokerTestSupport {
+
     private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class);
 
     private static boolean waitForKeyPress;
@@ -171,7 +181,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
     }
 
     //Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752"
-    // points to the need to except on a duplicate or have store.addMessage return bool
+    // points to the need to except on a duplicate or have store.addMessage return boolean
     // need some thought on how best to resolve this
     public void Broken_testMoveDuplicateDoesNotDelete() throws Exception {
         connection = connectionFactory.createConnection();
@@ -1437,7 +1447,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
             String messageID = (String) cdata.get("JMSMessageID");
             assertNotNull("Should have a message ID for message " + i, messageID);
 
-            Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
+            @SuppressWarnings("unchecked")
+            Map<Object, Object> intProperties = CompositeDataHelper.getTabularMap(cdata,
CompositeDataConstants.INT_PROPERTIES);
             assertTrue("not empty", intProperties.size() > 0);
             assertEquals("counter in order", i, intProperties.get("counter"));
         }
@@ -1464,7 +1475,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         for (int i = 0; i < messageCount - 4; i++) {
             CompositeData cdata = compdatalist[i];
 
-            Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
+            @SuppressWarnings("unchecked")
+            Map<Object, Object> intProperties = CompositeDataHelper.getTabularMap(cdata,
CompositeDataConstants.INT_PROPERTIES);
             assertTrue("not empty", intProperties.size() > 0);
             assertEquals("counter in order", i + 5, intProperties.get("counter"));
             echo("Got: " + intProperties.get("counter"));
@@ -1476,7 +1488,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
         BrokerViewMBean brokerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
-        Map connectors = brokerView.getTransportConnectors();
+        Map<String, String> connectors = brokerView.getTransportConnectors();
         LOG.info("Connectors: " + connectors);
         assertEquals("one connector", 1, connectors.size());
 
@@ -1656,13 +1668,91 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
         final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName, TopicViewMBean.class, true);
+        ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList<SubscriptionViewMBean>();
+        for (ObjectName name : topicView.getSubscriptions()) {
+            subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, SubscriptionViewMBean.class, true));
+        }
 
-        assertEquals(1, topicView.getEnqueueCount());
-        assertEquals(4, topicView.getDispatchCount());
-        assertEquals(4, topicView.getInFlightCount());
-        assertEquals(0, topicView.getDequeueCount());
+        assertEquals(4, subscriberViews.size());
+
+        for (SubscriptionViewMBean subscriberView : subscriberViews) {
+            assertEquals(1, subscriberView.getEnqueueCounter());
+            assertEquals(1, subscriberView.getDispatchedCounter());
+            assertEquals(0, subscriberView.getDequeueCounter());
+        }
+
+        for (Message message : messages) {
+            try {
+                message.acknowledge();
+            } catch (JMSException ignore) {}
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return topicView.getDequeueCount() == 4;
+            }
+        });
+
+        for (SubscriptionViewMBean subscriberView : subscriberViews) {
+            assertEquals(1, subscriberView.getEnqueueCounter());
+            assertEquals(1, subscriberView.getDispatchedCounter());
+            assertEquals(1, subscriberView.getDequeueCounter());
+        }
+
+        for (SubscriptionViewMBean subscriberView : subscriberViews) {
+            subscriberView.resetStatistics();
+        }
+
+        for (SubscriptionViewMBean subscriberView : subscriberViews) {
+            assertEquals(0, subscriberView.getEnqueueCounter());
+            assertEquals(0, subscriberView.getDispatchedCounter());
+            assertEquals(0, subscriberView.getDequeueCounter());
+        }
+    }
+
+    public void testSubscriptionView() throws Exception {
+        connection = connectionFactory.createConnection();
+        connection.setClientID("test");
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        connection.start();
 
-        ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList();
+        Topic singleTopic = session.createTopic("test.topic");
+        Topic wildcardTopic = session.createTopic("test.>");
+
+        TopicSubscriber durable1 = session.createDurableSubscriber(singleTopic, "single");
+        TopicSubscriber durable2 = session.createDurableSubscriber(wildcardTopic, "wildcard");
+
+        MessageConsumer consumer1 = session.createConsumer(singleTopic);
+        MessageConsumer consumer2 = session.createConsumer(wildcardTopic);
+
+        final ArrayList<Message> messages = new ArrayList<>();
+
+        MessageListener listener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                messages.add(message);
+            }
+        };
+
+        durable1.setMessageListener(listener);
+        durable2.setMessageListener(listener);
+        consumer1.setMessageListener(listener);
+        consumer2.setMessageListener(listener);
+
+        MessageProducer producer = session.createProducer(singleTopic);
+        producer.send(session.createTextMessage("test"));
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return messages.size() == 4;
+            }
+        });
+
+        ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
+        final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName, TopicViewMBean.class, true);
+        ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList<SubscriptionViewMBean>();
         for (ObjectName name : topicView.getSubscriptions()) {
             subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, SubscriptionViewMBean.class, true));
         }
@@ -1681,6 +1771,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
             } catch (JMSException ignore) {}
         }
 
+        // Wait so that each subscription gets updated
         Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
@@ -1698,7 +1789,5 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
             assertEquals(1, subscriberView.getDispatchedCounter());
             assertEquals(1, subscriberView.getDequeueCounter());
         }
-
-
     }
 }


Mime
View raw message