activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/6] activemq-artemis git commit: ARTEMIS-1663 - Add new message count and size metrics
Date Thu, 08 Feb 2018 17:10:22 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 2c4db3e..1c4038b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -722,7 +722,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             refs.remove(message.getMessageID());
 
             // The delivering count should also be decreased as to avoid inconsistencies
-            ((QueueImpl) ref.getQueue()).decDelivering();
+            ((QueueImpl) ref.getQueue()).decDelivering(ref);
          }
 
          connectionFailed(e, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 90b8814..2620cf9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -116,7 +117,7 @@ public class LastValueQueue extends QueueImpl {
             } else {
                // We keep the current ref and ack the one we are returning
 
-               super.referenceHandled();
+               super.referenceHandled(ref);
 
                try {
                   super.acknowledge(ref);
@@ -139,7 +140,7 @@ public class LastValueQueue extends QueueImpl {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
-      referenceHandled();
+      referenceHandled(ref);
 
       try {
          oldRef.acknowledge();
@@ -323,6 +324,11 @@ public class LastValueQueue extends QueueImpl {
       public Long getConsumerId() {
          return this.consumerId;
       }
+
+      @Override
+      public long getPersistentSize() throws ActiveMQException {
+         return ref.getPersistentSize();
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 7543ba5..2802740 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -158,7 +159,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    @Override
    public void handled() {
-      queue.referenceHandled();
+      queue.referenceHandled(this);
    }
 
    @Override
@@ -239,4 +240,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    public int hashCode() {
       return this.getMessage().hashCode();
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return this.getMessage().getPersistentSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index cfd06d9..89ea2fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -364,16 +365,25 @@ public class PostOfficeJournalLoader implements JournalLoader {
 
                List<PagedMessage> pgMessages = pg.read(storageManager);
                Map<Long, AtomicInteger> countsPerQueueOnPage = new HashMap<>();
+               Map<Long, AtomicLong> sizePerQueueOnPage = new HashMap<>();
 
                for (PagedMessage pgd : pgMessages) {
                   if (pgd.getTransactionID() <= 0) {
                      for (long q : pgd.getQueueIDs()) {
                         AtomicInteger countQ = countsPerQueueOnPage.get(q);
+                        AtomicLong sizeQ = sizePerQueueOnPage.get(q);
                         if (countQ == null) {
                            countQ = new AtomicInteger(0);
                            countsPerQueueOnPage.put(q, countQ);
                         }
+                        if (sizeQ == null) {
+                           sizeQ = new AtomicLong(0);
+                           sizePerQueueOnPage.put(q, sizeQ);
+                        }
                         countQ.incrementAndGet();
+                        if (pgd.getPersistentSize() > 0) {
+                           sizeQ.addAndGet(pgd.getPersistentSize());
+                        }
                      }
                   }
                }
@@ -387,12 +397,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
                   PageSubscriptionCounter counter = store.getCursorProvider().getSubscription(entry.getKey()).getCounter();
 
                   AtomicInteger value = countsPerQueueOnPage.get(entry.getKey());
+                  AtomicLong sizeValue = sizePerQueueOnPage.get(entry.getKey());
 
                   if (value == null) {
                      logger.debug("Page " + entry.getKey() + " wasn't open, so we will just ignore");
                   } else {
                      logger.debug("Replacing counter " + value.get());
-                     counter.increment(txRecoverCounter, value.get());
+                     counter.increment(txRecoverCounter, value.get(), sizeValue.get());
                   }
                }
             } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dbf79e2..5530179 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -171,6 +171,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
    private final AtomicInteger queueMemorySize = new AtomicInteger(0);
 
+   private final QueuePendingMessageMetrics pendingMetrics = new QueuePendingMessageMetrics(this);
+
+   private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
+
    // used to control if we should recalculate certain positions inside deliverAsync
    private volatile boolean consumersChanged = true;
 
@@ -186,8 +190,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private AtomicLong messagesKilled = new AtomicLong(0);
 
-   protected final AtomicInteger deliveringCount = new AtomicInteger(0);
-
    private boolean paused;
 
    private long pauseStatusRecord = -1;
@@ -452,7 +454,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.server = server;
 
-      scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
+      scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
 
       if (addressSettingsRepository != null) {
          addressSettingsRepositoryListener = new AddressSettingsRepositoryListener();
@@ -1118,10 +1120,45 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (pageSubscription != null) {
          // messageReferences will have depaged messages which we need to discount from the counter as they are
          // counted on the pageSubscription as well
-         return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+         return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+      } else {
+         return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount();
+      }
+   }
+
+   @Override
+   public long getPersistentSize() {
+      if (pageSubscription != null) {
+         // messageReferences will have depaged messages which we need to discount from the counter as they are
+         // counted on the pageSubscription as well
+         return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize() + pageSubscription.getPersistentSize();
       } else {
-         return messageReferences.size() + getScheduledCount() + deliveringCount.get();
+         return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize();
+      }
+   }
+
+   @Override
+   public long getDurableMessageCount() {
+      if (isDurable()) {
+         if (pageSubscription != null) {
+            return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount() + pageSubscription.getMessageCount();
+         } else {
+            return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount();
+         }
+      }
+      return 0;
+   }
+
+   @Override
+   public long getDurablePersistentSize() {
+      if (isDurable()) {
+         if (pageSubscription != null) {
+            return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize() + pageSubscription.getPersistentSize();
+         } else {
+            return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize();
+         }
       }
+      return 0;
    }
 
    @Override
@@ -1130,6 +1167,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public synchronized long getScheduledSize() {
+      return scheduledDeliveryHandler.getScheduledSize();
+   }
+
+   @Override
+   public synchronized int getDurableScheduledCount() {
+      return scheduledDeliveryHandler.getDurableScheduledCount();
+   }
+
+   @Override
+   public synchronized long getDurableScheduledSize() {
+      return scheduledDeliveryHandler.getDurableScheduledSize();
+   }
+
+   @Override
    public synchronized List<MessageReference> getScheduledMessages() {
       return scheduledDeliveryHandler.getScheduledReferences();
    }
@@ -1153,7 +1205,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public int getDeliveringCount() {
-      return deliveringCount.get();
+      return deliveringMetrics.getMessageCount();
+   }
+
+   @Override
+   public long getDeliveringSize() {
+      return deliveringMetrics.getPersistentSize();
+   }
+
+   @Override
+   public int getDurableDeliveringCount() {
+      return deliveringMetrics.getDurableMessageCount();
+   }
+
+   @Override
+   public long getDurableDeliveringSize() {
+      return deliveringMetrics.getDurablePersistentSize();
    }
 
    @Override
@@ -1239,7 +1306,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       getRefsOperation(tx).addAck(ref);
 
       // https://issues.jboss.org/browse/HORNETQ-609
-      incDelivering();
+      incDelivering(ref);
 
       messagesAcknowledged.incrementAndGet();
    }
@@ -1287,7 +1354,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          resetAllIterators();
       } else {
-         decDelivering();
+         decDelivering(reference);
       }
    }
 
@@ -1354,8 +1421,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void referenceHandled() {
-      incDelivering();
+   public void referenceHandled(MessageReference ref) {
+      incDelivering(ref);
    }
 
    @Override
@@ -1419,7 +1486,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return iterQueue(flushLimit, filter1, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
-            incDelivering();
+            incDelivering(ref);
             acknowledge(tx, ref, ackReason);
             refRemoved(ref);
          }
@@ -1539,7 +1606,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering();
+               incDelivering(ref);
                acknowledge(tx, ref);
                iter.remove();
                refRemoved(ref);
@@ -1618,7 +1685,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering();
+               incDelivering(ref);
                expire(ref);
                iter.remove();
                refRemoved(ref);
@@ -1644,7 +1711,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (filter == null || filter.match(ref.getMessage())) {
-               incDelivering();
+               incDelivering(ref);
                expire(tx, ref);
                iter.remove();
                refRemoved(ref);
@@ -1711,7 +1778,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                         if (tx == null) {
                            tx = new TransactionImpl(storageManager);
                         }
-                        incDelivering();
+                        incDelivering(ref);
                         expired = true;
                         expire(tx, ref);
                         iter.remove();
@@ -1763,7 +1830,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering();
+               incDelivering(ref);
                sendToDeadLetterAddress(null, ref);
                iter.remove();
                refRemoved(ref);
@@ -1782,7 +1849,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (filter == null || filter.match(ref.getMessage())) {
-               incDelivering();
+               incDelivering(ref);
                sendToDeadLetterAddress(null, ref);
                iter.remove();
                refRemoved(ref);
@@ -1804,11 +1871,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             if (ref.getMessage().getMessageID() == messageID) {
                iter.remove();
                refRemoved(ref);
-               incDelivering();
+               incDelivering(ref);
                try {
                   move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
                } catch (Exception e) {
-                  decDelivering();
+                  decDelivering(ref);
                   throw e;
                }
                return true;
@@ -1836,7 +1903,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
             boolean ignored = false;
 
-            incDelivering();
+            incDelivering(ref);
 
             if (rejectDuplicates) {
                byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
@@ -1881,7 +1948,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             if (originalMessageAddress != null) {
 
-               incDelivering();
+               incDelivering(ref);
 
                Long targetQueue = null;
                if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
@@ -2065,6 +2132,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
       messageReferences.addTail(ref, getPriority(ref));
+      pendingMetrics.incrementMetrics(ref);
    }
 
    /**
@@ -2076,6 +2144,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     */
    private void internalAddHead(final MessageReference ref) {
       queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+      pendingMetrics.incrementMetrics(ref);
       refAdded(ref);
 
       int priority = getPriority(ref);
@@ -2330,6 +2399,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    protected void refRemoved(MessageReference ref) {
       queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
+      pendingMetrics.decrementMetrics(ref);
       if (ref.isPaged()) {
          pagedReferences.decrementAndGet();
       }
@@ -2379,6 +2449,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
          addTail(reference, false);
          pageIterator.remove();
+
+         //We have to increment this here instead of in the iterator so we have access to the reference from next()
+         pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
       }
 
       if (logger.isDebugEnabled()) {
@@ -2387,7 +2460,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
 
          if (logger.isDebugEnabled()) {
-            logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringCount.get());
+            logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
 
          }
       }
@@ -2466,7 +2539,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
          }
 
-         decDelivering();
+         decDelivering(reference);
 
          return true;
       }
@@ -2890,7 +2963,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void postAcknowledge(final MessageReference ref) {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
-      queue.decDelivering();
+      queue.decDelivering(ref);
 
       if (ref.isPaged()) {
          // nothing to be done
@@ -2958,7 +3031,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       try {
          Transaction transaction = new TransactionImpl(storageManager);
          for (MessageReference reference : refs) {
-            incDelivering(); // post ack will decrement this, so need to inc
+            incDelivering(reference); // post ack will decrement this, so need to inc
             acknowledge(transaction, reference, AckReason.KILLED);
          }
          transaction.commit();
@@ -3264,17 +3337,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private int incDelivering() {
-      return deliveringCount.incrementAndGet();
+   private void incDelivering(MessageReference ref) {
+      deliveringMetrics.incrementMetrics(ref);
    }
 
-   public void decDelivering() {
-      deliveringCount.decrementAndGet();
+   public void decDelivering(final MessageReference reference) {
+      deliveringMetrics.decrementMetrics(reference);
    }
 
-   @Override
-   public void decDelivering(int size) {
-      deliveringCount.addAndGet(-size);
+   private long getPersistentSize(final MessageReference reference) {
+      long size = 0;
+
+      try {
+         size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0;
+      } catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
+      }
+
+      return size;
    }
 
    private void configureExpiry(final AddressSettings settings) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
new file mode 100644
index 0000000..f6d65d4
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+
+public class QueuePendingMessageMetrics {
+
+   private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> COUNT_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "messageCount");
+
+   private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> DURABLE_COUNT_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durableMessageCount");
+
+   private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> SIZE_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "persistentSize");
+
+   private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> DURABLE_SIZE_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durablePersistentSize");
+
+   private volatile int messageCount;
+
+   private volatile long persistentSize;
+
+   private volatile int durableMessageCount;
+
+   private volatile long durablePersistentSize;
+
+   private final Queue queue;
+
+   public QueuePendingMessageMetrics(final Queue queue) {
+      Preconditions.checkNotNull(queue);
+      this.queue = queue;
+   }
+
+   public void incrementMetrics(final MessageReference reference) {
+      long size = getPersistentSize(reference);
+      COUNT_UPDATER.incrementAndGet(this);
+      SIZE_UPDATER.addAndGet(this, size);
+      if (queue.isDurable() && reference.getMessage().isDurable()) {
+         DURABLE_COUNT_UPDATER.incrementAndGet(this);
+         DURABLE_SIZE_UPDATER.addAndGet(this, size);
+      }
+   }
+
+   public void decrementMetrics(final MessageReference reference) {
+      long size = -getPersistentSize(reference);
+      COUNT_UPDATER.decrementAndGet(this);
+      SIZE_UPDATER.addAndGet(this, size);
+      if (queue.isDurable() && reference.getMessage().isDurable()) {
+         DURABLE_COUNT_UPDATER.decrementAndGet(this);
+         DURABLE_SIZE_UPDATER.addAndGet(this, size);
+      }
+   }
+
+
+
+   /**
+    * @return the messageCount
+    */
+   public int getMessageCount() {
+      return messageCount;
+   }
+
+   /**
+    * @param messageCount the messageCount to set
+    */
+   public void setMessageCount(int messageCount) {
+      this.messageCount = messageCount;
+   }
+
+   /**
+    * @return the persistentSize
+    */
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
+   /**
+    * @param persistentSize the persistentSize to set
+    */
+   public void setPersistentSize(long persistentSize) {
+      this.persistentSize = persistentSize;
+   }
+
+   /**
+    * @return the durableMessageCount
+    */
+   public int getDurableMessageCount() {
+      return durableMessageCount;
+   }
+
+   /**
+    * @param durableMessageCount the durableMessageCount to set
+    */
+   public void setDurableMessageCount(int durableMessageCount) {
+      this.durableMessageCount = durableMessageCount;
+   }
+
+   /**
+    * @return the durablePersistentSize
+    */
+   public long getDurablePersistentSize() {
+      return durablePersistentSize;
+   }
+
+   /**
+    * @param durablePersistentSize the durablePersistentSize to set
+    */
+   public void setDurablePersistentSize(long durablePersistentSize) {
+      this.durablePersistentSize = durablePersistentSize;
+   }
+
+   private long getPersistentSize(final MessageReference reference) {
+      long size = 0;
+
+      try {
+         size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0;
+      } catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
+      }
+
+      return size;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index 6eaba4c..78ec785 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -50,8 +50,12 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
    // just adding some information to keep it in order accordingly to the initial operations
    private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator());
 
-   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor) {
+   private final QueuePendingMessageMetrics metrics;
+
+   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor,
+         final Queue queue) {
       this.scheduledExecutor = scheduledExecutor;
+      this.metrics = new QueuePendingMessageMetrics(queue);
    }
 
    @Override
@@ -76,13 +80,27 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
       synchronized (scheduledReferences) {
          scheduledReferences.add(new RefScheduled(ref, tail));
       }
+      metrics.incrementMetrics(ref);
    }
 
    @Override
    public int getScheduledCount() {
-      synchronized (scheduledReferences) {
-         return scheduledReferences.size();
-      }
+      return metrics.getMessageCount();
+   }
+
+   @Override
+   public int getDurableScheduledCount() {
+      return metrics.getDurableMessageCount();
+   }
+
+   @Override
+   public long getScheduledSize() {
+      return metrics.getPersistentSize();
+   }
+
+   @Override
+   public long getDurableScheduledSize() {
+      return metrics.getDurablePersistentSize();
    }
 
    @Override
@@ -109,6 +127,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
             if (filter == null || filter.match(ref.getMessage())) {
                iter.remove();
                refs.add(ref);
+               metrics.decrementMetrics(ref);
             }
          }
       }
@@ -123,6 +142,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
             MessageReference ref = iter.next().getRef();
             if (ref.getMessage().getMessageID() == id) {
                iter.remove();
+               metrics.decrementMetrics(ref);
                return ref;
             }
          }
@@ -205,6 +225,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
                }
 
                iter.remove();
+               metrics.decrementMetrics(reference);
 
                reference.setScheduledDeliveryTime(0);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 4fda8b3..2cae2c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
@@ -63,7 +64,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleRandom() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       long nextMessage = 0;
       long NUMBER_OF_SEQUENCES = 100000;
@@ -88,7 +89,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleSameTimeHeadAndTail() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       long time = System.currentTimeMillis() + 10000;
       for (int i = 10001; i < 20000; i++) {
@@ -110,7 +111,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleFixedSample() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       addMessage(handler, 0, 48L, true);
       addMessage(handler, 1, 75L, true);
@@ -124,7 +125,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleWithAddHeads() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       addMessage(handler, 0, 1, true);
       addMessage(handler, 1, 2, true);
@@ -145,7 +146,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleFixedSampleTailAndHead() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       // mix a sequence of tails / heads, but at the end this was supposed to be all sequential
       addMessage(handler, 1, 48L, true);
@@ -191,8 +192,9 @@ public class ScheduledDeliveryHandlerTest extends Assert {
    private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception {
       final int NUMBER_OF_MESSAGES = 200;
       int NUMBER_OF_THREADS = 20;
-      final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler);
+
       final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
+      final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler, fakeQueue);
 
       final long now = System.currentTimeMillis();
 
@@ -776,6 +778,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public void sendBuffer(ByteBuf buffer, int count) {
 
       }
+
+      @Override
+      public long getPersistentSize() throws ActiveMQException {
+         return 0;
+      }
    }
 
    public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
@@ -1017,12 +1024,52 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getPersistentSize() {
+         return 0;
+      }
+
+      @Override
+      public long getDurableMessageCount() {
+         return 0;
+      }
+
+      @Override
+      public long getDurablePersistentSize() {
+         return 0;
+      }
+
+      @Override
       public int getDeliveringCount() {
          return 0;
       }
 
       @Override
-      public void referenceHandled() {
+      public long getDeliveringSize() {
+         return 0;
+      }
+
+      @Override
+      public int getDurableDeliveringCount() {
+         return 0;
+      }
+
+      @Override
+      public long getDurableDeliveringSize() {
+         return 0;
+      }
+
+      @Override
+      public int getDurableScheduledCount() {
+         return 0;
+      }
+
+      @Override
+      public long getDurableScheduledSize() {
+         return 0;
+      }
+
+      @Override
+      public void referenceHandled(MessageReference ref) {
 
       }
 
@@ -1032,6 +1079,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getScheduledSize() {
+         return 0;
+      }
+
+      @Override
       public List<MessageReference> getScheduledMessages() {
          return null;
       }
@@ -1310,7 +1362,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public SimpleString getUser() {
          return null;
       }
-
       @Override
       public boolean isLastValue() {
          return false;
@@ -1326,13 +1377,5 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
       }
 
-      @Override
-      public void decDelivering(int size) {
-
-      }
-
-
-
-
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index b256eb9..3a9a785 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -587,7 +587,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounter(long txID, long queueID, long value) throws Exception {
+      public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
          return 0;
       }
 
@@ -612,12 +612,12 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
+      public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception {
          return 0;
       }
 
       @Override
-      public long storePageCounterInc(long queueID, int add) throws Exception {
+      public long storePageCounterInc(long queueID, int add, long size) throws Exception {
          return 0;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
new file mode 100644
index 0000000..4ef4425
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
@@ -0,0 +1,37 @@
+package metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+
+//validate metrics are recovered
+Object[] queueControls = server.getJMSServerManager().getActiveMQServer().getManagementService().getResources(QueueControl.class);
+for (Object o : queueControls) {
+    QueueControl c = (QueueControl) o;
+    GroovyRun.assertTrue(c.getPersistentSize() > 0);
+    GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
+    GroovyRun.assertEquals(16l, c.getMessageCount());
+    GroovyRun.assertEquals(16l, c.getDurableMessageCount());
+ }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
index fe58505..78d1241 100644
--- a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
+++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
@@ -31,8 +31,10 @@ String id = arg[1];
 String type = arg[2];
 String producer = arg[3];
 String consumer = arg[4];
+String globalMaxSize = arg[5];
 
 println("type = " + type);
+println("globalMaxSize = " + globalMaxSize);
 
 configuration = new ConfigurationImpl();
 configuration.setJournalType(JournalType.NIO);
@@ -44,6 +46,10 @@ configuration.setPersistenceEnabled(persistent);
 try {
     if (!type.startsWith("ARTEMIS-1")) {
         configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
+        if (globalMaxSize != null) {
+            configuration.getAddressesSettings().get("#").setPageSizeBytes(globalMaxSize);
+            configuration.setGlobalMaxSize(Long.parseLong(globalMaxSize));
+        }
     }
 } catch (Throwable e) {
     // need to ignore this for 1.4

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
index 40da24c..958db27 100644
--- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.activemq.artemis.tests.compatibility;
 
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -28,9 +31,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
-import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
-
 /**
  * To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
  *
@@ -105,5 +105,42 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
       evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
    }
 
+   /**
+    * Test that the server starts properly using an old journal even though persistent size
+    * metrics were not originally stored
+    */
+   @Test
+   public void testSendReceiveQueueMetrics() throws Throwable {
+      setVariable(senderClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
+      evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
+      stopServer(senderClassloader);
+
+      setVariable(receiverClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
+
+      setVariable(receiverClassloader, "latch", null);
+      evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
+   }
+
+   /**
+    * Test that the metrics are recovered when paging.  Even though the paging counts won't
+    * be persisted to the journal, the server should still start properly.  The persistent sizes
+    * will be recovered when the messages are depaged
+    */
+   @Test
+   public void testSendReceiveSizeQueueMetricsPaging() throws Throwable {
+      setVariable(senderClassloader, "persistent", true);
+      //Set max size to 1 to cause messages to immediately go to the paging store
+      startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1));
+      evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
+      stopServer(senderClassloader);
+
+      setVariable(receiverClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1));
+
+
+      evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
+   }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
index e2b9648..9001180 100644
--- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
@@ -189,6 +189,10 @@ public abstract class VersionedBaseTest {
    }
 
    public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable {
+      startServer(folder, loader, serverName, null);
+   }
+
+   public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable {
       folder.mkdirs();
 
       System.out.println("Folder::" + folder);
@@ -202,9 +206,8 @@ public abstract class VersionedBaseTest {
          scriptToUse = "servers/hornetqServer.groovy";
       }
 
-      evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver);
+      evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize);
    }
-
    public void stopServer(ClassLoader loader) throws Throwable {
       execute(loader, "server.stop()");
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 078c397..511d476 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -817,5 +817,10 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       public Map<String, Object> toPropertyMap() {
          return null;
       }
+
+      @Override
+      public long getPersistentSize() throws ActiveMQException {
+         return 0;
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index d37d134..fe9ba17 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -668,8 +668,8 @@ public class SendAckFailTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounter(long txID, long queueID, long value) throws Exception {
-         return manager.storePageCounter(txID, queueID, value);
+      public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
+         return manager.storePageCounter(txID, queueID, value, size);
       }
 
       @Override
@@ -693,13 +693,13 @@ public class SendAckFailTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
-         return manager.storePageCounterInc(txID, queueID, add);
+      public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception {
+         return manager.storePageCounterInc(txID, queueID, add, size);
       }
 
       @Override
-      public long storePageCounterInc(long queueID, int add) throws Exception {
-         return manager.storePageCounterInc(queueID, add);
+      public long storePageCounterInc(long queueID, int add, long size) throws Exception {
+         return manager.storePageCounterInc(queueID, add, size);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index 8836ba8..9967e76 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -122,6 +122,21 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
       return control.getMessageCount();
    }
 
+   protected long getDurableMessageCount(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getDurableMessageCount();
+   }
+
+   protected long getMessageSize(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getPersistentSize();
+   }
+
+   protected long getDurableMessageSize(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getDurablePersistentSize();
+   }
+
    protected long getMessagesAdded(QueueControl control) throws Exception {
       control.flushExecutor();
       return control.getMessagesAdded();


Mime
View raw message