activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1915 Expiry Scanner is holding a lock on the queue
Date Wed, 06 Jun 2018 20:13:03 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 0db8353a3 -> 7580650b5


ARTEMIS-1915 Expiry Scanner is holding a lock on the queue

No tests required as there is no semantic changes here.
Current tests should validate this change.

(cherry picked from commit dfb41aed4ccdfb211a6c4529e05bef41d90046b6)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7580650b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7580650b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7580650b

Branch: refs/heads/2.6.x
Commit: 7580650b5069d69d5fd160b71f6ed51fa3b4247c
Parents: 0db8353
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Jun 4 22:28:29 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jun 6 15:49:26 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 96 ++++++++++----------
 1 file changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7580650b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 94b7640..5da8dc1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -906,7 +906,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
          leaveCritical(CRITICAL_CONSUMER);
       }
 
-
    }
 
    @Override
@@ -1178,22 +1177,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
-   public synchronized int getScheduledCount() {
+   public int getScheduledCount() {
       return scheduledDeliveryHandler.getScheduledCount();
    }
 
    @Override
-   public synchronized long getScheduledSize() {
+   public long getScheduledSize() {
       return scheduledDeliveryHandler.getScheduledSize();
    }
 
    @Override
-   public synchronized int getDurableScheduledCount() {
+   public int getDurableScheduledCount() {
       return scheduledDeliveryHandler.getDurableScheduledCount();
    }
 
    @Override
-   public synchronized long getDurableScheduledSize() {
+   public long getDurableScheduledSize() {
       return scheduledDeliveryHandler.getDurableScheduledSize();
    }
 
@@ -1788,6 +1787,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
       @Override
       public void run() {
+
+         boolean expired = false;
+         boolean hasElements = false;
+         int elementsExpired = 0;
+
+         LinkedList<MessageReference> expiredMessages = new LinkedList<>();
          synchronized (QueueImpl.this) {
             if (queueDestroyed) {
                return;
@@ -1796,54 +1801,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
             LinkedListIterator<MessageReference> iter = iterator();
 
-            boolean expired = false;
-            boolean hasElements = false;
-
-            int elementsExpired = 0;
             try {
-               Transaction tx = null;
-
                while (postOffice.isStarted() && iter.hasNext()) {
                   hasElements = true;
                   MessageReference ref = iter.next();
-                  try {
-                     if (ref.getMessage().isExpired()) {
-                        if (tx == null) {
-                           tx = new TransactionImpl(storageManager);
-                        }
-                        incDelivering(ref);
-                        expired = true;
-                        expire(tx, ref);
-                        iter.remove();
-                        refRemoved(ref);
-
-                        if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
-                           logger.debug("Breaking loop of expiring");
-                           scannerRunning.incrementAndGet();
-                           getExecutor().execute(this);
-                           break;
-                        }
+                  if (ref.getMessage().isExpired()) {
+                     incDelivering(ref);
+                     expired = true;
+                     expiredMessages.add(ref);
+                     iter.remove();
+
+                     if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
+                        logger.debug("Breaking loop of expiring");
+                        scannerRunning.incrementAndGet();
+                        getExecutor().execute(this);
+                        break;
                      }
-
-                  } catch (Exception e) {
-                     ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
                   }
                }
-
-               logger.debug("Expired " + elementsExpired + " references");
-
-               try {
-                  if (tx != null) {
-                     tx.commit();
-                  }
-               } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
-               }
-
-               // If empty we need to schedule depaging to make sure we would depage expired
messages as well
-               if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext())
{
-                  scheduleDepage(true);
-               }
             } finally {
                try {
                   iter.close();
@@ -1854,6 +1829,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
             }
          }
+
+         if (!expiredMessages.isEmpty()) {
+            Transaction tx = new TransactionImpl(storageManager);
+            for (MessageReference ref : expiredMessages) {
+               if (tx == null) {
+                  tx = new TransactionImpl(storageManager);
+               }
+               try {
+                  expire(tx, ref);
+                  refRemoved(ref);
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
+               }
+            }
+
+            try {
+               tx.commit();
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
+            }
+            logger.debug("Expired " + elementsExpired + " references");
+
+
+         }
+
+         // If empty we need to schedule depaging to make sure we would depage expired messages
as well
+         if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext())
{
+            scheduleDepage(true);
+         }
       }
    }
 


Mime
View raw message