activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/5] activemq-artemis git commit: ARTEMIS-856 Fixing MessageRedistributionTest
Date Thu, 02 Aug 2018 14:31:54 GMT
ARTEMIS-856 Fixing MessageRedistributionTest



Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec742cb8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec742cb8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec742cb8

Branch: refs/heads/master
Commit: ec742cb8899bd1b7d7cb28b7e64c87f75681f6e7
Parents: 59520b9
Author: Michael André Pearce <michael.andre.pearce@me.com>
Authored: Thu Aug 2 10:51:27 2018 +0100
Committer: Michael André Pearce <michael.andre.pearce@me.com>
Committed: Thu Aug 2 13:52:32 2018 +0100

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 37 ++++++++++++++------
 1 file changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec742cb8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2656217..09ff210 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -21,6 +21,7 @@ import java.io.StringWriter;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -1082,6 +1083,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
             supports = false;
          }
       }
+      if (redistributor != null) {
+         if (!redistributor.consumer.supportsDirectDelivery()) {
+            supports = false;
+         }
+      }
       return supports;
    }
 
@@ -2158,10 +2164,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    @Override
    public synchronized void resetAllIterators() {
       for (ConsumerHolder holder : this.consumerList) {
-         if (holder.iter != null) {
-            holder.iter.close();
-         }
-         holder.iter = null;
+         holder.resetIterator();
+      }
+      if (redistributor != null) {
+         redistributor.resetIterator();
       }
    }
 
@@ -2369,7 +2375,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
          MessageReference ref;
 
          Consumer handledconsumer = null;
-
+         SimpleString groupID;
          synchronized (this) {
 
             // Need to do these checks inside the synchronized
@@ -2436,7 +2442,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
                // If a group id is set, then this overrides the consumer chosen round-robin
 
-               SimpleString groupID = extractGroupID(ref);
+               groupID = extractGroupID(ref);
 
                if (groupID != null) {
                   groupConsumer = groups.get(groupID);
@@ -2484,10 +2490,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
                }
             }
 
-            if (pos == endPos) {
+            if (pos == endPos || (redistributor != null || groupConsumer != null || exclusive))
{
                // Round robin'd all
 
-               if (noDelivery == size) {
+               if (noDelivery == size && redistributor == null || ((redistributor
!= null || groupConsumer != null || exclusive) && noDelivery > 0)) {
                   if (handledconsumer != null) {
                      // this shouldn't really happen,
                      // however I'm keeping this as an assertion case future developers ever
change the logic here on this class
@@ -3040,7 +3046,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
                return true;
             }
 
-            if (pos == startPos) {
+            if (pos == startPos || redistributor != null || groupConsumer != null || exclusive)
{
                // Tried them all
                break;
             }
@@ -3122,7 +3128,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       List<ConsumerHolder> consumerListClone;
 
       synchronized (this) {
-         consumerListClone = new ArrayList<>(consumerList);
+         if (redistributor == null) {
+            consumerListClone = new ArrayList<>(consumerList);
+         } else {
+            consumerListClone = Collections.singletonList(redistributor);
+         }
       }
       return consumerListClone;
    }
@@ -3286,6 +3296,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
       LinkedListIterator<MessageReference> iter;
 
+      private void resetIterator() {
+         if (iter != null) {
+            iter.close();
+         }
+         iter = null;
+      }
+
    }
 
    private class DelayedAddRedistributor implements Runnable {


Mime
View raw message