activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5792
Date Thu, 28 May 2015 20:27:12 GMT
Repository: activemq
Updated Branches:
  refs/heads/master f320f8499 -> df8dcb504


https://issues.apache.org/jira/browse/AMQ-5792

Adding SubscriptionStatistics to group together all metrics in a bean
for a subscription.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eece576b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eece576b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eece576b

Branch: refs/heads/master
Commit: eece576b195686c55c5435ebdece09cf68cb5664
Parents: f320f84
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Thu May 21 19:47:22 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu May 28 15:47:30 2015 -0400

----------------------------------------------------------------------
 .../broker/region/AbstractSubscription.java     |  17 ++--
 .../broker/region/DurableTopicSubscription.java |   6 +-
 .../broker/region/PrefetchSubscription.java     |  35 +++----
 .../activemq/broker/region/Subscription.java    |   2 +
 .../broker/region/SubscriptionStatistics.java   | 101 +++++++++++++++++++
 .../broker/region/TopicSubscription.java        |  39 ++++---
 .../region/QueueDuplicatesFromStoreTest.java    |  10 +-
 .../region/SubscriptionAddRemoveQueueTest.java  |   8 +-
 8 files changed, 167 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 1ed2fae..37056a2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -53,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription {
     private int cursorMemoryHighWaterMark = 70;
     private boolean slowConsumer;
     private long lastAckTime;
-    private AtomicLong consumedCount = new AtomicLong();
+    private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
         this.broker = broker;
@@ -89,7 +89,7 @@ public abstract class AbstractSubscription implements Subscription {
     @Override
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck
ack) throws Exception {
         this.lastAckTime = System.currentTimeMillis();
-        this.consumedCount.incrementAndGet();
+        subscriptionStatistics.getConsumedCount().increment();
     }
 
     @Override
@@ -230,7 +230,7 @@ public abstract class AbstractSubscription implements Subscription {
     @Override
     public int getInFlightUsage() {
         if (info.getPrefetchSize() > 0) {
-        return (getInFlightSize() * 100)/info.getPrefetchSize();
+            return (getInFlightSize() * 100)/info.getPrefetchSize();
         }
         return Integer.MAX_VALUE;
     }
@@ -285,14 +285,19 @@ public abstract class AbstractSubscription implements Subscription {
     }
 
     public long getConsumedCount(){
-        return consumedCount.get();
+        return subscriptionStatistics.getConsumedCount().getCount();
     }
 
     public void incrementConsumedCount(){
-        consumedCount.incrementAndGet();
+        subscriptionStatistics.getConsumedCount().increment();
     }
 
     public void resetConsumedCount(){
-        consumedCount.set(0);
+        subscriptionStatistics.getConsumedCount().reset();
+    }
+
+    @Override
+    public SubscriptionStatistics getSubscriptionStatistics() {
+        return subscriptionStatistics;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 91e14f0..d87bd54 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -121,11 +121,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements
Us
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic) destination;
             topic.activate(context, this);
-            this.enqueueCounter += pending.size();
+            getSubscriptionStatistics().getEnqueues().add(pending.size());
         } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
             try {
-                this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName());
+                getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
             } catch (IOException e) {
                 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from
store " + e);
                 jmsEx.setLinkedException(e);
@@ -325,7 +325,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements
Us
     @Override
     public synchronized String toString() {
         return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
-                + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
+                + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
                 + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 0bbd000..53a00c9 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -58,9 +58,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
     protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
     protected final AtomicInteger prefetchExtension = new AtomicInteger();
     protected boolean usePrefetchExtension = true;
-    protected long enqueueCounter;
-    protected long dispatchCounter;
-    protected long dequeueCounter;
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
@@ -94,7 +91,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
         // consumers to 'wake them up' in case they were waiting for a message.
         if (getPrefetchSize() == 0) {
             prefetchExtension.set(pull.getQuantity());
-            final long dispatchCounterBeforePull = dispatchCounter;
+            final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount();
 
             // Have the destination push us some messages.
             for (Destination dest : destinations) {
@@ -104,7 +101,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
 
             synchronized(this) {
                 // If there was nothing dispatched.. we may need to setup a timeout.
-                if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) {
+                if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
                     // immediate timeout used by receiveNoWait()
                     if (pull.getTimeout() == -1) {
                         // Null message indicates the pull is done or did not have pending.
@@ -132,7 +129,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
      */
     final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
         synchronized (pendingLock) {
-            if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) {
+            if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) {
                 try {
                     prefetchExtension.set(1);
                     add(QueueMessageReference.NULL_MESSAGE);
@@ -157,7 +154,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
 
             // Don't increment for the pullTimeout control message.
             if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
-                enqueueCounter++;
+                getSubscriptionStatistics().getEnqueues().increment();
             }
             pending.addMessageLast(node);
         }
@@ -227,7 +224,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                     if (inAckRange) {
                         // Don't remove the nodes until we are committed.
                         if (!context.isInTransaction()) {
-                            dequeueCounter++;
+                            getSubscriptionStatistics().getDequeues().increment();
                             ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             removeList.add(node);
                         } else {
@@ -257,7 +254,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                     if (ack.getLastMessageId().equals(messageId)) {
                         // Don't remove the nodes until we are committed - immediateAck option
                         if (!context.isInTransaction()) {
-                            dequeueCounter++;
+                            getSubscriptionStatistics().getDequeues().increment();
                             ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             dispatched.remove(node);
                         } else {
@@ -361,9 +358,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                         sendToDLQ(context, node, ack.getPoisonCause());
                         Destination nodeDest = (Destination) node.getRegionDestination();
                         nodeDest.getDestinationStatistics()
-                                .getInflight().decrement();
+                        .getInflight().decrement();
                         removeList.add(node);
-                        dequeueCounter++;
+                        getSubscriptionStatistics().getDequeues().increment();
                         index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
@@ -428,7 +425,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                             throws Exception {
                         Destination nodeDest = (Destination) node.getRegionDestination();
                         synchronized(dispatchLock) {
-                            dequeueCounter++;
+                            getSubscriptionStatistics().getDequeues().increment();
                             dispatched.remove(node);
                             nodeDest.getDestinationStatistics().getInflight().decrement();
                         }
@@ -550,17 +547,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
 
     @Override
     public long getDequeueCounter() {
-        return dequeueCounter;
+        return getSubscriptionStatistics().getDequeues().getCount();
     }
 
     @Override
     public long getDispatchedCounter() {
-        return dispatchCounter;
+        return getSubscriptionStatistics().getDispatched().getCount();
     }
 
     @Override
     public long getEnqueueCounter() {
-        return enqueueCounter;
+        return getSubscriptionStatistics().getEnqueues().getCount();
     }
 
     @Override
@@ -632,7 +629,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
 
     // made public so it can be used in MQTTProtocolConverter
     public void dispatchPending() throws IOException {
-       synchronized(pendingLock) {
+        synchronized(pendingLock) {
             try {
                 int numberToDispatch = countBeforeFull();
                 if (numberToDispatch > 0) {
@@ -695,7 +692,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
 
         MessageDispatch md = createMessageDispatch(node, message);
         if (node != QueueMessageReference.NULL_MESSAGE) {
-            dispatchCounter++;
+            getSubscriptionStatistics().getDispatched().increment();
             dispatched.add(node);
         }
         if (getPrefetchSize() == 0) {
@@ -724,7 +721,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                         if (node != QueueMessageReference.NULL_MESSAGE) {
                             nodeDest.getDestinationStatistics().getDispatched().increment();
                             nodeDest.getDestinationStatistics().getInflight().increment();
-                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
+                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
                         }
                     }
                     if (node instanceof QueueMessageReference) {
@@ -746,7 +743,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 nodeDest.getDestinationStatistics().getDispatched().increment();
                 nodeDest.getDestinationStatistics().getInflight().increment();
-                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
+                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index ee0e218..ec37512 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -137,6 +137,8 @@ public interface Subscription extends SubscriptionRecovery {
      */
     long getDequeueCounter();
 
+    SubscriptionStatistics getSubscriptionStatistics();
+
     /**
      * @return the JMS selector on the current subscription
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
new file mode 100644
index 0000000..09fab8a
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+
+/**
+ * The J2EE Statistics for a Subscription.
+ */
+public class SubscriptionStatistics extends StatsImpl {
+
+    protected CountStatisticImpl consumedCount;
+    protected CountStatisticImpl enqueues;
+    protected CountStatisticImpl dequeues;
+    protected CountStatisticImpl dispatched;
+
+
+    public SubscriptionStatistics() {
+        this(true);
+    }
+
+    public SubscriptionStatistics(boolean enabled) {
+
+        consumedCount = new CountStatisticImpl("consumedCount", "The number of messages that have been consumed by the subscription");
+        enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription");
+        dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription");
+        dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription");
+
+        addStatistic("consumedCount", consumedCount);
+        addStatistic("enqueues", enqueues);
+        addStatistic("dispatched", dispatched);
+        addStatistic("dequeues", dequeues);
+
+        this.setEnabled(enabled);
+    }
+
+    public CountStatisticImpl getConsumedCount() {
+        return consumedCount;
+    }
+
+    public CountStatisticImpl getEnqueues() {
+        return enqueues;
+    }
+
+    public CountStatisticImpl getDequeues() {
+        return dequeues;
+    }
+
+    public CountStatisticImpl getDispatched() {
+        return dispatched;
+    }
+
+    public void reset() {
+        if (this.isDoReset()) {
+            super.reset();
+            consumedCount.reset();
+            enqueues.reset();
+            dequeues.reset();
+            dispatched.reset();
+        }
+    }
+
+    public void setEnabled(boolean enabled) {
+        super.setEnabled(enabled);
+        consumedCount.setEnabled(enabled);
+        enqueues.setEnabled(enabled);
+        dispatched.setEnabled(enabled);
+        dequeues.setEnabled(enabled);
+    }
+
+    public void setParent(SubscriptionStatistics parent) {
+        if (parent != null) {
+            consumedCount.setParent(parent.consumedCount);
+            enqueues.setParent(parent.enqueues);
+            dispatched.setParent(parent.dispatched);
+            dequeues.setParent(parent.dequeues);
+        } else {
+            consumedCount.setParent(null);
+            enqueues.setParent(null);
+            dispatched.setParent(null);
+            dequeues.setParent(null);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index fe3d911..ba755ee 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -53,8 +53,6 @@ public class TopicSubscription extends AbstractSubscription {
 
     protected PendingMessageCursor matched;
     protected final SystemUsage usageManager;
-    protected AtomicLong dispatchedCounter = new AtomicLong();
-
     boolean singleDestination = true;
     Destination destination;
     private final Scheduler scheduler;
@@ -63,8 +61,6 @@ public class TopicSubscription extends AbstractSubscription {
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
     private int discarded;
     private final Object matchedListMutex = new Object();
-    private final AtomicLong enqueueCounter = new AtomicLong(0);
-    private final AtomicLong dequeueCounter = new AtomicLong(0);
     private final AtomicInteger prefetchExtension = new AtomicInteger(0);
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
@@ -106,7 +102,7 @@ public class TopicSubscription extends AbstractSubscription {
         // Lets use an indirect reference so that we can associate a unique
         // locator /w the message.
         node = new IndirectMessageReference(node.getMessage());
-        enqueueCounter.incrementAndGet();
+        getSubscriptionStatistics().getEnqueues().increment();
         synchronized (matchedListMutex) {
             // if this subscriber is already discarding a message, we don't want to add
             // any more messages to it as those messages can only be advisories generated
in the process,
@@ -135,7 +131,7 @@ public class TopicSubscription extends AbstractSubscription {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn("{}: stopped waiting for space in pendingMessage
cursor for: {}", toString(), node.getMessageId());
-                                enqueueCounter.decrementAndGet();
+                                getSubscriptionStatistics().getEnqueues().decrement();
                                 return;
                             }
                             if (!warnedAboutWait) {
@@ -231,7 +227,7 @@ public class TopicSubscription extends AbstractSubscription {
                 node.decrementReferenceCount();
                 if (broker.isExpired(node)) {
                     matched.remove();
-                    dispatchedCounter.incrementAndGet();
+                    getSubscriptionStatistics().getDispatched().increment();
                     node.decrementReferenceCount();
                     ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
                     broker.messageExpired(getContext(), node, this);
@@ -253,7 +249,7 @@ public class TopicSubscription extends AbstractSubscription {
                     node.decrementReferenceCount();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
                         matched.remove();
-                        dispatchedCounter.incrementAndGet();
+                        getSubscriptionStatistics().getDispatched().increment();
                         node.decrementReferenceCount();
                         break;
                     }
@@ -275,12 +271,12 @@ public class TopicSubscription extends AbstractSubscription {
 
                     @Override
                     public void afterCommit() throws Exception {
-                       synchronized (TopicSubscription.this) {
+                        synchronized (TopicSubscription.this) {
                             if (singleDestination && destination != null) {
                                 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                             }
                         }
-                        dequeueCounter.addAndGet(ack.getMessageCount());
+                        getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
                         dispatchMatched();
                     }
                 });
@@ -292,7 +288,7 @@ public class TopicSubscription extends AbstractSubscription {
                         destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
                     }
                 }
-                dequeueCounter.addAndGet(ack.getMessageCount());
+                getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
             }
             while (true) {
                 int currentExtension = prefetchExtension.get();
@@ -314,7 +310,7 @@ public class TopicSubscription extends AbstractSubscription {
                 destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
                 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
             }
-            dequeueCounter.addAndGet(ack.getMessageCount());
+            getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
             while (true) {
                 int currentExtension = prefetchExtension.get();
                 int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
@@ -337,12 +333,12 @@ public class TopicSubscription extends AbstractSubscription {
         // The slave should not deliver pull messages.
         if (getPrefetchSize() == 0) {
 
-            final long currentDispatchedCount = dispatchedCounter.get();
+            final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
             prefetchExtension.set(pull.getQuantity());
             dispatchMatched();
 
             // If there was nothing dispatched.. we may need to setup a timeout.
-            if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) {
+            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
 
                 // immediate timeout used by receiveNoWait()
                 if (pull.getTimeout() == -1) {
@@ -371,7 +367,7 @@ public class TopicSubscription extends AbstractSubscription {
      */
     private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
         synchronized (matchedListMutex) {
-            if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) {
+            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
                 try {
                     dispatch(null);
                 } catch (Exception e) {
@@ -390,7 +386,8 @@ public class TopicSubscription extends AbstractSubscription {
 
     @Override
     public int getDispatchedQueueSize() {
-        return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get());
+        return (int)(getSubscriptionStatistics().getDispatched().getCount() -
+                prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
     }
 
     public int getMaximumPendingMessages() {
@@ -399,17 +396,17 @@ public class TopicSubscription extends AbstractSubscription {
 
     @Override
     public long getDispatchedCounter() {
-        return dispatchedCounter.get();
+        return getSubscriptionStatistics().getDispatched().getCount();
     }
 
     @Override
     public long getEnqueueCounter() {
-        return enqueueCounter.get();
+        return getSubscriptionStatistics().getEnqueues().getCount();
     }
 
     @Override
     public long getDequeueCounter() {
-        return dequeueCounter.get();
+        return getSubscriptionStatistics().getDequeues().getCount();
     }
 
     /**
@@ -599,7 +596,7 @@ public class TopicSubscription extends AbstractSubscription {
         md.setConsumerId(info.getConsumerId());
         if (node != null) {
             md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
-            dispatchedCounter.incrementAndGet();
+            getSubscriptionStatistics().getDispatched().increment();
             // Keep track if this subscription is receiving messages from a single destination.
             if (singleDestination) {
                 if (destination == null) {
@@ -667,7 +664,7 @@ public class TopicSubscription extends AbstractSubscription {
     @Override
     public String toString() {
         return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
-               + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
+                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index f69c380..d692d03 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -32,6 +32,7 @@ import junit.framework.TestCase;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.SubscriptionStatistics;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
@@ -101,7 +102,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
     public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception
{
         final PersistenceAdapter persistenceAdapter =  brokerService.getPersistenceAdapter();
         final MessageStore queueMessageStore =
-            persistenceAdapter.createQueueMessageStore(destination);
+                persistenceAdapter.createQueueMessageStore(destination);
         final ConnectionContext contextNotInTx = new ConnectionContext();
         final ConsumerInfo consumerInfo = new ConsumerInfo();
         final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -139,6 +140,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
         // pull from store in small windows
         Subscription subscription = new Subscription() {
 
+            private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
+
             @Override
             public void add(MessageReference node) throws Exception {
                 if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId())
{
@@ -358,6 +361,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
             public void resetConsumedCount(){
 
             }
+
+            @Override
+            public SubscriptionStatistics getSubscriptionStatistics() {
+                return subscriptionStatistics;
+            }
         };
 
         queue.addSubscription(contextNotInTx, subscription);

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece576b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index 6964842..50c2136 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -173,8 +173,9 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
 
     public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
 
+        private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
         List<MessageReference> dispatched =
-            Collections.synchronizedList(new ArrayList<MessageReference>());
+                Collections.synchronizedList(new ArrayList<MessageReference>());
 
         public void acknowledge(ConnectionContext context, MessageAck ack)
                 throws Exception {
@@ -370,5 +371,10 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return 10;
         }
 
+        @Override
+        public SubscriptionStatistics getSubscriptionStatistics() {
+            return subscriptionStatistics;
+        }
+
     }
 }


Mime
View raw message