activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5923
Date Wed, 09 Sep 2015 18:12:49 GMT
https://issues.apache.org/jira/browse/AMQ-5923

Adding metrics to track the pending message size for a queue and for
subscribers.  This is useful so that not only the pending count is
known but also the total message size left to consume. Also improving
the message size store tests as well.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/734fb7dd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/734fb7dd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/734fb7dd

Branch: refs/heads/master
Commit: 734fb7dda35285ada7bc57642215077e08c88e80
Parents: b17cc37
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Fri Aug 21 18:58:08 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Wed Sep 9 18:12:15 2015 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   7 +
 .../apache/activemq/broker/region/Queue.java    |  13 +
 .../activemq/broker/region/Subscription.java    |   5 +
 .../broker/region/TopicSubscription.java        |   7 +
 .../region/cursors/AbstractStoreCursor.java     |  27 +
 .../cursors/FilePendingMessageCursor.java       |  21 +-
 .../region/cursors/OrderedPendingList.java      |  35 +-
 .../broker/region/cursors/PendingList.java      |   3 +
 .../region/cursors/PendingMessageCursor.java    |  66 +--
 .../region/cursors/PendingMessageHelper.java    |  68 +++
 .../region/cursors/PrioritizedPendingList.java  |  31 +-
 .../cursors/QueueDispatchPendingList.java       |   5 +
 .../region/cursors/QueueStorePrefetch.java      |  33 +-
 .../cursors/StoreDurableSubscriberCursor.java   |   9 +
 .../broker/region/cursors/StoreQueueCursor.java |  25 +
 .../region/cursors/TopicStorePrefetch.java      |  24 +-
 .../region/cursors/VMPendingMessageCursor.java  |  60 +-
 .../java/org/apache/activemq/store/PList.java   |   2 +
 .../activemq/store/ProxyTopicMessageStore.java  |  10 +
 .../activemq/store/TopicMessageStore.java       |   2 +
 .../store/memory/MemoryTopicMessageStore.java   |  10 +
 .../activemq/store/memory/MemoryTopicSub.java   |  23 +-
 .../store/jdbc/JDBCTopicMessageStore.java       |  31 +-
 .../store/journal/JournalTopicMessageStore.java |  24 +-
 .../activemq/store/kahadb/KahaDBStore.java      |  24 +
 .../activemq/store/kahadb/MessageDatabase.java  |  26 +
 .../activemq/store/kahadb/TempKahaDBStore.java  |   5 +
 .../store/kahadb/disk/index/ListIndex.java      |  20 +-
 .../store/kahadb/disk/index/ListNode.java       |  13 +-
 .../activemq/store/kahadb/plist/PListImpl.java  |  67 ++-
 .../apache/activemq/leveldb/LevelDBStore.scala  |   6 +
 .../region/QueueDuplicatesFromStoreTest.java    |   7 +
 .../region/SubscriptionAddRemoveQueueTest.java  |  42 ++
 .../AbstractPendingMessageCursorTest.java       | 547 +++++++++++++++++++
 .../cursors/KahaDBPendingMessageCursorTest.java | 126 +++++
 .../cursors/MemoryPendingMessageCursorTest.java | 145 +++++
 .../MultiKahaDBPendingMessageCursorTest.java    |  60 ++
 .../region/cursors/OrderPendingListTest.java    |  10 +
 .../store/AbstractMessageStoreSizeStatTest.java | 244 ++-------
 .../store/AbstractStoreStatTestSupport.java     | 268 +++++++++
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |  22 +-
 .../MultiKahaDBMessageStoreSizeStatTest.java    |  50 +-
 .../memory/MemoryMessageStoreSizeStatTest.java  |  22 +-
 43 files changed, 1919 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index ef1b372..4e688cc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -582,6 +582,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
     }
 
     @Override
+    public long getPendingMessageSize() {
+        synchronized (pendingLock) {
+            return pending.messageSize();
+        }
+    }
+
+    @Override
     public int getDispatchedQueueSize() {
         return dispatched.size();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c9823e1..b0b609b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -927,6 +927,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         return msg;
     }
 
+    public long getPendingMessageSize() {
+        messagesLock.readLock().lock();
+        try{
+            return messages.messageSize();
+        } finally {
+            messagesLock.readLock().unlock();
+        }
+    }
+
+    public long getPendingMessageCount() {
+         return this.destinationStatistics.getMessages().getCount();
+    }
+
     @Override
     public String toString() {
         return destination.getQualifiedName() + ", subscriptions=" + consumers.size()

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index 9452b99..4a8b341 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -119,6 +119,11 @@ public interface Subscription extends SubscriptionRecovery {
     int getPendingQueueSize();
 
     /**
+     * @return size of the messages pending delivery
+     */
+    long getPendingMessageSize();
+
+    /**
      * @return number of messages dispatched to the client
      */
     int getDispatchedQueueSize();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d3e683d..e1c8a95 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -419,6 +419,13 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     @Override
+    public long getPendingMessageSize() {
+        synchronized (matchedListMutex) {
+            return matched.messageSize();
+        }
+    }
+
+    @Override
     public int getDispatchedQueueSize() {
         return (int)(getSubscriptionStatistics().getDispatched().getCount() -
                 prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 4bdd7f6..05e4b1f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -49,6 +50,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     final MessageId[] lastCachedIds = new MessageId[2];
     protected boolean hadSpace = false;
 
+
+
     protected AbstractStoreCursor(Destination destination) {
         super((destination != null ? destination.isPrioritizedMessages():false));
         this.regionDestination=destination;
@@ -60,6 +63,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
@@ -78,6 +82,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         resetSize();
     }
 
+    @Override
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -85,6 +90,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final boolean recoverMessage(Message message) throws Exception {
         return recoverMessage(message,false);
     }
@@ -136,6 +142,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         duplicatesFromStore.clear();
     }
 
+    @Override
     public final synchronized void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -150,6 +157,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public synchronized void release() {
         clearIterator(false);
     }
@@ -173,6 +181,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -187,6 +196,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -199,6 +209,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         return result;
     }
 
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean disableCache = false;
         if (hasSpace()) {
@@ -333,12 +344,14 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         setCacheEnabled(false);
         size++;
     }
 
 
+    @Override
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -350,6 +363,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void remove(MessageReference node) {
         if (batchList.remove(node) != null) {
             size--;
@@ -358,11 +372,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void clear() {
         gc();
     }
 
 
+    @Override
     public synchronized void gc() {
         for (MessageReference msg : batchList) {
             rollback(msg.getMessageId());
@@ -374,6 +390,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         setCacheEnabled(false);
     }
 
+    @Override
     protected final synchronized void fillBatch() {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{} fillBatch", this);
@@ -395,17 +412,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send since last reset
         return size == 0;
     }
 
 
+    @Override
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
 
 
+    @Override
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();
@@ -414,6 +434,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
     @Override
+    public final synchronized long messageSize() {
+        return getStoreMessageSize();
+    }
+
+    @Override
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
@@ -428,6 +453,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
 
     protected abstract int getStoreSize();
 
+    protected abstract long getStoreMessageSize();
+
     protected abstract boolean isStoreEmpty();
 
     public Subscription getSubscription() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 7512e39..3f3f33b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -44,8 +44,8 @@ import org.apache.activemq.util.ByteSequence;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
     static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
@@ -198,15 +198,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
-     * @throws Exception 
+     * @throws Exception
      */
     @Override
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         return tryAddMessageLast(node, 0);
     }
-    
+
     @Override
     public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
         if (!node.isExpired()) {
@@ -252,7 +252,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
     @Override
@@ -356,6 +356,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
         return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return memoryList.messageSize() + (isDiskListEmpty() ? 0 : (int)getDiskList().messageSize());
+    }
+
     /**
      * clear all pending messages
      */
@@ -389,6 +394,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
         super.setSystemUsage(usageManager);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
@@ -497,10 +503,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
             }
         }
 
+        @Override
         public boolean hasNext() {
             return iterator.hasNext();
         }
 
+        @Override
         public MessageReference next() {
             try {
                 PListEntry entry = iterator.next();
@@ -513,6 +521,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
             }
         }
 
+        @Override
         public void remove() {
             iterator.remove();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
index 9bf9588..31870b1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
@@ -25,13 +25,23 @@ import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
 
 public class OrderedPendingList implements PendingList {
 
     private PendingNode root = null;
     private PendingNode tail = null;
     private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+    private final SizeStatisticImpl messageSize;
+    private final PendingMessageHelper pendingMessageHelper;
 
+    public OrderedPendingList() {
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+        messageSize.setEnabled(true);
+        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
+    }
+
+    @Override
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
         if (root == null) {
@@ -41,10 +51,11 @@ public class OrderedPendingList implements PendingList {
             root.linkBefore(node);
             root = node;
         }
-        this.map.put(message.getMessageId(), node);
+        pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public PendingNode addMessageLast(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
         if (root == null) {
@@ -53,29 +64,35 @@ public class OrderedPendingList implements PendingList {
             tail.linkAfter(node);
         }
         tail = node;
-        this.map.put(message.getMessageId(), node);
+        pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public void clear() {
         this.root = null;
         this.tail = null;
         this.map.clear();
+        this.messageSize.reset();
     }
 
+    @Override
     public boolean isEmpty() {
         return this.map.isEmpty();
     }
 
+    @Override
     public Iterator<MessageReference> iterator() {
         return new Iterator<MessageReference>() {
             private PendingNode current = null;
             private PendingNode next = root;
 
+            @Override
             public boolean hasNext() {
                 return next != null;
             }
 
+            @Override
             public MessageReference next() {
                 MessageReference result = null;
                 this.current = this.next;
@@ -84,31 +101,39 @@ public class OrderedPendingList implements PendingList {
                 return result;
             }
 
+            @Override
             public void remove() {
                 if (this.current != null && this.current.getMessage() != null) {
-                    map.remove(this.current.getMessage().getMessageId());
+                    pendingMessageHelper.removeFromMap(this.current.getMessage());
                 }
                 removeNode(this.current);
             }
         };
     }
 
+    @Override
     public PendingNode remove(MessageReference message) {
         PendingNode node = null;
         if (message != null) {
-            node = this.map.remove(message.getMessageId());
+            node = pendingMessageHelper.removeFromMap(message);
             removeNode(node);
         }
         return node;
     }
 
+    @Override
     public int size() {
         return this.map.size();
     }
 
+    @Override
+    public long messageSize() {
+        return this.messageSize.getTotalSize();
+    }
+
     void removeNode(PendingNode node) {
         if (node != null) {
-            map.remove(node.getMessage().getMessageId());
+            pendingMessageHelper.removeFromMap(node.getMessage());
             if (root == node) {
                 root = (PendingNode) node.getNext();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
index 153d8bd..adfa78e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
@@ -73,6 +73,8 @@ public interface PendingList extends Iterable<MessageReference> {
      */
     public int size();
 
+    public long messageSize();
+
     /**
      * Returns an iterator over the pending Messages.  The subclass controls how
      * the returned iterator actually traverses the list of pending messages allowing
@@ -81,6 +83,7 @@ public interface PendingList extends Iterable<MessageReference> {
      *
      * @return an Iterator that returns MessageReferences contained in this list.
      */
+    @Override
     public Iterator<MessageReference> iterator();
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 06d59f1..bf7fd7a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -30,14 +30,14 @@ import org.apache.activemq.usage.SystemUsage;
 /**
  * Interface to pending message (messages awaiting disptach to a consumer)
  * cursor
- * 
- * 
+ *
+ *
  */
 public interface PendingMessageCursor extends Service {
 
     /**
      * Add a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -46,7 +46,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * remove a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -60,7 +60,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * check if a Destination is Empty for this cursor
-     * 
+     *
      * @param destination
      * @return true id the Destination is empty
      */
@@ -79,7 +79,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      * @return boolean true if successful, false if cursor traps a duplicate
      * @throws IOException
@@ -89,9 +89,9 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch - if it can
-     * 
+     *
      * @param node
-     * @param maxWaitTime 
+     * @param maxWaitTime
      * @return true if successful
      * @throws IOException
      * @throws Exception
@@ -100,7 +100,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      * @throws Exception
      */
@@ -108,7 +108,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Add a message recovered from a retroactive policy
-     * 
+     *
      * @param node
      * @throws Exception
      */
@@ -134,6 +134,8 @@ public interface PendingMessageCursor extends Service {
      */
     int size();
 
+    long messageSize();
+
     /**
      * clear all pending messages
      */
@@ -142,7 +144,7 @@ public interface PendingMessageCursor extends Service {
     /**
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
-     * 
+     *
      * @return true if recovery required
      */
     boolean isRecoveryRequired();
@@ -154,7 +156,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Set the max batch size
-     * 
+     *
      * @param maxBatchSize
      */
     void setMaxBatchSize(int maxBatchSize);
@@ -167,7 +169,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * remove a node
-     * 
+     *
      * @param node
      */
     void remove(MessageReference node);
@@ -179,7 +181,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Set the UsageManager
-     * 
+     *
      * @param systemUsage
      * @see org.apache.activemq.usage.SystemUsage
      */
@@ -204,7 +206,7 @@ public interface PendingMessageCursor extends Service {
      * @return true if the cursor is full
      */
     boolean isFull();
-    
+
     /**
      * @return true if the cursor has space to page messages into
      */
@@ -217,41 +219,41 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * destroy the cursor
-     * 
+     *
      * @throws Exception
      */
     void destroy() throws Exception;
 
     /**
      * Page in a restricted number of messages and increment the reference count
-     * 
+     *
      * @param maxItems
      * @return a list of paged in messages
      */
     LinkedList<MessageReference> pageInList(int maxItems);
-    
+
     /**
      * set the maximum number of producers to track at one time
      * @param value
      */
     void setMaxProducersToAudit(int value);
-    
+
     /**
      * @return the maximum number of producers to audit
      */
     int getMaxProducersToAudit();
-    
+
     /**
      * Set the maximum depth of message ids to track
-     * @param depth 
+     * @param depth
      */
     void setMaxAuditDepth(int depth);
-    
+
     /**
      * @return the audit depth
      */
     int getMaxAuditDepth();
-    
+
     /**
      * @return the enableAudit
      */
@@ -260,37 +262,37 @@ public interface PendingMessageCursor extends Service {
      * @param enableAudit the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit);
-    
+
     /**
-     * @return true if the underlying state of this cursor 
+     * @return true if the underlying state of this cursor
      * disappears when the broker shuts down
      */
     public boolean isTransient();
-    
-    
+
+
     /**
      * set the audit
      * @param audit
      */
     public void setMessageAudit(ActiveMQMessageAudit audit);
-    
-    
+
+
     /**
      * @return the audit - could be null
      */
     public ActiveMQMessageAudit getMessageAudit();
-    
+
     /**
      * use a cache to improve performance
      * @param useCache
      */
     public void setUseCache(boolean useCache);
-    
+
     /**
      * @return true if a cache may be used
      */
     public boolean isUseCache();
-    
+
     /**
      * remove from auditing the message id
      * @param id

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
new file mode 100644
index 0000000..f28d61b
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
+
+/**
+ *
+ *
+ */
+public class PendingMessageHelper {
+
+    private final Map<MessageId, PendingNode> map;
+    private final SizeStatisticImpl messageSize;
+
+    public PendingMessageHelper(Map<MessageId, PendingNode> map,
+            SizeStatisticImpl messageSize) {
+        super();
+        this.map = map;
+        this.messageSize = messageSize;
+    }
+
+    public void addToMap(MessageReference message, PendingNode node) {
+        PendingNode previous = this.map.put(message.getMessageId(), node);
+        if (previous != null) {
+            try {
+                messageSize.addSize(-previous.getMessage().getSize());
+            } catch (Exception e) {
+              //expected for NullMessageReference
+            }
+        }
+        try {
+            messageSize.addSize(message.getSize());
+        } catch (Exception e) {
+          //expected for NullMessageReference
+        }
+    }
+
+    public PendingNode removeFromMap(MessageReference message) {
+        PendingNode removed = this.map.remove(message.getMessageId());
+        if (removed != null) {
+            try {
+                messageSize.addSize(-removed.getMessage().getSize());
+            } catch (Exception e) {
+                //expected for NullMessageReference
+            }
+        }
+        return removed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 9235b2c..70eaa53 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -25,50 +25,64 @@ import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
 
 public class PrioritizedPendingList implements PendingList {
 
     private static final Integer MAX_PRIORITY = 10;
     private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
     private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+    private final SizeStatisticImpl messageSize;
+    private final PendingMessageHelper pendingMessageHelper;
+
 
     public PrioritizedPendingList() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i] = new OrderedPendingList();
         }
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+        messageSize.setEnabled(true);
+        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
     }
 
+    @Override
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = getList(message).addMessageFirst(message);
-        this.map.put(message.getMessageId(), node);
+        this.pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public PendingNode addMessageLast(MessageReference message) {
         PendingNode node = getList(message).addMessageLast(message);
-        this.map.put(message.getMessageId(), node);
+        this.pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public void clear() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i].clear();
         }
         this.map.clear();
+        this.messageSize.reset();
     }
 
+    @Override
     public boolean isEmpty() {
         return this.map.isEmpty();
     }
 
+    @Override
     public Iterator<MessageReference> iterator() {
         return new PrioritizedPendingListIterator();
     }
 
+    @Override
     public PendingNode remove(MessageReference message) {
         PendingNode node = null;
         if (message != null) {
-            node = this.map.remove(message.getMessageId());
+            node = this.pendingMessageHelper.removeFromMap(message);
             if (node != null) {
                 node.getList().removeNode(node);
             }
@@ -76,11 +90,17 @@ public class PrioritizedPendingList implements PendingList {
         return node;
     }
 
+    @Override
     public int size() {
         return this.map.size();
     }
 
     @Override
+    public long messageSize() {
+        return this.messageSize.getTotalSize();
+    }
+
+    @Override
     public String toString() {
         return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
     }
@@ -111,10 +131,12 @@ public class PrioritizedPendingList implements PendingList {
                 }
             }
         }
+        @Override
         public boolean hasNext() {
             return list.size() > index;
         }
 
+        @Override
         public MessageReference next() {
             PendingNode node = list.get(this.index);
             this.currentIndex = this.index;
@@ -122,10 +144,11 @@ public class PrioritizedPendingList implements PendingList {
             return node.getMessage();
         }
 
+        @Override
         public void remove() {
             PendingNode node = list.get(this.currentIndex);
             if (node != null) {
-                map.remove(node.getMessage().getMessageId());
+                pendingMessageHelper.removeFromMap(node.getMessage());
                 node.getList().removeNode(node);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index 380569e..cdddd4c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -97,6 +97,11 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     @Override
+    public long messageSize() {
+        return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize();
+    }
+
+    @Override
     public Iterator<MessageReference> iterator() {
         return new Iterator<MessageReference>() {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 9fb73c5..b10b2e2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -32,14 +32,14 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
     private final Broker broker;
-   
+
     /**
      * Construct it
      * @param queue
@@ -51,6 +51,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
 
     }
 
+    @Override
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
         Message msg = this.store.getMessage(messageReference);
         if (msg != null) {
@@ -62,36 +63,46 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         }
     }
 
-   
-        
+
+
     @Override
     protected synchronized int getStoreSize() {
         try {
             int result = this.store.getMessageCount();
             return result;
-            
+
         } catch (IOException e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
-    
+
+    @Override
+    protected synchronized long getStoreMessageSize() {
+        try {
+            return this.store.getMessageSize();
+        } catch (IOException e) {
+            LOG.error("Failed to get message size", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
             return this.store.isEmpty();
-            
+
         } catch (Exception e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     protected void resetBatch() {
         this.store.resetBatching();
     }
-    
+
     @Override
     protected void setBatch(MessageId messageId) throws Exception {
         if (LOG.isTraceEnabled()) {
@@ -101,7 +112,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         batchResetNeeded = false;
     }
 
-    
+
     @Override
     protected void doFillBatch() throws Exception {
         hadSpace = this.hasSpace();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 32000f5..9d723b8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -303,6 +303,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
     }
 
     @Override
+    public synchronized long messageSize() {
+        long pendingSize=0;
+        for (PendingMessageCursor tsp : storePrefetches) {
+            pendingSize += tsp.messageSize();
+        }
+        return pendingSize;
+    }
+
+    @Override
     public void setMaxBatchSize(int newMaxBatchSize) {
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.setMaxBatchSize(newMaxBatchSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 5b072a6..caa93b6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -51,6 +51,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         currentCursor = persistent;
     }
 
+    @Override
     public synchronized void start() throws Exception {
         started = true;
         super.start();
@@ -73,6 +74,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public synchronized void stop() throws Exception {
         started = false;
         if (nonPersistent != null) {
@@ -87,6 +89,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = 0;
     }
 
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean result = true;
         if (node != null) {
@@ -104,6 +107,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         return result;
     }
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
@@ -119,10 +123,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public synchronized void clear() {
         pendingCount = 0;
     }
 
+    @Override
     public synchronized boolean hasNext() {
         try {
             getNextCursor();
@@ -133,11 +139,13 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
        return currentCursor != null ? currentCursor.hasNext() : false;
     }
 
+    @Override
     public synchronized MessageReference next() {
         MessageReference result = currentCursor != null ? currentCursor.next() : null;
         return result;
     }
 
+    @Override
     public synchronized void remove() {
         if (currentCursor != null) {
             currentCursor.remove();
@@ -145,6 +153,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount--;
     }
 
+    @Override
     public synchronized void remove(MessageReference node) {
         if (!node.isPersistent()) {
             nonPersistent.remove(node);
@@ -154,18 +163,21 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount--;
     }
 
+    @Override
     public synchronized void reset() {
         nonPersistent.reset();
         persistent.reset();
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public void release() {
         nonPersistent.release();
         persistent.release();
     }
 
 
+    @Override
     public synchronized int size() {
         if (pendingCount < 0) {
             pendingCount = persistent.size() + nonPersistent.size();
@@ -173,6 +185,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         return pendingCount;
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return persistent.messageSize() + nonPersistent.messageSize();
+    }
+
+    @Override
     public synchronized boolean isEmpty() {
         // if negative, more messages arrived in store since last reset so non empty
         return pendingCount == 0;
@@ -185,6 +203,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
      * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
+    @Override
     public boolean isRecoveryRequired() {
         return false;
     }
@@ -203,6 +222,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         this.nonPersistent = nonPersistent;
     }
 
+    @Override
     public void setMaxBatchSize(int maxBatchSize) {
         persistent.setMaxBatchSize(maxBatchSize);
         if (nonPersistent != null) {
@@ -212,6 +232,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
     }
 
 
+    @Override
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         super.setMaxProducersToAudit(maxProducersToAudit);
         if (persistent != null) {
@@ -222,6 +243,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public void setMaxAuditDepth(int maxAuditDepth) {
         super.setMaxAuditDepth(maxAuditDepth);
         if (persistent != null) {
@@ -232,6 +254,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public void setEnableAudit(boolean enableAudit) {
         super.setEnableAudit(enableAudit);
         if (persistent != null) {
@@ -266,6 +289,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
 
 
 
+    @Override
     public synchronized void gc() {
         if (persistent != null) {
             persistent.gc();
@@ -276,6 +300,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public void setSystemUsage(SystemUsage usageManager) {
         super.setSystemUsage(usageManager);
         if (persistent != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 811531e..c3f788f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pendingCount messages pendingCount message (messages awaiting disptach
  * to a consumer) cursor
- * 
- * 
+ *
+ *
  */
 class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(TopicStorePrefetch.class);
@@ -59,14 +59,17 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.storeHasMessages=this.size > 0;
     }
 
+    @Override
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         batchList.addMessageFirst(node);
         size++;
+        //this.messageSize.addSize(node.getMessage().getSize());
     }
 
     @Override
@@ -88,7 +91,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             }
             storeHasMessages = true;
         }
-        return recovered;      
+        return recovered;
     }
 
     @Override
@@ -100,7 +103,18 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             throw new RuntimeException(e);
         }
     }
-    
+
+
+    @Override
+    protected synchronized long getStoreMessageSize() {
+        try {
+            return store.getMessageSize(clientId, subscriberName);
+        } catch (Exception e) {
+            LOG.error("{} Failed to get the outstanding message count from the store", this, e);
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
@@ -111,7 +125,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         }
     }
 
-            
+
     @Override
     protected void resetBatch() {
         this.store.resetBatching(clientId, subscriberName);

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
index 15c61df..75be766 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -28,13 +29,13 @@ import org.apache.activemq.broker.region.QueueMessageReference;
 /**
  * hold pending messages in a linked list (messages awaiting disptach to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     private final PendingList list;
     private Iterator<MessageReference> iter;
-    
+
     public VMPendingMessageCursor(boolean prioritizedMessages) {
         super(prioritizedMessages);
         if (this.prioritizedMessages) {
@@ -44,7 +45,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
         }
     }
 
-    
+
+    @Override
     public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
             throws Exception {
         List<MessageReference> rc = new ArrayList<MessageReference>();
@@ -62,7 +64,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return true if there are no pending messages
      */
-    
+
+    @Override
     public synchronized boolean isEmpty() {
         if (list.isEmpty()) {
             return true;
@@ -85,7 +88,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * reset the cursor
      */
-    
+
+    @Override
     public synchronized void reset() {
         iter = list.iterator();
         last = null;
@@ -93,10 +97,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
-    
+
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageLast(node);
@@ -105,10 +110,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
-    
+
+    @Override
     public synchronized void addMessageFirst(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageFirst(node);
@@ -117,7 +123,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return true if there pending messages to dispatch
      */
-    
+
+    @Override
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -125,7 +132,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return the next pending message
      */
-    
+
+    @Override
     public synchronized MessageReference next() {
         last = iter.next();
         if (last != null) {
@@ -137,7 +145,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * remove the message at the cursor position
      */
-    
+
+    @Override
     public synchronized void remove() {
         if (last != null) {
             last.decrementReferenceCount();
@@ -148,15 +157,22 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return the number of pending messages
      */
-    
+
+    @Override
     public synchronized int size() {
         return list.size();
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return list.messageSize();
+    }
+
     /**
      * clear all pending messages
      */
-    
+
+    @Override
     public synchronized void clear() {
         for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
             MessageReference ref = i.next();
@@ -165,7 +181,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
         list.clear();
     }
 
-    
+
+    @Override
     public synchronized void remove(MessageReference node) {
         list.remove(node);
         node.decrementReferenceCount();
@@ -173,11 +190,12 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
 
     /**
      * Page in a restricted number of messages
-     * 
+     *
      * @param maxItems
      * @return a list of paged in messages
      */
-    
+
+    @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
         LinkedList<MessageReference> result = new LinkedList<MessageReference>();
         for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
@@ -191,12 +209,14 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
         return result;
     }
 
-    
+
+    @Override
     public boolean isTransient() {
         return true;
     }
 
-    
+
+    @Override
     public void destroy() throws Exception {
         super.destroy();
         clear();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PList.java b/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
index 7438963..efe29ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
@@ -41,6 +41,8 @@ public interface PList {
 
     long size();
 
+    long messageSize();
+
     public interface PListIterator extends Iterator<PListEntry> {
         void release();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 5c59158..6e79358 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -209,6 +209,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
         return delegate.isPrioritizedMessages();
     }
 
+    @Override
     public void updateMessage(Message message) throws IOException {
         delegate.updateMessage(message);
     }
@@ -223,4 +224,13 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
         return delegate.getMessageStoreStatistics();
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.store.TopicMessageStore#getMessageSize(java.lang.String, java.lang.String)
+     */
+    @Override
+    public long getMessageSize(String clientId, String subscriberName)
+            throws IOException {
+        return delegate.getMessageSize(clientId, subscriberName);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
index 163b184..a55118f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
@@ -102,6 +102,8 @@ public interface TopicMessageStore extends MessageStore {
      */
     int getMessageCount(String clientId, String subscriberName) throws IOException;
 
+    long getMessageSize(String clientId, String subscriberName) throws IOException;
+
     /**
      * Finds the subscriber entry for the given consumer info
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 76199d7..ae693f1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -146,6 +146,16 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
     }
 
     @Override
+    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
+        long result = 0;
+        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
+        if (sub != null) {
+            result = sub.messageSize();
+        }
+        return result;
+    }
+
+    @Override
     public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
         MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
index ec3807e..fc986f2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
@@ -26,8 +26,8 @@ import org.apache.activemq.store.MessageRecoveryListener;
 
 /**
  * A holder for a durable subscriber
- * 
- * 
+ *
+ *
  */
 class MemoryTopicSub {
 
@@ -58,9 +58,20 @@ class MemoryTopicSub {
         return map.size();
     }
 
+    synchronized long messageSize() {
+        long messageSize = 0;
+
+        for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
+            Entry<MessageId, Message> entry = iter.next();
+            messageSize += entry.getValue().getSize();
+        }
+
+        return messageSize;
+    }
+
     synchronized void recoverSubscription(MessageRecoveryListener listener) throws Exception {
-        for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Entry)iter.next();
+        for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
+            Entry<MessageId, Message> entry = iter.next();
             Object msg = entry.getValue();
             if (msg.getClass() == MessageId.class) {
                 listener.recoverMessageReference((MessageId)msg);
@@ -76,8 +87,8 @@ class MemoryTopicSub {
         // the message table is a synchronizedMap - so just have to synchronize
         // here
         int count = 0;
-        for (Iterator iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
-            Map.Entry entry = (Entry)iter.next();
+        for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
+            Entry<MessageId, Message> entry = iter.next();
             if (pastLackBatch) {
                 count++;
                 Object msg = entry.getValue();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index a0cb133..3bff9b2 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
@@ -57,7 +57,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
                PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
     private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
     private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
-         protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
+         @Override
+        protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
            return size() > SEQUENCE_ID_CACHE_SIZE;
         }
     };
@@ -67,6 +68,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
 
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
         if (ack != null && ack.isUnmatchedAck()) {
             if (LOG.isTraceEnabled()) {
@@ -110,16 +112,19 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
     /**
      * @throws Exception
      */
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
+                @Override
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                     msg.getMessageId().setBrokerSequenceId(sequenceId);
                     return listener.recoverMessage(msg);
                 }
 
+                @Override
                 public boolean recoverMessageReference(String reference) throws Exception {
                     return listener.recoverMessageReference(new MessageId(reference));
                 }
@@ -149,16 +154,19 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
         }
 
+        @Override
         public String toString() {
             return Arrays.deepToString(perPriority);
         }
 
+        @Override
         public Iterator<LastRecoveredEntry> iterator() {
             return new PriorityIterator();
         }
 
         class PriorityIterator implements Iterator<LastRecoveredEntry> {
             int current = 9;
+            @Override
             public boolean hasNext() {
                 for (int i=current; i>=0; i--) {
                     if (perPriority[i].hasMessages()) {
@@ -169,10 +177,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
                 return false;
             }
 
+            @Override
             public LastRecoveredEntry next() {
                 return perPriority[current];
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("not implemented");
             }
@@ -188,6 +198,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             this.priority = priority;
         }
 
+        @Override
         public String toString() {
             return priority + "-" + stored + ":" + recovered;
         }
@@ -213,6 +224,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             this.maxMessages = maxMessages;
         }
 
+        @Override
         public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
             if (delegate.hasSpace() && recoveredCount < maxMessages) {
                 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
@@ -226,6 +238,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             return false;
         }
 
+        @Override
         public boolean recoverMessageReference(String reference) throws Exception {
             return delegate.recoverMessageReference(new MessageId(reference));
         }
@@ -244,6 +257,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
             throws Exception {
         //Duration duration = new Duration("recoverNextMessages");
@@ -253,7 +267,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         if (!subscriberLastRecoveredMap.containsKey(key)) {
            subscriberLastRecoveredMap.put(key, new LastRecovered());
         }
-        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
+        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
         LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
         try {
             if (LOG.isTraceEnabled()) {
@@ -293,6 +307,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         String key = getSubscriptionKey(clientId, subscriptionName);
         if (!pendingCompletion.contains(key))  {
@@ -330,6 +345,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -347,6 +363,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
      * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
      *      String)
      */
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -359,6 +376,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -372,6 +390,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -384,6 +403,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         //Duration duration = new Duration("getMessageCount");
         int result = 0;
@@ -403,6 +423,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         return result;
     }
 
+    @Override
+    public long getMessageSize(String clientId, String subscriberName) throws IOException {
+        return 0;
+    }
+
     protected String getSubscriptionKey(String clientId, String subscriberName) {
         String result = clientId + ":";
         result += subscriberName != null ? subscriberName : "NOT_SET";

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
index 51d9693..aa0cb5d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
@@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A MessageStore that uses a Journal to store it's messages.
- * 
- * 
+ *
+ *
  */
 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
 
@@ -54,12 +54,14 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         this.longTermStore = checkpointStore;
     }
 
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
         throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.recoverSubscription(clientId, subscriptionName, listener);
     }
 
+    @Override
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                     MessageRecoveryListener listener) throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
@@ -67,21 +69,25 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
 
     }
 
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return longTermStore.lookupSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.addSubscription(subscriptionInfo, retroactive);
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message) throws IOException {
         super.addMessage(context, message);
     }
 
     /**
      */
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                             final MessageId messageId, MessageAck originalAck) throws IOException {
         final boolean debug = LOG.isDebugEnabled();
@@ -111,6 +117,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
             }
             transactionStore.acknowledge(this, ack, location);
             context.getTransaction().addSynchronization(new Synchronization() {
+                @Override
                 public void afterCommit() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
@@ -121,6 +128,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
                     }
                 }
 
+                @Override
                 public void afterRollback() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
@@ -159,6 +167,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         }
     }
 
+    @Override
     public RecordLocation checkpoint() throws IOException {
 
         final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
@@ -170,6 +179,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         }
 
         return super.checkpoint(new Callback() {
+            @Override
             public void execute() throws Exception {
 
                 // Checkpoint the acknowledged messages.
@@ -193,19 +203,29 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         return longTermStore;
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         longTermStore.deleteSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return longTermStore.getAllSubscriptions();
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId, subscriberName);
     }
 
+    @Override
+    public long getMessageSize(String clientId, String subscriberName) throws IOException {
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageSize(clientId, subscriberName);
+    }
+
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         longTermStore.resetBatching(clientId, subscriptionName);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 84aba07..bd45394 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -892,6 +892,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             }
         }
 
+
+        @Override
+        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            indexLock.writeLock().lock();
+            try {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
+                    @Override
+                    public Integer execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
+                        if (cursorPos == null) {
+                            // The subscription might not exist.
+                            return 0;
+                        }
+
+                        return (int) getStoredMessageSize(tx, sd, subscriptionKey);
+                    }
+                });
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
         @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
                 throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ac767a7..b3fcfaa 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2536,6 +2536,32 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return 0;
     }
 
+    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+        long locationSize = 0;
+        if (messageSequences != null) {
+            Iterator<Long> sequences = messageSequences.iterator();
+
+            while (sequences.hasNext()) {
+                Long sequenceId = sequences.next();
+                //the last item is the next marker
+                if (!sequences.hasNext()) {
+                    break;
+                }
+                Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
+                while (iterator.hasNext()) {
+                    Entry<Location, Long> entry = iterator.next();
+                    if (entry.getValue() == sequenceId - 1) {
+                        locationSize += entry.getKey().getSize();
+                        break;
+                    }
+
+                }
+            }
+        }
+
+        return locationSize;
+    }
     protected String key(KahaDestination destination) {
         return destination.getType().getNumber() + ":" + destination.getName();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 04d74b6..920fc53 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -409,6 +409,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
 
         @Override
+        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
+            return 0;
+        }
+
+        @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
index 82379ea..79ac7d1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,6 +60,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         this(pageFile, page.getPageId());
     }
 
+    @Override
     synchronized public void load(Transaction tx) throws IOException {
         if (loaded.compareAndSet(false, true)) {
             LOG.debug("loading");
@@ -81,15 +83,22 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
                 ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
                 setTailPageId(getHeadPageId());
                 size.addAndGet(node.size(tx));
+                onLoad(node, tx);
                 while (node.getNext() != NOT_SET ) {
                     node = loadNode(tx, node.getNext());
                     size.addAndGet(node.size(tx));
+                    onLoad(node, tx);
                     setTailPageId(node.getPageId());
                 }
             }
         }
     }
 
+    protected void onLoad(ListNode<Key, Value> node, Transaction tx) {
+
+    }
+
+    @Override
     synchronized public void unload(Transaction tx) {
         if (loaded.compareAndSet(true, false)) {
         }
@@ -103,6 +112,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return loadNode(tx, getTailPageId());
     }
 
+    @Override
     synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
         assertLoaded();
 
@@ -123,6 +133,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
     private Map.Entry<Key, Value> lastGetEntryCache = null;
     private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
 
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     synchronized public Value get(Transaction tx, Key key) throws IOException {
         assertLoaded();
@@ -144,6 +155,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
      *
      * @return the old value contained in the list if one exists or null.
      */
+    @Override
     @SuppressWarnings({ "rawtypes" })
     synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
 
@@ -211,6 +223,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return null;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     synchronized public Value remove(Transaction tx, Key key) throws IOException {
         assertLoaded();
@@ -252,15 +265,17 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return null;
     }
 
-    public void onRemove() {
+    public void onRemove(Entry<Key, Value> removed) {
         size.decrementAndGet();
         flushCache();
     }
 
+    @Override
     public boolean isTransient() {
         return false;
     }
 
+    @Override
     synchronized public void clear(Transaction tx) throws IOException {
         for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
             ListNode<Key,Value>candidate = iterator.next();
@@ -280,6 +295,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return getHead(tx).isEmpty(tx);
     }
 
+    @Override
     synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
         return getHead(tx).iterator(tx);
     }
@@ -346,6 +362,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
     public Marshaller<Key> getKeyMarshaller() {
         return keyMarshaller;
     }
+    @Override
     public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
         this.keyMarshaller = keyMarshaller;
     }
@@ -353,6 +370,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
     public Marshaller<Value> getValueMarshaller() {
         return valueMarshaller;
     }
+    @Override
     public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
         this.valueMarshaller = valueMarshaller;
     }


Mime
View raw message