activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1222275 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/
Date Thu, 22 Dec 2011 15:15:49 GMT
Author: tabish
Date: Thu Dec 22 15:15:48 2011
New Revision: 1222275

URL: http://svn.apache.org/viewvc?rev=1222275&view=rev
Log:
apply patch and add test for: https://issues.apache.org/jira/browse/AMQ-3436

Some modifications needed for the patch to work correctly.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java 
 (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Dec 22 15:15:48 2011
@@ -40,13 +40,18 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.cursors.OrderedPendingList;
+import org.apache.activemq.broker.region.cursors.PendingList;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
@@ -92,8 +97,8 @@ public class Queue extends BaseDestinati
     // Messages that are paged in but have not yet been targeted at a
     // subscription
     private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
-    private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
-    private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
+    protected PendingList pagedInPendingDispatch = new OrderedPendingList();
+    protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@@ -123,9 +128,7 @@ public class Queue extends BaseDestinati
         }
     };
 
-    private final Object iteratingMutex = new Object() {
-    };
-
+    private final Object iteratingMutex = new Object();
 
     class TimeoutMessage implements Delayed {
 
@@ -305,7 +308,21 @@ public class Queue extends BaseDestinati
     }
 
     @Override
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        super.setPrioritizedMessages(prioritizedMessages);
+
+        if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList)
{
+            pagedInPendingDispatch = new PrioritizedPendingList();
+            redeliveredWaitingDispatch = new PrioritizedPendingList();
+        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
+            pagedInPendingDispatch = new OrderedPendingList();
+            redeliveredWaitingDispatch = new OrderedPendingList();
+        }
+    }
+
+    @Override
     public void initialize() throws Exception {
+
         if (this.messages == null) {
             if (destination.isTemporary() || broker == null || store == null) {
                 this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
@@ -313,6 +330,7 @@ public class Queue extends BaseDestinati
                 this.messages = new StoreQueueCursor(broker, this);
             }
         }
+
         // If a VMPendingMessageCursor don't use the default Producer System
         // Usage
         // since it turns into a shared blocking queue which can lead to a
@@ -529,10 +547,10 @@ public class Queue extends BaseDestinati
                             }
                         }
                     }
-                    redeliveredWaitingDispatch.add(qmr);
+                    redeliveredWaitingDispatch.addMessageLast(qmr);
                 }
                 if (!redeliveredWaitingDispatch.isEmpty()) {
-                    doDispatch(new ArrayList<QueueMessageReference>());
+                    doDispatch(new OrderedPendingList());
                 }
             }finally {
                 consumersLock.writeLock().unlock();
@@ -994,7 +1012,7 @@ public class Queue extends BaseDestinati
 
             pagedInPendingDispatchLock.writeLock().lock();
             try {
-                addAll(pagedInPendingDispatch, browseList, max, toExpire);
+                addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
                 for (MessageReference ref : toExpire) {
                     pagedInPendingDispatch.remove(ref);
                     if (broker.isExpired(ref)) {
@@ -1066,10 +1084,10 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private void addAll(Collection<QueueMessageReference> refs, List<Message>
l, int maxBrowsePageSize,
+    private void addAll(Collection<? extends MessageReference> refs, List<Message>
l, int maxBrowsePageSize,
             List<MessageReference> toExpire) throws Exception {
-        for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() &&
l.size() < getMaxBrowsePageSize();) {
-            QueueMessageReference ref = i.next();
+        for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext()
&& l.size() < getMaxBrowsePageSize();) {
+            QueueMessageReference ref = (QueueMessageReference) i.next();
             if (ref.isExpired()) {
                 toExpire.add(ref);
             } else if (l.contains(ref.getMessage()) == false) {
@@ -1675,15 +1693,16 @@ public class Queue extends BaseDestinati
     }
 
     private void doPageIn(boolean force) throws Exception {
-        List<QueueMessageReference> newlyPaged = doPageInForDispatch(force);
+        PendingList newlyPaged = doPageInForDispatch(force);
         pagedInPendingDispatchLock.writeLock().lock();
         try {
             if (pagedInPendingDispatch.isEmpty()) {
                 pagedInPendingDispatch.addAll(newlyPaged);
+
             } else {
-                for (QueueMessageReference qmr : newlyPaged) {
+                for (MessageReference qmr : newlyPaged) {
                     if (!pagedInPendingDispatch.contains(qmr)) {
-                        pagedInPendingDispatch.add(qmr);
+                        pagedInPendingDispatch.addMessageLast(qmr);
                     }
                 }
             }
@@ -1692,9 +1711,9 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private List<QueueMessageReference> doPageInForDispatch(boolean force) throws Exception
{
+    private PendingList doPageInForDispatch(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
-        List<QueueMessageReference> resultList = null;
+        PendingList resultList = null;
 
         int toPageIn = Math.min(getMaxPageSize(), messages.size());
         if (LOG.isDebugEnabled()) {
@@ -1750,11 +1769,15 @@ public class Queue extends BaseDestinati
             // dispatch attempts
             pagedInMessagesLock.writeLock().lock();
             try {
-                resultList = new ArrayList<QueueMessageReference>(result.size());
+                if(isPrioritizedMessages()) {
+                    resultList = new PrioritizedPendingList();
+                } else {
+                    resultList = new OrderedPendingList();
+                }
                 for (QueueMessageReference ref : result) {
                     if (!pagedInMessages.containsKey(ref.getMessageId())) {
                         pagedInMessages.put(ref.getMessageId(), ref);
-                        resultList.add(ref);
+                        resultList.addMessageLast(ref);
                     } else {
                         ref.decrementReferenceCount();
                     }
@@ -1764,13 +1787,13 @@ public class Queue extends BaseDestinati
             }
         } else {
             // Avoid return null list, if condition is not validated
-            resultList = new ArrayList<QueueMessageReference>();
+            resultList = new OrderedPendingList();
         }
 
         return resultList;
     }
 
-    private void doDispatch(List<QueueMessageReference> list) throws Exception {
+    private void doDispatch(PendingList list) throws Exception {
         boolean doWakeUp = false;
 
         pagedInPendingDispatchLock.writeLock().lock();
@@ -1792,9 +1815,9 @@ public class Queue extends BaseDestinati
                 if (pagedInPendingDispatch.isEmpty()) {
                     pagedInPendingDispatch.addAll(doActualDispatch(list));
                 } else {
-                    for (QueueMessageReference qmr : list) {
+                    for (MessageReference qmr : list) {
                         if (!pagedInPendingDispatch.contains(qmr)) {
-                            pagedInPendingDispatch.add(qmr);
+                            pagedInPendingDispatch.addMessageLast(qmr);
                         }
                     }
                     doWakeUp = true;
@@ -1814,9 +1837,10 @@ public class Queue extends BaseDestinati
      * @return list of messages that could get dispatched to consumers if they
      *         were not full.
      */
-    private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference>
list) throws Exception {
+    private PendingList doActualDispatch(PendingList list) throws Exception {
         List<Subscription> consumers;
         consumersLock.writeLock().lock();
+
         try {
             if (this.consumers.isEmpty() || isSlave()) {
                 // slave dispatch happens in processDispatchNotification
@@ -1827,10 +1851,18 @@ public class Queue extends BaseDestinati
             consumersLock.writeLock().unlock();
         }
 
-        List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
+        PendingList rc;
+        if(isPrioritizedMessages()) {
+            rc = new PrioritizedPendingList();
+        } else {
+            rc = new OrderedPendingList();
+        }
+
         Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
 
-        for (MessageReference node : list) {
+        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();)
{
+
+            MessageReference node = (MessageReference) iterator.next();
             Subscription target = null;
             int interestCount = 0;
             for (Subscription s : consumers) {
@@ -1863,7 +1895,7 @@ public class Queue extends BaseDestinati
             if ((target == null && interestCount > 0) || consumers.size() == 0)
{
                 // This means all subs were full or that there are no
                 // consumers...
-                rc.add((QueueMessageReference) node);
+                rc.addMessageLast((QueueMessageReference) node);
             }
 
             // If it got dispatched, rotate the consumer list to get round robin
@@ -1886,7 +1918,6 @@ public class Queue extends BaseDestinati
     }
 
     protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference
node) throws Exception {
-        //QueueMessageReference node = (QueueMessageReference) m;
         boolean result = true;
         // Keep message groups together.
         String groupId = node.getGroupID();
@@ -2002,9 +2033,9 @@ public class Queue extends BaseDestinati
 
         pagedInPendingDispatchLock.writeLock().lock();
         try {
-            for (QueueMessageReference ref : pagedInPendingDispatch) {
+            for (MessageReference ref : pagedInPendingDispatch) {
                 if (messageId.equals(ref.getMessageId())) {
-                    message = ref;
+                    message = (QueueMessageReference)ref;
                     pagedInPendingDispatch.remove(ref);
                     break;
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
Thu Dec 22 15:15:48 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -25,9 +26,10 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.MessageId;
 
 public class OrderedPendingList implements PendingList {
-    PendingNode root = null;
-    PendingNode tail = null;
-    final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+
+    private PendingNode root = null;
+    private PendingNode tail = null;
+    private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
 
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
@@ -130,4 +132,28 @@ public class OrderedPendingList implemen
         return "OrderedPendingList(" + System.identityHashCode(this) + ")";
     }
 
+    @Override
+    public boolean contains(MessageReference message) {
+        if(map.values().contains(message)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public Collection<MessageReference> values() {
+        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
+        for(PendingNode pendingNode : map.values()) {
+            messageReferences.add(pendingNode.getMessage());
+        }
+        return messageReferences;
+    }
+
+    @Override
+    public void addAll(PendingList pendingList) {
+        for(MessageReference messageReference : pendingList) {
+            addMessageLast(messageReference);
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
Thu Dec 22 15:15:48 2011
@@ -16,16 +16,96 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import java.util.Collection;
 import java.util.Iterator;
+
 import org.apache.activemq.broker.region.MessageReference;
 
-public interface PendingList {
-    
+public interface PendingList extends Iterable<MessageReference> {
+
+    /**
+     * Returns true if there are no Messages in the PendingList currently.
+     * @return true if the PendingList is currently empty.
+     */
     public boolean isEmpty();
+
+    /**
+     * Discards all Messages currently held in the PendingList.
+     */
     public void clear();
+
+    /**
+     * Adds the given message to the head of the list.
+     *
+     * @param message
+     *      The MessageReference that is to be added to this list.
+     *
+     * @return the PendingNode that contains the newly added message.
+     */
     public PendingNode addMessageFirst(MessageReference message);
+
+    /**
+     * Adds the given message to the tail of the list.
+     *
+     * @param message
+     *      The MessageReference that is to be added to this list.
+     *
+     * @return the PendingNode that contains the newly added message.
+     */
     public PendingNode addMessageLast(MessageReference message);
+
+    /**
+     * Removes the given MessageReference from the PendingList if it is
+     * contained within.
+     *
+     * @param message
+     *      The MessageReference that is to be removed to this list.
+     *
+     * @return the PendingNode that contains the removed message or null if the
+     *         message was not present in this list.
+     */
     public PendingNode remove(MessageReference message);
+
+    /**
+     * Returns the number of MessageReferences that are awaiting dispatch.
+     * @return current count of the pending messages.
+     */
     public int size();
+
+    /**
+     * Returns an iterator over the pending Messages.  The subclass controls how
+     * the returned iterator actually traverses the list of pending messages allowing
+     * for the order to vary based on factors like Message priority or some other
+     * mechanism.
+     *
+     * @return an Iterator that returns MessageReferences contained in this list.
+     */
     public Iterator<MessageReference> iterator();
+
+    /**
+     * Query the PendingList to determine if the given message is contained within.
+     *
+     * @param message
+     *      The Message that is the target of this query.
+     *
+     * @return true if the MessageReference is contained in this list.
+     */
+    public boolean contains(MessageReference message);
+
+    /**
+     * Returns a new Collection that contains all the MessageReferences currently
+     * held in this PendingList.  The elements of the list are ordered using the
+     * same rules as the subclass uses for iteration.
+     *
+     * @return a new Collection containing this lists MessageReferences.
+     */
+    public Collection<MessageReference> values();
+
+    /**
+     * Adds all the elements of the given PendingList to this PendingList.
+     *
+     * @param pendingList
+     *      The PendingList that is to be added to this collection.
+     */
+    public void addAll(PendingList pendingList);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
Thu Dec 22 15:15:48 2011
@@ -17,23 +17,27 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
 
 public class PrioritizedPendingList implements PendingList {
-    static final Integer MAX_PRIORITY = 10;
+
+    private static final Integer MAX_PRIORITY = 10;
     private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
-    final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+    private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
 
     public PrioritizedPendingList() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i] = new OrderedPendingList();
         }
     }
+
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = getList(message).addMessageFirst(message);
         this.map.put(message.getMessageId(), node);
@@ -124,9 +128,32 @@ public class PrioritizedPendingList impl
                 map.remove(node.getMessage().getMessageId());
                 node.getList().removeNode(node);
             }
+        }
+    }
+
+    @Override
+    public boolean contains(MessageReference message) {
+        if (map.values().contains(message)) {
+            return true;
+        }
+
+        return false;
+    }
 
+    @Override
+    public Collection<MessageReference> values() {
+        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
+        for (PendingNode pendingNode : map.values()) {
+            messageReferences.add(pendingNode.getMessage());
         }
+        return messageReferences;
+    }
 
+    @Override
+    public void addAll(PendingList pendingList) {
+        for(MessageReference messageReference : pendingList) {
+            addMessageLast(messageReference);
+        }
     }
 
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java?rev=1222275&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java Thu
Dec 22 15:15:48 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3436Test {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class);
+
+    private BrokerService broker;
+    private PersistenceAdapter adapter;
+    private boolean useCache = true;
+    private boolean prioritizeMessages = true;
+
+    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception
{
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        adapter.setConcurrentStoreAndDispatchTopics(false);
+        adapter.deleteAllMessages();
+        return adapter;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("priorityTest");
+        broker.setAdvisorySupport(false);
+        broker.setUseJmx(false);
+        adapter = createPersistenceAdapter(true);
+        broker.setPersistenceAdapter(adapter);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPrioritizedMessages(prioritizeMessages);
+        policy.setUseCache(useCache);
+        policy.setProducerFlowControl(false);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(new ActiveMQQueue("TEST"), policy);
+
+        // do not process expired for one test
+        PolicyEntry ignoreExpired = new PolicyEntry();
+        SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
+        ignoreExpiredStrategy.setProcessExpired(false);
+        ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
+
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    protected void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test
+    public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception {
+
+        int messageCount = 200;
+        URI failoverUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1");
+
+        ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false");
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
+        cf.setDispatchAsync(false);
+
+        // Create producer
+        ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
+        producerConnection.setMessagePrioritySupported(true);
+        producerConnection.start();
+        final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = producerSession.createProducer(dest);
+
+        ActiveMQMessageConsumer consumer;
+
+        // Create consumer on separate connection
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) cf.createConnection();
+        consumerConnection.setMessagePrioritySupported(true);
+        consumerConnection.start();
+        final ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(true,
+                Session.SESSION_TRANSACTED);
+        consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
+
+        // Produce X number of messages with a session commit after each message
+        Random random = new Random();
+        for (int i = 0; i < messageCount; ++i) {
+
+            Message message = producerSession.createTextMessage("Test message #" + i);
+            producer.send(message, DeliveryMode.PERSISTENT, random.nextInt(10), 45*1000);
+            producerSession.commit();
+        }
+        producer.close();
+
+        // ***************************************************
+        // If we create the consumer here instead of above, the
+        // the messages will be consumed in priority order
+        // ***************************************************
+        //consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
+
+        // Consume all of the messages we produce using a listener.
+        // Don't exit until we get all the messages.
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        final StringBuffer failureMessage = new StringBuffer();
+        consumer.setMessageListener(new MessageListener() {
+            int lowestPrioritySeen = 10;
+
+            boolean firstMessage = true;
+
+            public void onMessage(Message msg) {
+                try {
+
+                    int currentPriority = msg.getJMSPriority();
+                    LOG.debug(currentPriority + "<=" + lowestPrioritySeen);
+
+                    // Ignore the first message priority since it is prefetched
+                    // and is out of order by design
+                    if (firstMessage == true) {
+                        firstMessage = false;
+                        LOG.debug("Ignoring first message since it was prefetched");
+
+                    } else {
+
+                        // Verify that we never see a priority higher than the
+                        // lowest
+                        // priority seen
+                        if (lowestPrioritySeen > currentPriority) {
+                            lowestPrioritySeen = currentPriority;
+                        }
+                        if (lowestPrioritySeen < currentPriority) {
+                            failureMessage.append("Incorrect priority seen (Lowest Priority
= " + lowestPrioritySeen
+                                    + " Current Priority = " + currentPriority + ")"
+                                    + System.getProperty("line.separator"));
+                        }
+                    }
+
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                } finally {
+                    latch.countDown();
+                    LOG.debug("Messages remaining = " + latch.getCount());
+                }
+            }
+        });
+
+        latch.await();
+        consumer.close();
+
+        // Cleanup producer resources
+        producerSession.close();
+        producerConnection.stop();
+        producerConnection.close();
+
+        // Cleanup consumer resources
+        consumerSession.close();
+        consumerConnection.stop();
+        consumerConnection.close();
+
+        // Report the failure if found
+        if (failureMessage.length() > 0) {
+            Assert.fail(failureMessage.toString());
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message