activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [4/5] activemq-artemis git commit: ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (fix)
Date Mon, 29 Jan 2018 07:47:41 GMT
ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (fix)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c10b7441
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c10b7441
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c10b7441

Branch: refs/heads/master
Commit: c10b74412a335f6c930c3dddace11ef300215d4f
Parents: 59d2ac5
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Jan 26 23:05:52 2018 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Jan 26 23:24:56 2018 -0500

----------------------------------------------------------------------
 .../core/paging/PageTransactionInfo.java        |  4 ++-
 .../paging/impl/PageTransactionInfoImpl.java    | 32 ++++++++++++++------
 .../journal/AbstractJournalStorageManager.java  | 30 +++++++++++++++++-
 .../core/postoffice/impl/PostOfficeImpl.java    |  2 +-
 .../artemis/core/server/ActiveMQServer.java     | 28 +++++++++++++++++
 .../activemq/artemis/core/server/Queue.java     | 13 +++++++-
 .../core/server/impl/ActiveMQServerImpl.java    | 24 ++-------------
 .../artemis/core/server/impl/QueueImpl.java     | 27 ++++++++++-------
 .../core/server/impl/QueueManagerImpl.java      |  2 +-
 .../core/server/impl/RoutingContextImpl.java    |  2 +-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 .../management/impl/ManagementServiceImpl.java  |  2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  7 ++++-
 .../unit/core/postoffice/impl/FakeQueue.java    |  8 ++++-
 14 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
index 4fb49ea..90aaade 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
@@ -51,7 +51,9 @@ public interface PageTransactionInfo extends EncodingSupport {
                      int increment) throws Exception;
 
    // To be used after the update was stored or reload
-   void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
+   boolean onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
+
+   boolean checkSize(StorageManager storageManager, PagingManager pagingManager);
 
    void increment(int durableSize, int nonDurableSize);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index b793aec..70594ca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -89,16 +89,30 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
    }
 
    @Override
-   public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
-      int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
-      if (sizeAfterUpdate == 0 && storageManager != null) {
-         try {
-            storageManager.deletePageTransactional(this.recordID);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
-         }
+   public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
+      int afterUpdate = numberOfMessages.addAndGet(-update);
+      return internalCheckSize(storageManager, pagingManager, afterUpdate);
+   }
+
+   @Override
+   public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) {
+      return internalCheckSize(storageManager, pagingManager, numberOfMessages.get());
+   }
 
-         pagingManager.removeTransaction(this.transactionID);
+   public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) {
+      if (size <= 0) {
+         if (storageManager != null) {
+            try {
+               storageManager.deletePageTransactional(this.recordID);
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
+            }
+
+            pagingManager.removeTransaction(this.transactionID);
+         }
+         return false;
+      } else {
+         return true;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 7aa4096..5441368 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -839,6 +839,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
 
+      Set<PageTransactionInfo> invalidPageTransactions = null;
+
       Map<Long, Message> messages = new HashMap<>();
       readLock();
       try {
@@ -971,6 +973,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   break;
                }
                case JournalRecordIds.PAGE_TRANSACTION: {
+                  PageTransactionInfo invalidPGTx = null;
                   if (record.isUpdate) {
                      PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
 
@@ -981,7 +984,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                      if (pageTX == null) {
                         ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
                      } else {
-                        pageTX.onUpdate(pageUpdate.recods, null, null);
+                        if (!pageTX.onUpdate(pageUpdate.recods, null, null)) {
+                           invalidPGTx = pageTX;
+                        }
                      }
                   } else {
                      PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
@@ -991,6 +996,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                      pageTransactionInfo.setRecordID(record.id);
 
                      pagingManager.addTransaction(pageTransactionInfo);
+
+                     if (!pageTransactionInfo.checkSize(null, null)) {
+                        invalidPGTx = pageTransactionInfo;
+                     }
+                  }
+
+                  if (invalidPGTx != null) {
+                     if (invalidPageTransactions == null) {
+                        invalidPageTransactions = new HashSet<>();
+                     }
+                     invalidPageTransactions.add(invalidPGTx);
                   }
 
                   break;
@@ -1170,6 +1186,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
          }
 
          journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap);
+
+         checkInvalidPageTransactions(pagingManager, invalidPageTransactions);
+
          journalLoaded = true;
          return info;
       } finally {
@@ -1177,6 +1196,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       }
    }
 
+   public void checkInvalidPageTransactions(PagingManager pagingManager,
+                                            Set<PageTransactionInfo> invalidPageTransactions) {
+      if (invalidPageTransactions != null) {
+         for (PageTransactionInfo pginfo : invalidPageTransactions) {
+            pginfo.checkSize(this, pagingManager);
+         }
+      }
+   }
+
    /**
     * @param queueID
     * @param pageSubscriptions

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bc14f79..356674b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,7 +1464,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener,
Binding
          for (MessageReference ref : refs) {
             Message message = ref.getMessage();
 
-            if (message.isDurable() && ref.getQueue().isDurable()) {
+            if (message.isDurable() && ref.getQueue().isDurableMessage()) {
                message.decrementDurableRefCount();
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d1d3029..6af94ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -73,6 +73,34 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
  */
 public interface ActiveMQServer extends ServiceComponent {
 
+
+   enum SERVER_STATE {
+      /**
+       * start() has been called but components are not initialized. The whole point of this state,
+       * is to be in a state which is different from {@link SERVER_STATE#STARTED} and
+       * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such
as
+       * {@link #stop(boolean)} worked as intended.
+       */
+      STARTING, /**
+       * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
+       * about it hold.
+       */
+      STARTED, /**
+       * stop() was called but has not finished yet. Meant to avoids starting components
while
+       * stop() is executing.
+       */
+      STOPPING, /**
+       * Stopped: either stop() has been called and has finished running, or start() has
never been
+       * called.
+       */
+      STOPPED
+   }
+
+
+   void setState(SERVER_STATE state);
+
+   SERVER_STATE getState();
+
    /**
     * Sets the server identity.
     * <p>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index d4ec406..b39f4da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -53,6 +53,13 @@ public interface Queue extends Bindable,CriticalComponent {
 
    boolean isDurable();
 
+   /**
+    * The queue definition could be durable, but the messages could eventually be considered
non durable.
+    * (e.g. purgeOnNoConsumers)
+    * @return
+    */
+   boolean isDurableMessage();
+
    boolean isTemporary();
 
    boolean isAutoCreated();
@@ -161,7 +168,11 @@ public interface Queue extends Bindable,CriticalComponent {
 
    int deleteMatchingReferences(Filter filter) throws Exception;
 
-   int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception;
+   default int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+      return deleteMatchingReferences(flushLImit, filter, AckReason.NORMAL);
+   }
+
+   int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception;
 
    boolean expireReference(long messageID) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 047e839..6ce681e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -207,28 +207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private HAPolicy haPolicy;
 
-   enum SERVER_STATE {
-      /**
-       * start() has been called but components are not initialized. The whole point of this
state,
-       * is to be in a state which is different from {@link SERVER_STATE#STARTED} and
-       * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such
as
-       * {@link #stop(boolean)} worked as intended.
-       */
-      STARTING, /**
-       * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
-       * about it hold.
-       */
-      STARTED, /**
-       * stop() was called but has not finished yet. Meant to avoids starting components
while
-       * stop() is executing.
-       */
-      STOPPING, /**
-       * Stopped: either stop() has been called and has finished running, or start() has
never been
-       * called.
-       */
-      STOPPED
-   }
-
    private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
 
    private final Version version;
@@ -712,10 +690,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       super.finalize();
    }
 
+   @Override
    public void setState(SERVER_STATE state) {
       this.state = state;
    }
 
+   @Override
    public SERVER_STATE getState() {
       return state;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 77adc85..fd13652 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -139,7 +139,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
    private volatile Filter filter;
 
-   private final boolean durable;
+   private final boolean propertyDurable;
 
    private final boolean temporary;
 
@@ -405,7 +405,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
       this.pageSubscription = pageSubscription;
 
-      this.durable = durable;
+      this.propertyDurable = durable;
 
       this.temporary = temporary;
 
@@ -495,7 +495,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
    @Override
    public boolean isDurable() {
-      return durable;
+      return propertyDurable;
+   }
+
+   @Override
+   public boolean isDurableMessage() {
+      return propertyDurable && !purgeOnNoConsumers;
    }
 
    @Override
@@ -1126,7 +1131,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       } else {
          Message message = ref.getMessage();
 
-         boolean durableRef = message.isDurable() && durable;
+         boolean durableRef = message.isDurable() && isDurableMessage();
 
          if (durableRef) {
             storageManager.storeAcknowledge(id, message.getMessageID());
@@ -1161,7 +1166,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       } else {
          Message message = ref.getMessage();
 
-         boolean durableRef = message.isDurable() && durable;
+         boolean durableRef = message.isDurable() && isDurableMessage();
 
          if (durableRef) {
             storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
@@ -1189,7 +1194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception
{
       Message message = ref.getMessage();
 
-      if (message.isDurable() && durable) {
+      if (message.isDurable() && isDurableMessage()) {
          tx.setContainsPersistent();
       }
 
@@ -1372,12 +1377,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
-   public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1)
throws Exception {
+   public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1,
AckReason ackReason) throws Exception {
       return iterQueue(flushLimit, filter1, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
             incDelivering();
-            acknowledge(tx, ref);
+            acknowledge(tx, ref, ackReason);
             refRemoved(ref);
          }
       });
@@ -2385,7 +2390,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
          return true;
       }
 
-      if (!internalQueue && message.isDurable() && durable && !reference.isPaged()) {
+      if (!internalQueue && message.isDurable() && isDurableMessage() && !reference.isPaged()) {
          storageManager.updateDeliveryCount(reference);
       }
 
@@ -2414,7 +2419,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
             reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
 
-            if (!reference.isPaged() && message.isDurable() && durable) {
+            if (!reference.isPaged() && message.isDurable() && isDurableMessage())
{
                storageManager.updateScheduledDeliveryTime(reference);
             }
          }
@@ -2858,7 +2863,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       if (message == null)
          return;
 
-      boolean durableRef = message.isDurable() && queue.durable;
+      boolean durableRef = message.isDurable() && queue.isDurableMessage();
 
       try {
          message.decrementRefCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index be83aca..23b0e5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -59,7 +59,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
             ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
          }
          try {
-            queue.deleteAllReferences();
+            queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index feb12f9..29a70e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -60,7 +60,7 @@ public final class RoutingContextImpl implements RoutingContext {
 
       RouteContextList listing = getContextListing(address);
 
-      if (queue.isDurable()) {
+      if (queue.isDurableMessage()) {
          listing.getDurableQueues().add(queue);
       } else {
          listing.getNonDurableQueues().add(queue);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 15b1465..e1c6c8d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -409,7 +409,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
             // If updateDeliveries = false (set by strict-update),
             // the updateDeliveryCountAfterCancel would still be updated after c
             if (strictUpdateDeliveryCount && !ref.isPaged()) {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurableMessage()
&&
                   !ref.getQueue().isInternalQueue() &&
                   !ref.isPaged()) {
                   storageManager.updateDeliveryCount(ref);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 8a7e009..9b9830a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -247,7 +247,7 @@ public class ManagementServiceImpl implements ManagementService {
 
       QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(),
postOffice, storageManager, securityStore, addressSettingsRepository);
       if (messageCounterManager != null) {
-         MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue,
false, queue.isDurable(), messageCounterManager.getMaxDayCount());
+         MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue,
false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);
          messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index a492efd..256a670 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -860,6 +860,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public boolean isDurableMessage() {
+         return false;
+      }
+
+      @Override
       public boolean isTemporary() {
          return false;
       }
@@ -1087,7 +1092,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception
{
+      public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason)
throws Exception {
          return 0;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 7b37879..f0afd9e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -442,6 +442,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
+   public boolean isDurableMessage() {
+      // no-op
+      return false;
+   }
+
+   @Override
    public boolean isDurable() {
       // no-op
       return false;
@@ -601,7 +607,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
-   public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+   public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws
Exception {
       return 0;
    }
 


Mime
View raw message