activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [2/2] git commit: https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times
Date Fri, 25 Jul 2014 10:48:37 GMT
https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics
- allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages
are not accounted for num hops times


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25e3c1b3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25e3c1b3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25e3c1b3

Branch: refs/heads/trunk
Commit: 25e3c1b3c6f0702b6a60bbaa29465e3dbb1b8c76
Parents: 619864d
Author: gtully <gary.tully@gmail.com>
Authored: Fri Jul 25 11:46:36 2014 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Jul 25 11:48:20 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/jmx/DestinationView.java  |  5 +++++
 .../activemq/broker/jmx/DestinationViewMBean.java    | 10 ++++++++++
 .../broker/region/DestinationStatistics.java         | 10 ++++++++++
 .../org/apache/activemq/broker/region/Queue.java     |  3 +++
 .../activemq/broker/region/TopicSubscription.java    |  3 +++
 .../activemq/console/command/DstatCommand.java       |  9 ++++++---
 .../org/apache/activemq/broker/jmx/MBeanTest.java    |  1 +
 .../activemq/network/DemandForwardingBridgeTest.java | 15 +++++++++++++++
 .../usecases/ThreeBrokerTopicNetworkTest.java        |  4 ++++
 9 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 3f62943..ec6fe7c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -90,6 +90,11 @@ public class DestinationView implements DestinationViewMBean {
     }
 
     @Override
+    public long getForwardCount() {
+        return destination.getDestinationStatistics().getForwards().getCount();
+    }
+
+    @Override
     public long getDispatchCount() {
         return destination.getDestinationStatistics().getDispatched().getCount();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index f83d47e..a42bcfa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,16 @@ public interface DestinationViewMBean {
     long getDequeueCount();
 
     /**
+     * Returns the number of messages that have been acknowledged by network subscriptions
from the
+     * destination.
+     *
+     * @return The number of messages that have been acknowledged by network subscriptions
from the
+     *         destination.
+     */
+    @MBeanInfo("Number of messages that have been forwarded (to a networked broker) from
the destination.")
+    long getForwardCount();
+
+    /**
      * Returns the number of messages that have been dispatched but not
      * acknowledged
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index ee2b478..0a9176e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -31,6 +31,7 @@ public class DestinationStatistics extends StatsImpl {
 
     protected CountStatisticImpl enqueues;
     protected CountStatisticImpl dequeues;
+    protected CountStatisticImpl forwards;
     protected CountStatisticImpl consumers;
     protected CountStatisticImpl producers;
     protected CountStatisticImpl messages;
@@ -49,6 +50,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been
sent to the destination");
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have
been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been
acknowledged from the destination");
+        forwards = new CountStatisticImpl("forwards", "The number of messages that have been
forwarded to a networked broker from the destination");
         inflight = new CountStatisticImpl("inflight", "The number of messages dispatched
but awaiting acknowledgement");
         expired = new CountStatisticImpl("expired", "The number of messages that have expired");
 
@@ -86,6 +88,10 @@ public class DestinationStatistics extends StatsImpl {
         return dequeues;
     }
 
+    public CountStatisticImpl getForwards() {
+        return forwards;
+    }
+
     public CountStatisticImpl getInflight() {
         return inflight;
     }
@@ -137,6 +143,7 @@ public class DestinationStatistics extends StatsImpl {
             super.reset();
             enqueues.reset();
             dequeues.reset();
+            forwards.reset();
             dispatched.reset();
             inflight.reset();
             expired.reset();
@@ -151,6 +158,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues.setEnabled(enabled);
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
+        forwards.setEnabled(enabled);
         inflight.setEnabled(enabled);
         expired.setEnabled(true);
         consumers.setEnabled(enabled);
@@ -169,6 +177,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(parent.enqueues);
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
+            forwards.setParent(parent.forwards);
             inflight.setParent(parent.inflight);
             expired.setParent(parent.expired);
             consumers.setParent(parent.consumers);
@@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(null);
             dispatched.setParent(null);
             dequeues.setParent(null);
+            forwards.setParent(null);
             inflight.setParent(null);
             expired.setParent(null);
             consumers.setParent(null);

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 06c74db..647ba68 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1810,6 +1810,9 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
             } finally {
                 messagesLock.writeLock().unlock();
             }
+            if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) {
+                getDestinationStatistics().getForwards().increment();
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d17fb2f..6b61379 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -285,6 +285,9 @@ public class TopicSubscription extends AbstractSubscription {
                 if (singleDestination && destination != null) {
                     destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                     destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+                    if (info.isNetworkSubscription()) {
+                        destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
+                    }
                 }
                 dequeueCounter.addAndGet(ack.getMessageCount());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
index a6d6356..8a41d6a 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
@@ -95,7 +95,7 @@ public class DstatCommand extends AbstractJmxCommand {
         // sort list so the names is A..Z
         Collections.sort(queueList, new ObjectInstanceComparator());
 
-        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #",
"Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #",
"Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
 
         // Iterate through the queue result
         for (Object view : queueList) {
@@ -115,6 +115,7 @@ public class DstatCommand extends AbstractJmxCommand {
                     queueView.getConsumerCount(),
                     queueView.getEnqueueCount(),
                     queueView.getDequeueCount(),
+                    queueView.getForwardCount(),
                     queueView.getMemoryPercentUsage()));
         }
     }
@@ -128,7 +129,7 @@ public class DstatCommand extends AbstractJmxCommand {
         final String header = "%-50s  %10s  %10s  %10s  %10s  %10s  %10s";
         final String tableRow = "%-50s  %10d  %10d  %10d  %10d  %10d  %10d";
 
-        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #",
"Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #",
"Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
 
         Collections.sort(queueList, new ObjectInstanceComparator());
 
@@ -150,6 +151,7 @@ public class DstatCommand extends AbstractJmxCommand {
                     queueView.getConsumerCount(),
                     queueView.getEnqueueCount(),
                     queueView.getDequeueCount(),
+                    queueView.getForwardCount(),
                     queueView.getMemoryPercentUsage()));
         }
     }
@@ -166,7 +168,7 @@ public class DstatCommand extends AbstractJmxCommand {
         // sort list so the names is A..Z
         Collections.sort(topicsList, new ObjectInstanceComparator());
 
-        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #",
"Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #",
"Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
 
         // Iterate through the topics result
         for (Object view : topicsList) {
@@ -186,6 +188,7 @@ public class DstatCommand extends AbstractJmxCommand {
                     topicView.getConsumerCount(),
                     topicView.getEnqueueCount(),
                     topicView.getDequeueCount(),
+                    topicView.getForwardCount(),
                     topicView.getMemoryPercentUsage()));
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index 88ab95a..a47b7a9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -169,6 +169,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
         assertTrue("use cache", queueNew.isUseCache());
         assertTrue("cache enabled", queueNew.isCacheEnabled());
+        assertEquals("no forwards", 0, queueNew.getForwardCount());
     }
 
     public void testRemoveMessages() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
index 1491ba2..020a511 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -21,6 +21,7 @@ import javax.jms.DeliveryMode;
 import junit.framework.Test;
 
 import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -72,6 +73,11 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
         // Close consumer to cause the message to rollback.
         connection1.send(consumerInfo1.createRemoveCommand());
 
+        final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+        assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
+        assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
+        assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());
+
         // Now create remote consumer that should cause message to move to this
         // remote consumer.
         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
@@ -84,6 +90,15 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
                 return receiveMessage(connection2) != null;
             }
         }));
+
+        assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == destinationStatistics.getForwards().getCount();
+            }
+        }));
+        assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
+        assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
     }
 
     public void initCombosForTestAddConsumerThenSend() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/25e3c1b3/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
index 33963b7..99deb28 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -77,6 +78,9 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
         assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
         assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
         assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
+
+        assertEquals("Correct forwards from A", MESSAGE_COUNT,
+                brokers.get("BrokerA").broker.getDestination(ActiveMQDestination.transform(dest)).getDestinationStatistics().getForwards().getCount());
     }
 
     public void initCombosForTestABandBCbrokerNetworkWithSelectors() {


Mime
View raw message