activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1040151 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/store/
Date Mon, 29 Nov 2010 15:33:04 GMT
Author: gtully
Date: Mon Nov 29 15:33:03 2010
New Revision: 1040151

URL: http://svn.apache.org/viewvc?rev=1040151&view=rev
Log:
Ensure new high-priority messages get dispatched immediately rather than at the end of the
next batch. Configurable via StorePendingDurableSubscriberMessageStoragePolicy.immediatePriorityDispatch
(default true); most relevant with prefetch=1.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1040151&r1=1040150&r2=1040151&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Nov 29 15:33:03 2010
@@ -34,7 +34,7 @@ public abstract class AbstractStoreCurso
     protected final Destination regionDestination;
     private final PendingList batchList;
     private Iterator<MessageReference> iterator = null;
-    private boolean cacheEnabled=false;
+    protected boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
     protected boolean storeHasMessages = false;
     protected int size;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1040151&r1=1040150&r2=1040151&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Mon Nov 29 15:33:03 2010
@@ -49,6 +49,8 @@ public class StoreDurableSubscriberCurso
     private final PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
     private final Subscription subscription;
+    private int lastAddPriority = 0;
+    private boolean immediatePriorityDispatch = true;
     /**
      * @param broker Broker for this cursor
      * @param clientId clientId for this cursor
@@ -75,6 +77,7 @@ public class StoreDurableSubscriberCurso
     @Override
     public synchronized void start() throws Exception {
         if (!isStarted()) {
+            lastAddPriority = 0;
             super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
             	tsp.setMessageAudit(getMessageAudit());
@@ -182,8 +185,18 @@ public class StoreDurableSubscriberCurso
                 TopicStorePrefetch tsp = topics.get(dest);
                 if (tsp != null) {
                     tsp.addMessageLast(node);
+                    if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
+                        final int priority = msg.getPriority();
+                        if (priority > lastAddPriority) {
+                            // go get the latest priority message
+                            LOG.debug("Clearing cursor on high priority message " + priority);
+                            tsp.clear();
+                        }
+                        lastAddPriority = priority;
+                    }
                 }
             }
+
         }
     }
 
@@ -344,4 +357,12 @@ public class StoreDurableSubscriberCurso
     public String toString() {
         return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
     }
+
+    public boolean isImmediatePriorityDispatch() {
+        return immediatePriorityDispatch;
+    }
+
+    public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
+        this.immediatePriorityDispatch = immediatePriorityDispatch;
+    }    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=1040151&r1=1040150&r2=1040151&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
Mon Nov 29 15:33:03 2010
@@ -32,6 +32,21 @@ import org.apache.activemq.kaha.Store;
  * @version $Revision$
  */
 public class StorePendingDurableSubscriberMessageStoragePolicy implements PendingDurableSubscriberMessageStoragePolicy {
+    boolean immediatePriorityDispatch = true;
+
+    public boolean isImmediatePriorityDispatch() {
+        return immediatePriorityDispatch;
+    }
+
+    /**
+     * Ensure that new higher priority messages will get an immediate dispatch
+     * rather than wait for the end of the current cursor batch.
+     *
+     * @param immediatePriorityDispatch
+     */
+    public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
+        this.immediatePriorityDispatch = immediatePriorityDispatch;
+    }
 
     /**
      * Retrieve the configured pending message storage cursor;
@@ -44,6 +59,8 @@ public class StorePendingDurableSubscrib
      * @return the Pending Message cursor
      */
     public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
-        return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
+        StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
+        cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
+        return cursor;
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1040151&r1=1040150&r2=1040151&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Mon Nov 29 15:33:03 2010
@@ -30,8 +30,10 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -69,6 +71,10 @@ abstract public class MessagePriorityTes
         PolicyEntry policy = new PolicyEntry();
         policy.setPrioritizedMessages(prioritizeMessages);
         policy.setUseCache(useCache);
+        StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
+                new StorePendingDurableSubscriberMessageStoragePolicy();
+        durableSubPending.setImmediatePriorityDispatch(true);
+        policy.setPendingDurableSubscriberPolicy(durableSubPending);
         PolicyMap policyMap = new PolicyMap();
         policyMap.put(new ActiveMQQueue("TEST"), policy);
         policyMap.put(new ActiveMQTopic("TEST"), policy);
@@ -140,6 +146,14 @@ abstract public class MessagePriorityTes
                 e.printStackTrace();
             }
         }
+
+        public void setMessagePriority(int priority) {
+            this.priority = priority;
+        }
+
+        public void setMessageCount(int messageCount) {
+            this.messageCount = messageCount;    
+        }
         
     }
     
@@ -261,5 +275,46 @@ abstract public class MessagePriorityTes
             }
         }
     }
+
+    public void testHighPriorityDelivery() throws Exception {
+
+        // get zero prefetch
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setAll(0);
+        factory.setPrefetchPolicy(prefetch);
+        conn.close();
+        conn = factory.createConnection();
+        conn.setClientID("priority");
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        ProducerThread producerThread = new ProducerThread(topic, 5000, LOW_PRI);
+        producerThread.run();
+        LOG.info("Low priority messages sent");
+
+        sub = sess.createDurableSubscriber(topic, subName);
+        for (int i=0; i<200;i++) {
+            Message msg = sub.receive(15000);
+            LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
+            assertNotNull("Message " + i + " was null", msg);
+            assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
+        }
+
+        producerThread.setMessagePriority(HIGH_PRI);
+        producerThread.setMessageCount(1);
+        producerThread.run();
+        LOG.info("High priority message sent");
+
+        // try and get the high priority message
+        Message msg = sub.receive(15000);
+        assertNotNull("Message was null", msg);
+        LOG.info("received: " + msg);
+        assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
+    }
     
 }



Mime
View raw message