activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
Date Tue, 07 Jul 2015 20:28:35 GMT
https://issues.apache.org/jira/browse/AMQ-5748

Added a getMessageSize method to MessageStore to support retrieving the
total message size of all stored messages for a destination.  Added a
new storeMessageSize statistic to DestinationStatistics.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/785b16bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/785b16bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/785b16bf

Branch: refs/heads/master
Commit: 785b16bf9ef19180e7c9783442f4a125b44255e1
Parents: 7a68ad5
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Mon Apr 27 18:24:16 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Jul 7 18:03:27 2015 +0000

----------------------------------------------------------------------
 .../activemq/broker/jmx/DestinationView.java    |   8 +
 .../broker/jmx/DestinationViewMBean.java        |   8 +
 .../apache/activemq/broker/region/Queue.java    |   1 +
 .../apache/activemq/broker/region/Topic.java    |   1 +
 .../activemq/store/AbstractMessageStore.java    |  21 ++
 .../org/apache/activemq/store/MessageStore.java |  12 +
 .../activemq/store/MessageStoreStatistics.java  |  81 ++++++
 .../activemq/store/ProxyMessageStore.java       |  11 +
 .../activemq/store/ProxyTopicMessageStore.java  |  12 +-
 .../store/memory/MemoryMessageStore.java        |  43 ++-
 .../activemq/management/SizeStatisticImpl.java  |  17 ++
 .../activemq/store/jdbc/JDBCMessageStore.java   |   2 +
 .../store/journal/JournalMessageStore.java      |  15 +-
 .../activemq/store/kahadb/KahaDBStore.java      |  69 ++---
 .../activemq/store/kahadb/MessageDatabase.java  | 172 +++++++++++-
 .../activemq/store/kahadb/TempKahaDBStore.java  |  40 +--
 .../kahadb/disk/util/LocationMarshaller.java    |   5 +
 .../apache/activemq/leveldb/LevelDBStore.scala  |   2 +-
 .../cursors/StoreQueueCursorOrderTest.java      |  10 +-
 .../store/AbstractMessageStoreSizeStatTest.java | 266 +++++++++++++++++++
 .../store/AbstractMessageStoreSizeTest.java     |  98 +++++++
 .../AbstractKahaDBMessageStoreSizeTest.java     | 147 ++++++++++
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |  82 ++++++
 .../kahadb/KahaDBMessageStoreSizeTest.java      |  46 ++++
 .../MultiKahaDBMessageStoreSizeStatTest.java    | 134 ++++++++++
 .../kahadb/MultiKahaDBMessageStoreSizeTest.java |  68 +++++
 .../memory/MemoryMessageStoreSizeStatTest.java  |  45 ++++
 .../memory/MemoryMessageStoreSizeTest.java      |  45 ++++
 .../kahadb/MessageStoreTest/version5/db-1.log   | Bin 0 -> 524288 bytes
 .../kahadb/MessageStoreTest/version5/db.data    | Bin 0 -> 32768 bytes
 .../kahadb/MessageStoreTest/version5/db.redo    | Bin 0 -> 32824 bytes
 .../version5/queue#3a#2f#2fTest/db-1.log        | Bin 0 -> 524288 bytes
 .../version5/queue#3a#2f#2fTest/db.data         | Bin 0 -> 32768 bytes
 .../version5/queue#3a#2f#2fTest/db.redo         | Bin 0 -> 32824 bytes
 34 files changed, 1382 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index b3bf869..3e51a49 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
@@ -51,6 +52,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +121,12 @@ public class DestinationView implements DestinationViewMBean {
         return destination.getDestinationStatistics().getMessages().getCount();
     }
 
+    @Override
+    public long getStoreMessageSize() {
+        MessageStore messageStore = destination.getMessageStore();
+        return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0;
+    }
+
     public long getMessagesCached() {
         return destination.getDestinationStatistics().getMessagesCached().getCount();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 60340ff..aedc15d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -122,6 +122,14 @@ public interface DestinationViewMBean {
     long getQueueSize();
 
     /**
+     * Returns the memory size of all messages in this destination's store 
+     *
+     * @return Returns the memory size of all messages in this destination's store 
+     */
+    @MBeanInfo("The memory size of all messages in this destination's store.")
+    long getStoreMessageSize();
+
+    /**
      * @return An array of all the messages in the destination's queue.
      */
     @MBeanInfo("An array of all messages in the destination. Not HTML friendly.")

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index af61e19..c9823e1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -375,6 +375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             messages.setMaxProducersToAudit(getMaxProducersToAudit());
             messages.setUseCache(isUseCache());
             messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
+            store.start();
             final int messageCount = store.getMessageCount();
             if (messageCount > 0 && messages.isRecoveryRequired()) {
                 BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index bda000b..61c62ce 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -105,6 +105,7 @@ public class Topic extends BaseDestination implements Task {
             // misleading metrics.
             // int messageCount = store.getMessageCount();
             // destinationStatistics.getMessages().setCount(messageCount);
+            store.start();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index faa6c1f..413f958 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -30,6 +30,7 @@ abstract public class AbstractMessageStore implements MessageStore {
     protected final ActiveMQDestination destination;
     protected boolean prioritizedMessages;
     protected IndexListener indexListener;
+    protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
 
     public AbstractMessageStore(ActiveMQDestination destination) {
         this.destination = destination;
@@ -41,6 +42,7 @@ abstract public class AbstractMessageStore implements MessageStore {
 
     @Override
     public void start() throws Exception {
+        recoverMessageStoreStatistics();
     }
 
     @Override
@@ -132,4 +134,23 @@ abstract public class AbstractMessageStore implements MessageStore {
     static {
        FUTURE = new InlineListenableFuture();
     }
+
+    @Override
+    public int getMessageCount() throws IOException {
+        return (int) getMessageStoreStatistics().getMessageCount().getCount();
+    }
+
+    @Override
+    public long getMessageSize() throws IOException {
+        return getMessageStoreStatistics().getMessageSize().getTotalSize();
+    }
+
+    @Override
+    public MessageStoreStatistics getMessageStoreStatistics() {
+        return messageStoreStatistics;
+    }
+
+    protected void recoverMessageStoreStatistics() throws IOException {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index 4cc472e..aee619a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -159,6 +159,18 @@ public interface MessageStore extends Service {
     int getMessageCount() throws IOException;
 
     /**
+     * @return the size of the messages ready to deliver
+     * @throws IOException
+     */
+    long getMessageSize() throws IOException;
+
+
+    /**
+     * @return The statistics bean for this message store
+     */
+    MessageStoreStatistics getMessageStoreStatistics();
+
+    /**
      * A hint to the Store to reset any batching state for the Destination
      *
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java
new file mode 100644
index 0000000..0a2b021
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.SizeStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+
+/**
+ * The J2EE Statistics for a Message Sore
+ */
+public class MessageStoreStatistics extends StatsImpl {
+
+    protected CountStatisticImpl messageCount;
+    protected SizeStatisticImpl messageSize;
+
+
+    public MessageStoreStatistics() {
+        this(true);
+    }
+
+    public MessageStoreStatistics(boolean enabled) {
+
+        messageCount = new CountStatisticImpl("messageCount", "The number of messages in the store passing through the destination");
+        messageSize = new SizeStatisticImpl("messageSize","Size of messages in the store passing through the destination");
+
+        addStatistic("messageCount", messageCount);
+        addStatistic("messageSize", messageSize);
+
+        this.setEnabled(enabled);
+    }
+
+
+    public CountStatisticImpl getMessageCount() {
+        return messageCount;
+    }
+
+    public SizeStatisticImpl getMessageSize() {
+        return messageSize;
+    }
+
+    public void reset() {
+        if (this.isDoReset()) {
+            super.reset();
+            messageCount.reset();
+            messageSize.reset();
+        }
+    }
+
+    public void setEnabled(boolean enabled) {
+        super.setEnabled(enabled);
+        messageCount.setEnabled(enabled);
+        messageSize.setEnabled(enabled);
+    }
+
+    public void setParent(MessageStoreStatistics parent) {
+        if (parent != null) {
+            messageCount.setParent(parent.messageCount);
+            messageSize.setParent(parent.messageSize);
+        } else {
+            messageCount.setParent(null);
+            messageSize.setParent(null);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index c9b2060..cd319a6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -101,6 +101,11 @@ public class ProxyMessageStore implements MessageStore {
     }
 
     @Override
+    public long getMessageSize() throws IOException {
+        return delegate.getMessageSize();
+    }
+
+    @Override
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         delegate.recoverNextMessages(maxReturned, listener);
     }
@@ -169,4 +174,10 @@ public class ProxyMessageStore implements MessageStore {
     public String toString() {
         return delegate.toString();
     }
+
+    @Override
+    public MessageStoreStatistics getMessageStoreStatistics() {
+        return delegate.getMessageStoreStatistics();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 0f47f61..5c59158 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-import java.util.concurrent.Future;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -146,6 +145,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
     }
 
     @Override
+    public long getMessageSize() throws IOException {
+        return delegate.getMessageSize();
+    }
+
+    @Override
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         delegate.recoverNextMessages(maxReturned, listener);
     }
@@ -213,4 +217,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
     public void registerIndexListener(IndexListener indexListener) {
         delegate.registerIndexListener(indexListener);
     }
+
+    @Override
+    public MessageStoreStatistics getMessageStoreStatistics() {
+        return delegate.getMessageStoreStatistics();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 7cdaa78..e71dab8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -35,8 +35,8 @@ import org.apache.activemq.store.AbstractMessageStore;
 /**
  * An implementation of {@link org.apache.activemq.store.MessageStore} which
  * uses a
- * 
- * 
+ *
+ *
  */
 public class MemoryMessageStore extends AbstractMessageStore {
 
@@ -56,6 +56,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
         synchronized (messageTable) {
             messageTable.put(message.getMessageId(), message);
+            getMessageStoreStatistics().getMessageCount().increment();
+            getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
         }
         message.incrementReferenceCount();
         message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@@ -93,6 +95,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
             if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
                 lastBatchId = null;
             }
+            getMessageStoreStatistics().getMessageCount().decrement();
+            getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
         }
     }
 
@@ -114,20 +118,17 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public void removeAllMessages(ConnectionContext context) throws IOException {
         synchronized (messageTable) {
             messageTable.clear();
+            getMessageStoreStatistics().reset();
         }
     }
 
     public void delete() {
         synchronized (messageTable) {
             messageTable.clear();
+            getMessageStoreStatistics().reset();
         }
     }
 
-    
-    public int getMessageCount() {
-        return messageTable.size();
-    }
-
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         synchronized (messageTable) {
             boolean pastLackBatch = lastBatchId == null;
@@ -161,8 +162,34 @@ public class MemoryMessageStore extends AbstractMessageStore {
 
     public void updateMessage(Message message) {
         synchronized (messageTable) {
+            Message original = messageTable.get(message.getMessageId());
+
+            //if can't be found then increment count, else remove old size
+            if (original == null) {
+                getMessageStoreStatistics().getMessageCount().increment();
+            } else {
+                getMessageStoreStatistics().getMessageSize().addSize(-original.getSize());
+            }
             messageTable.put(message.getMessageId(), message);
+            getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
         }
     }
-    
+
+    @Override
+    public void recoverMessageStoreStatistics() throws IOException {
+        synchronized (messageTable) {
+            long size = 0;
+            int count = 0;
+            for (Iterator<Message> iter = messageTable.values().iterator(); iter
+                    .hasNext();) {
+                Message msg = iter.next();
+                size += msg.getSize();
+            }
+
+            getMessageStoreStatistics().reset();
+            getMessageStoreStatistics().getMessageCount().setCount(count);
+            getMessageStoreStatistics().getMessageSize().setTotalSize(size);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
index 1cf0058..e2bc033 100644
--- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
+++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
@@ -68,6 +68,23 @@ public class SizeStatisticImpl extends StatisticImpl {
     }
 
     /**
+     * Reset the total size to the new value
+     *
+     * @param size
+     */
+    public synchronized void setTotalSize(long size) {
+        count++;
+        totalSize = size;
+        if (size > maxSize) {
+            maxSize = size;
+        }
+        if (size < minSize || minSize == 0) {
+            minSize = size;
+        }
+        updateSampleTime();
+    }
+
+    /**
      * @return the maximum size of any step
      */
     public long getMaxSize() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 4674d7a..ac4e8ce 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -304,6 +304,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
+    @Override
     public int getMessageCount() throws IOException {
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();
@@ -401,4 +402,5 @@ public class JDBCMessageStore extends AbstractMessageStore {
     public String toString() {
         return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
index 2d44769..7ec10c4 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
@@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A MessageStore that uses a Journal to store it's messages.
- * 
- * 
+ *
+ *
  */
 public class JournalMessageStore extends AbstractMessageStore {
 
@@ -79,7 +79,7 @@ public class JournalMessageStore extends AbstractMessageStore {
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
     }
 
-    
+
     public void setMemoryUsage(MemoryUsage memoryUsage) {
         this.memoryUsage=memoryUsage;
         longTermStore.setMemoryUsage(memoryUsage);
@@ -323,7 +323,7 @@ public class JournalMessageStore extends AbstractMessageStore {
     }
 
     /**
-     * 
+     *
      */
     public Message getMessage(MessageId identity) throws IOException {
         Message answer = null;
@@ -348,7 +348,7 @@ public class JournalMessageStore extends AbstractMessageStore {
      * Replays the checkpointStore first as those messages are the oldest ones,
      * then messages are replayed from the transaction log and then the cache is
      * updated.
-     * 
+     *
      * @param listener
      * @throws Exception
      */
@@ -404,6 +404,11 @@ public class JournalMessageStore extends AbstractMessageStore {
         return longTermStore.getMessageCount();
     }
 
+    public long getMessageSize() throws IOException {
+        peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageSize();
+    }
+
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         peristenceAdapter.checkpoint(true, true);
         longTermStore.recoverNextMessages(maxReturned, listener);

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 8ceef36..44f93a6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -61,6 +61,7 @@ import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionIdTransformer;
@@ -504,34 +505,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         }
 
         @Override
-        public int getMessageCount() throws IOException {
-            try {
-                lockAsyncJobQueue();
-                indexLock.writeLock().lock();
-                try {
-                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
-                        @Override
-                        public Integer execute(Transaction tx) throws IOException {
-                            // Iterate through all index entries to get a count
-                            // of messages in the destination.
-                            StoredDestination sd = getStoredDestination(dest, tx);
-                            int rc = 0;
-                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
-                                iterator.next();
-                                rc++;
-                            }
-                            return rc;
-                        }
-                    });
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            } finally {
-                unlockAsyncJobQueue();
-            }
-        }
-
-        @Override
         public boolean isEmpty() throws IOException {
             indexLock.writeLock().lock();
             try {
@@ -716,6 +689,38 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         public String toString(){
             return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
         }
+
+        @Override
+        protected void recoverMessageStoreStatistics() throws IOException {
+            try {
+                MessageStoreStatistics recoveredStatistics;
+                lockAsyncJobQueue();
+                indexLock.writeLock().lock();
+                try {
+                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
+                        @Override
+                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
+                            MessageStoreStatistics statistics = new MessageStoreStatistics();
+
+                            // Iterate through all index entries to get the size of each message
+                            StoredDestination sd = getStoredDestination(dest, tx);
+                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+                                int locationSize = iterator.next().getKey().getSize();
+                                statistics.getMessageCount().increment();
+                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
+                            }
+                           return statistics;
+                        }
+                    });
+                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
+                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
+                } finally {
+                    indexLock.writeLock().unlock();
+                }
+            } finally {
+                unlockAsyncJobQueue();
+            }
+        }
     }
 
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
@@ -993,12 +998,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
     @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        return this.transactionStore.proxy(new KahaDBMessageStore(destination));
+        MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
+        storeCache.put(key(convert(destination)), store);
+        return store;
     }
 
     @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
+        TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
+        storeCache.put(key(convert(destination)), store);
+        return store;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ef8fe0a..e35619e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -53,10 +54,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
@@ -113,7 +119,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
 
-    static final int VERSION = 5;
+    static final int VERSION = 6;
 
     protected class Metadata {
         protected Page<Metadata> page;
@@ -738,7 +744,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         long undoCounter=0;
 
         // Go through all the destinations to see if they have messages past the lastAppendLocation
-        for (StoredDestination sd : storedDestinations.values()) {
+        for (String key : storedDestinations.keySet()) {
+            StoredDestination sd = storedDestinations.get(key);
 
             final ArrayList<Long> matches = new ArrayList<Long>();
             // Find all the Locations that are >= than the last Append Location.
@@ -755,6 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 sd.messageIdIndex.remove(tx, keys.messageId);
                 metadata.producerSequenceIdTracker.rollback(keys.messageId);
                 undoCounter++;
+                decrementAndSubSizeToStoreStat(key, keys.location.getSize());
                 // TODO: do we need to modify the ack positions for the pub sub case?
             }
         }
@@ -858,6 +866,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                             sd.messageIdIndex.remove(tx, keys.messageId);
                             LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                             undoCounter++;
+                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
                             // TODO: do we need to modify the ack positions for the pub sub case?
                         }
                     } else {
@@ -1312,6 +1321,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         if (previous == null) {
             previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
             if (previous == null) {
+                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
                 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                     addAckLocationForNewMessage(tx, sd, id);
@@ -1337,7 +1347,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
         // record this id in any event, initial send or recovery
         metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
-        return id;
+
+       return id;
     }
 
     void trackPendingAdd(KahaDestination destination, Long seq) {
@@ -1367,9 +1378,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     new MessageKeys(command.getMessageId(), location)
             );
             sd.locationIndex.put(tx, location, id);
+            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
             // on first update previous is original location, on recovery/replay it may be the updated location
             if(previousKeys != null && !previousKeys.location.equals(location)) {
                 sd.locationIndex.remove(tx, previousKeys.location);
+                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
             }
             metadata.lastUpdate = location;
         } else {
@@ -1387,6 +1400,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                 if (keys != null) {
                     sd.locationIndex.remove(tx, keys.location);
+                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
                     recordAckMessageReferenceLocation(ackLocation, keys.location);
                     metadata.lastUpdate = ackLocation;
                 }  else if (LOG.isDebugEnabled()) {
@@ -1414,7 +1428,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     recordAckMessageReferenceLocation(ackLocation, keys.location);
                 }
                 // The following method handles deleting un-referenced messages.
-                removeAckLocation(tx, sd, subscriptionKey, sequence);
+                removeAckLocation(command, tx, sd, subscriptionKey, sequence);
                 metadata.lastUpdate = ackLocation;
             } else if (LOG.isDebugEnabled()) {
                 LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
@@ -1470,6 +1484,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         String key = key(command.getDestination());
         storedDestinations.remove(key);
         metadata.destinations.remove(tx, key);
+        clearStoreStats(command.getDestination());
+        storeCache.remove(key(command.getDestination()));
     }
 
     void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
@@ -1494,13 +1510,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             sd.subLocations.remove(tx, subscriptionKey);
             sd.subscriptionAcks.remove(tx, subscriptionKey);
             sd.subscriptionCache.remove(subscriptionKey);
-            removeAckLocationsForSub(tx, sd, subscriptionKey);
+            removeAckLocationsForSub(command, tx, sd, subscriptionKey);
 
             if (sd.subscriptions.isEmpty(tx)) {
                 // remove the stored destination
                 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
                 removeDestinationCommand.setDestination(command.getDestination());
                 updateIndex(tx, removeDestinationCommand, null);
+                clearStoreStats(command.getDestination());
             }
         }
     }
@@ -1879,6 +1896,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+
     class StoredDestination {
 
         MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -1912,6 +1930,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
 
+    	final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
+
         @Override
         public StoredDestination readPayload(final DataInput dataIn) throws IOException {
             final StoredDestination value = new StoredDestination();
@@ -1996,12 +2016,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     public void execute(Transaction tx) throws IOException {
                         value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                         value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                        value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                         value.orderIndex.lowPriorityIndex.load(tx);
 
                         value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                         value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                        value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
                         value.orderIndex.highPriorityIndex.load(tx);
                     }
                 });
@@ -2100,7 +2120,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         // Figure out the next key using the last entry in the destination.
         rc.orderIndex.configureLast(tx);
 
-        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
+        rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
         rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
         rc.locationIndex.load(tx);
 
@@ -2202,6 +2222,133 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return rc;
     }
 
+    /**
+     * Clear the counter for the destination, if one exists.
+     *
+     * @param kahaDestination
+     */
+    protected void clearStoreStats(KahaDestination kahaDestination) {
+        MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
+        if (storeStats != null) {
+            storeStats.reset();
+        }
+    }
+
+    /**
+     * Update MessageStoreStatistics
+     *
+     * @param kahaDestination
+     * @param size
+     */
+    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
+        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
+    }
+
+    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
+        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
+        if (storeStats != null) {
+            storeStats.getMessageCount().increment();
+            if (size > 0) {
+                storeStats.getMessageSize().addSize(size);
+            }
+        }
+    }
+
+    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
+        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
+    }
+
+    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
+        MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
+        if (storeStats != null) {
+            storeStats.getMessageCount().decrement();
+            if (size > 0) {
+                storeStats.getMessageSize().addSize(-size);
+            }
+        }
+    }
+
+    /**
+     * This is a map to cache DestinationStatistics for a specific
+     * KahaDestination key
+     */
+    protected final Map<String, MessageStore> storeCache =
+            new ConcurrentHashMap<String, MessageStore>();
+
+	/**
+	 * Locate the storeMessageSize counter for this KahaDestination
+	 * @param kahaDestination
+	 * @return
+	 */
+	protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
+	    MessageStoreStatistics storeStats = null;
+		try {
+		    MessageStore messageStore = storeCache.get(kahaDestKey);
+		    if (messageStore != null) {
+		        storeStats = messageStore.getMessageStoreStatistics();
+		    }
+		} catch (Exception e1) {
+			 LOG.error("Getting size counter of destination failed", e1);
+		}
+
+		return storeStats;
+	}
+
+    /**
+     * Determine whether this Destination matches the DestinationType
+     *
+     * @param destination
+     * @param type
+     * @return
+     */
+    protected boolean matchType(Destination destination,
+            KahaDestination.DestinationType type) {
+        if (destination instanceof Topic
+                && type.equals(KahaDestination.DestinationType.TOPIC)) {
+            return true;
+        } else if (destination instanceof Queue
+                && type.equals(KahaDestination.DestinationType.QUEUE)) {
+            return true;
+        }
+        return false;
+    }
+
+    class LocationSizeMarshaller implements Marshaller<Location> {
+
+        public LocationSizeMarshaller() {
+
+        }
+
+        public Location readPayload(DataInput dataIn) throws IOException {
+            Location rc = new Location();
+            rc.setDataFileId(dataIn.readInt());
+            rc.setOffset(dataIn.readInt());
+            if (metadata.version >= 6) {
+                rc.setSize(dataIn.readInt());
+            }
+            return rc;
+        }
+
+        public void writePayload(Location object, DataOutput dataOut)
+                throws IOException {
+            dataOut.writeInt(object.getDataFileId());
+            dataOut.writeInt(object.getOffset());
+            dataOut.writeInt(object.getSize());
+        }
+
+        public int getFixedSize() {
+            return 12;
+        }
+
+        public Location deepCopy(Location source) {
+            return new Location(source);
+        }
+
+        public boolean isDeepCopySupported() {
+            return true;
+        }
+    }
+
     private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
         SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
         if (sequences == null) {
@@ -2269,7 +2416,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
-    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+    private void removeAckLocationsForSub(KahaSubscriptionCommand command,
+            Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
         if (!sd.ackPositions.isEmpty(tx)) {
             SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
             if (sequences == null || sequences.isEmpty()) {
@@ -2302,6 +2450,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     sd.locationIndex.remove(tx, entry.getValue().location);
                     sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                     sd.orderIndex.remove(tx, entry.getKey());
+                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
                 }
             }
         }
@@ -2314,7 +2463,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      * @param messageSequence
      * @throws IOException
      */
-    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
+    private void removeAckLocation(KahaRemoveMessageCommand command,
+            Transaction tx, StoredDestination sd, String subscriptionKey,
+            Long messageSequence) throws IOException {
         // Remove the sub from the previous location set..
         if (messageSequence != null) {
             SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
@@ -2347,6 +2498,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     sd.locationIndex.remove(tx, entry.getValue().location);
                     sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                     sd.orderIndex.remove(tx, entry.getKey());
+                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 45e35c6..04d74b6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -198,25 +198,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
 
         @Override
-        public int getMessageCount() throws IOException {
-            synchronized(indexMutex) {
-                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
-                    @Override
-                    public Integer execute(Transaction tx) throws IOException {
-                        // Iterate through all index entries to get a count of messages in the destination.
-                        StoredDestination sd = getStoredDestination(dest, tx);
-                        int rc=0;
-                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
-                            iterator.next();
-                            rc++;
-                        }
-                        return rc;
-                    }
-                });
-            }
-        }
-
-        @Override
         public void recover(final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
@@ -297,6 +278,27 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         public void stop() throws Exception {
         }
 
+        @Override
+        public void recoverMessageStoreStatistics() throws IOException {
+            int count = 0;
+            synchronized(indexMutex) {
+                count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    @Override
+                    public Integer execute(Transaction tx) throws IOException {
+                        // Iterate through all index entries to get a count of messages in the destination.
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        int rc=0;
+                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
+                            iterator.next();
+                            rc++;
+                        }
+                        return rc;
+                    }
+                });
+            }
+            getMessageStoreStatistics().getMessageCount().setCount(count);
+        }
+
     }
 
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
index 7826a0b..e859f9c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
@@ -22,8 +22,13 @@ import java.io.IOException;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 
 public class LocationMarshaller implements Marshaller<Location> {
+
     public final static LocationMarshaller INSTANCE = new LocationMarshaller();
 
+    public LocationMarshaller () {
+
+    }
+
     public Location readPayload(DataInput dataIn) throws IOException {
         Location rc = new Location();
         rc.setDataFileId(dataIn.readInt());

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 49e8cfa..7c2d327 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -834,7 +834,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
       cursorPosition = cursorResetPosition
     }
 
-    def getMessageCount: Int = {
+    override def getMessageCount: Int = {
       return db.collectionSize(key).toInt
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index f8fab10..90b8428 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -491,11 +491,6 @@ public class StoreQueueCursorOrderTest {
         }
 
         @Override
-        public int getMessageCount() throws IOException {
-            return 0;
-        }
-
-        @Override
         public void resetBatching() {
 
         }
@@ -513,5 +508,10 @@ public class StoreQueueCursorOrderTest {
             batch.incrementAndGet();
         }
 
+        @Override
+        public void recoverMessageStoreStatistics() throws IOException {
+            this.getMessageStoreStatistics().reset();
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..59ae44b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public abstract class AbstractMessageStoreSizeStatTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(AbstractMessageStoreSizeStatTest.class);
+
+
+    protected BrokerService broker;
+    protected URI brokerConnectURI;
+    protected String defaultQueueName = "test.queue";
+    protected static int messageSize = 1000;
+
+    @Before
+    public void startBroker() throws Exception {
+        setUpBroker(true);
+    }
+
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+
+        broker = new BrokerService();
+        this.initPersistence(broker);
+        //set up a transport
+        TransportConnector connector = broker
+                .addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected abstract void initPersistence(BrokerService brokerService) throws IOException;
+
+    @Test
+    public void testMessageSize() throws Exception {
+        Destination dest = publishTestMessages(200);
+        verifyStats(dest, 200, 200 * messageSize);
+    }
+
+    @Test
+    public void testMessageSizeAfterConsumption() throws Exception {
+
+        Destination dest = publishTestMessages(200);
+        verifyStats(dest, 200, 200 * messageSize);
+
+        consumeTestMessages();
+        Thread.sleep(3000);
+        verifyStats(dest, 0, 0);
+    }
+
+    @Test
+    public void testMessageSizeDurable() throws Exception {
+
+        Destination dest = publishTestMessagesDurable();
+
+        //verify the count and size
+        verifyStats(dest, 200, 200 * messageSize);
+
+    }
+
+    @Test
+    public void testMessageSizeAfterDestinationDeletion() throws Exception {
+        Destination dest = publishTestMessages(200);
+        verifyStats(dest, 200, 200 * messageSize);
+
+        //check that the size is 0 after deletion
+        broker.removeDestination(dest.getActiveMQDestination());
+        verifyStats(dest, 0, 0);
+    }
+
+    protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
+        MessageStore messageStore = dest.getMessageStore();
+        MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
+        assertEquals(messageStore.getMessageCount(), count);
+        assertEquals(messageStore.getMessageCount(),
+                storeStats.getMessageCount().getCount());
+        assertEquals(messageStore.getMessageSize(),
+                messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
+        if (count > 0) {
+            assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
+        } else {
+            assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
+        }
+    }
+
+    /**
+     * Generate random 1 megabyte messages
+     * @param session
+     * @return
+     * @throws JMSException
+     */
+    protected BytesMessage createMessage(Session session) throws JMSException {
+        final BytesMessage message = session.createBytesMessage();
+        final byte[] data = new byte[messageSize];
+        final Random rng = new Random();
+        rng.nextBytes(data);
+        message.writeBytes(data);
+        return message;
+    }
+
+
+    protected Destination publishTestMessages(int count) throws Exception {
+        return publishTestMessages(count, defaultQueueName);
+    }
+
+    protected Destination publishTestMessages(int count, String queueName) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+                queueName);
+
+        Destination dest = broker.getDestination(activeMqQueue);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
+        .createConnection();
+        connection.setClientID("clientId" + queueName);
+        connection.start();
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            // publish a bunch of non-persistent messages to fill up the temp
+            // store
+            MessageProducer prod = session.createProducer(queue);
+            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < count; i++) {
+                prod.send(createMessage(session));
+            }
+
+        } finally {
+            connection.stop();
+        }
+
+        return dest;
+    }
+
+    protected Destination consumeTestMessages() throws Exception {
+        return consumeTestMessages(defaultQueueName);
+    }
+
+    protected Destination consumeTestMessages(String queueName) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+                queueName);
+
+        Destination dest = broker.getDestination(activeMqQueue);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
+        .createConnection();
+        connection.setClientID("clientId2" + queueName);
+        connection.start();
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            MessageConsumer consumer = session.createConsumer(queue);
+            for (int i = 0; i < 200; i++) {
+                consumer.receive();
+            }
+
+        } finally {
+            connection.stop();
+        }
+
+        return dest;
+    }
+
+    protected Destination publishTestMessagesDurable() throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                "test.topic");
+
+        Destination dest = broker.getDestination(activeMqTopic);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
+        .createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("test.topic");
+        session.createDurableSubscriber(topic, "sub1");
+
+        try {
+            // publish a bunch of non-persistent messages to fill up the temp
+            // store
+            MessageProducer prod = session.createProducer(topic);
+            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < 200; i++) {
+                prod.send(createMessage(session));
+            }
+
+        } finally {
+            connection.stop();
+        }
+
+        return dest;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
new file mode 100644
index 0000000..923bc82
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
+ * compute the size of the messages in the store.
+ *
+ */
+public abstract class AbstractMessageStoreSizeTest {
+
+    protected static final IdGenerator id = new IdGenerator();
+    protected ActiveMQQueue destination = new ActiveMQQueue("Test");
+    protected ProducerId producerId = new ProducerId("1.1.1");
+    protected static final int MESSAGE_COUNT = 20;
+    protected static String dataDirectory = "target/test-amq-5748/datadb";
+    protected static int testMessageSize = 1000;
+
+    @Before
+    public void init() throws Exception {
+        this.initStore();
+    }
+
+    @After
+    public void destroy() throws Exception {
+        this.destroyStore();
+    }
+
+    protected abstract void initStore() throws Exception;
+
+
+    protected abstract void destroyStore() throws Exception;
+
+
+    /**
+     * This method tests that the message size exists after writing a bunch of messages to the store.
+     * @throws Exception
+     */
+    @Test
+    public void testMessageSize() throws Exception {
+        writeMessages();
+        long messageSize = getMessageStore().getMessageSize();
+        assertTrue(getMessageStore().getMessageCount() == 20);
+        assertTrue(messageSize > 20 * testMessageSize);
+    }
+
+
+    /**
+     * Write random byte messages to the store for testing.
+     *
+     * @throws Exception
+     */
+    protected void writeMessages() throws Exception {
+        final ConnectionContext context = new ConnectionContext();
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            ActiveMQMessage message = new ActiveMQMessage();
+            final byte[] data = new byte[testMessageSize];
+            final Random rng = new Random();
+            rng.nextBytes(data);
+            message.setContent(new ByteSequence(data));
+            message.setDestination(destination);
+            message.setMessageId(new MessageId(id.generateId() + ":1", i));
+            getMessageStore().addMessage(context, message);
+        }
+    }
+
+    protected abstract MessageStore getMessageStore();
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
new file mode 100644
index 0000000..7d53cbd
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import org.apache.activemq.store.AbstractMessageStoreSizeTest;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.Test;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
+ * compute the size of the messages in the store.
+ *
+ * For KahaDB specifically, the size was not being stored in in the index ({@link LocationMarshaller}).  LocationMarshaller
+ * has been updated to include an option to include the size in the serialized value.  This way the message
+ * size will be persisted in the index and be available between broker restarts without needing to rebuild the index.
+ * Note that the KahaDB version has been incremented from 5 to 6 because the index will need to be rebuild when a version
+ * 5 index is detected since it will be detected as corrupt.
+ *
+ */
+public abstract class AbstractKahaDBMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
+
+    MessageStore messageStore;
+    PersistenceAdapter store;
+
+    @Override
+    public void initStore() throws Exception {
+        createStore(true, dataDirectory);
+    }
+
+    abstract protected void createStore(boolean deleteAllMessages, String directory) throws Exception;
+
+    abstract protected String getVersion5Dir();
+
+    @Override
+    public void destroyStore() throws Exception {
+        if (store != null) {
+            store.stop();
+        }
+    }
+
+
+    /**
+     * This method tests that the message sizes exist for all messages that exist after messages are recovered
+     * off of disk.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessageSizeStoreRecovery() throws Exception {
+        writeMessages();
+        store.stop();
+
+        createStore(false, dataDirectory);
+        writeMessages();
+        long messageSize = messageStore.getMessageSize();
+        assertEquals(40, messageStore.getMessageCount());
+        assertTrue(messageSize > 40 * testMessageSize);
+    }
+
+    /**
+     * This method tests that a version 5 store with an old index still works but returns 0 for messgage sizes.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessageSizeStoreRecoveryVersion5() throws Exception {
+        store.stop();
+
+        //Copy over an existing version 5 store with messages
+        File dataDir = new File(dataDirectory);
+        if (dataDir.exists())
+            FileUtils.deleteDirectory(new File(dataDirectory));
+        FileUtils.copyDirectory(new File(getVersion5Dir()),
+                dataDir);
+
+        //reload store
+        createStore(false, dataDirectory);
+
+        //make sure size is 0
+        long messageSize = messageStore.getMessageSize();
+        assertTrue(messageStore.getMessageCount() == 20);
+        assertTrue(messageSize == 0);
+
+
+    }
+
+    /**
+     * This method tests that a version 5 store with existing messages will correctly be recovered and converted
+     * to version 6.  After index deletion, the index will be rebuilt and will include message sizes.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessageSizeStoreRecoveryVersion5RebuildIndex() throws Exception {
+        store.stop();
+
+        //Copy over an existing version 5 store with messages
+        File dataDir = new File(dataDirectory);
+        if (dataDir.exists())
+            FileUtils.deleteDirectory(new File(dataDirectory));
+        FileUtils.copyDirectory(new File(getVersion5Dir()),
+                dataDir);
+        for (File index : FileUtils.listFiles(new File(dataDirectory), new WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) {
+            FileUtils.deleteQuietly(index);
+        }
+
+        //append more messages...at this point the index should be rebuilt
+        createStore(false, dataDirectory);
+        writeMessages();
+
+        //after writing new messages to the existing store, make sure the index is rebuilt and size is correct
+        long messageSize = messageStore.getMessageSize();
+        assertTrue(messageStore.getMessageCount() == 40);
+        assertTrue(messageSize > 40 * testMessageSize);
+
+    }
+
+    @Override
+    protected MessageStore getMessageStore() {
+        return messageStore;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..bb46f20
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize
+ * statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public class KahaDBMessageStoreSizeStatTest extends
+        AbstractMessageStoreSizeStatTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(KahaDBMessageStoreSizeStatTest.class);
+
+    File dataFileDir = new File("target/test-amq-5748/stat-datadb");
+
+    @Override
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+        if (clearDataDir && dataFileDir.exists())
+            FileUtils.cleanDirectory(dataFileDir);
+        super.setUpBroker(clearDataDir);
+    }
+
+    @Override
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir);
+    }
+
+    /**
+     * Test that the the counter restores size and works after restart and more
+     * messages are published
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessageSizeAfterRestartAndPublish() throws Exception {
+
+        Destination dest = publishTestMessages(200);
+
+        // verify the count and size
+        verifyStats(dest, 200, 200 * messageSize);
+
+        // stop, restart broker and publish more messages
+        stopBroker();
+        this.setUpBroker(false);
+        dest = publishTestMessages(200);
+
+        // verify the count and size
+        verifyStats(dest, 400, 400 * messageSize);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
new file mode 100644
index 0000000..43dc2f6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+
+import org.apache.activemq.store.MessageStore;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
+ * compute the size of the messages in the KahaDB Store.
+ *
+ */
+public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
+
+    @Override
+    protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
+        KahaDBStore kahaDBStore = new KahaDBStore();
+        store = kahaDBStore;
+        kahaDBStore.setJournalMaxFileLength(1024 * 512);
+        kahaDBStore.setDeleteAllMessages(deleteAllMessages);
+        kahaDBStore.setDirectory(new File(directory));
+        kahaDBStore.start();
+        messageStore = store.createQueueMessageStore(destination);
+        messageStore.start();
+    }
+
+    @Override
+    protected String getVersion5Dir() {
+        return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..4342e1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize
+ * statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public class MultiKahaDBMessageStoreSizeStatTest extends
+        AbstractMessageStoreSizeStatTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
+
+    File dataFileDir = new File("target/test-amq-5748/stat-datadb");
+
+    @Override
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+        if (clearDataDir && dataFileDir.exists())
+            FileUtils.cleanDirectory(dataFileDir);
+        super.setUpBroker(clearDataDir);
+    }
+
+    @Override
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        broker.setPersistent(true);
+
+        //setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(dataFileDir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 512);
+
+        //set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    /**
+     * Test that the the counter restores size and works after restart and more
+     * messages are published
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessageSizeAfterRestartAndPublish() throws Exception {
+
+        Destination dest = publishTestMessages(200);
+
+        // verify the count and size
+        verifyStats(dest, 200, 200 * messageSize);
+
+        // stop, restart broker and publish more messages
+        stopBroker();
+        this.setUpBroker(false);
+        dest = publishTestMessages(200);
+
+        // verify the count and size
+        verifyStats(dest, 400, 400 * messageSize);
+
+    }
+
+    @Test
+    public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
+
+        Destination dest = publishTestMessages(200);
+
+        // verify the count and size
+        verifyStats(dest, 200, 200 * messageSize);
+        assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
+
+        Destination dest2 = publishTestMessages(200, "test.queue2");
+
+        // verify the count and size
+        verifyStats(dest2, 200, 200 * messageSize);
+        assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize);
+
+        // stop, restart broker and publish more messages
+        stopBroker();
+        this.setUpBroker(false);
+        dest = publishTestMessages(200);
+        dest2 = publishTestMessages(200, "test.queue2");
+
+        // verify the count and size after publishing messages
+        verifyStats(dest, 400, 400 * messageSize);
+        verifyStats(dest2, 400, 400 * messageSize);
+
+        System.out.println(broker.getPersistenceAdapter().size());
+        assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize);
+        assertTrue(broker.getPersistenceAdapter().size() >=
+                (dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize()));
+
+    }
+
+}


Mime
View raw message