activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1422113 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apache/...
Date Fri, 14 Dec 2012 21:57:59 GMT
Author: chirino
Date: Fri Dec 14 21:57:55 2012
New Revision: 1422113

URL: http://svn.apache.org/viewvc?rev=1422113&view=rev
Log:
Implements APLO-278: Support an option on queues to control whether a round robin message distribution
strategy should be used when multiple consumers are attached to the queue.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Dec 14 21:57:55 2012
@@ -113,6 +113,11 @@ class Queue(val router: LocalRouter, val
   var tune_persistent = true
 
   /**
+   * Should use a round robin dispatching of messages?
+   */
+  var tune_round_robin = true
+
+  /**
    * Should messages be swapped out of memory if
    * no consumers need the message?
    */
@@ -222,6 +227,7 @@ class Queue(val router: LocalRouter, val
     session_manager.resize(Int.MaxValue, new_tail_buffer)
 
     tune_persistent = virtual_host.store !=null && update.persistent.getOrElse(true)
+    tune_round_robin = update.round_robin.getOrElse(true)
     tune_swap = tune_persistent && update.swap.getOrElse(true)
     tune_swap_range_size = update.swap_range_size.getOrElse(10000)
     tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,"512k")

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Fri Dec 14 21:57:55 2012
@@ -110,11 +110,11 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def ::=(sub:Subscription) = {
-    parked ::= sub
+    parked = parked ::: sub :: Nil
   }
 
   def :::=(l:List[Subscription]) = {
-    parked :::= l
+    parked = parked ::: l
   }
 
 
@@ -638,7 +638,11 @@ class QueueEntry(val queue:Queue, val se
                   heldBack += sub
                 } else {
                   // advance: accepted...
-                  acquiringSub = sub
+                  if( queue.tune_round_robin ) {
+                    acquiringSub = sub
+                  } else {
+                    advancing += sub
+                  }
                   acquirer = sub
 
                   val acquiredQueueEntry = sub.acquire(entry)

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
Fri Dec 14 21:57:55 2012
@@ -47,6 +47,15 @@ public class QueueSettingsDTO {
     public Boolean persistent;
 
     /**
+     * Should the destination dispatch messages to consumers
+     * using round robin distribution strategy?  Defaults to true.
+     * If set to false, then messages will be dispatched
+     * to the first attached consumers until they throttle the broker.
+     */
+    @XmlAttribute(name="round_robin")
+    public Boolean round_robin;
+
+    /**
      * Should messages be swapped out of memory if
      * no consumers need the message?
      */

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Fri
Dec 14 21:57:55 2012
@@ -36,6 +36,7 @@
     <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
     <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
     <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
+    <queue id="noroundrobin.**" round_robin="false"/>
 
     <bdb_store directory="${testdatadir}"/>
   </virtual_host>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
Fri Dec 14 21:57:55 2012
@@ -36,6 +36,7 @@
     <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
     <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
     <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
+    <queue id="noroundrobin.**" round_robin="false"/>
 
     <leveldb_store directory="${testdatadir}"/>
   </virtual_host>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Fri Dec
14 21:57:55 2012
@@ -31,6 +31,7 @@
     <topic id="queued.**" slow_consumer_policy="queue">
       <subscription tail_buffer="4k"/>
     </topic>
+    <queue id="noroundrobin.**" round_robin="false"/>
 
   </virtual_host>
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Fri Dec 14 21:57:55 2012
@@ -589,6 +589,32 @@ class StompParallelTest extends StompTes
 
   }
 
+  test("Queues do not load balance on queues with round_robin=false") {
+    connect("1.1")
+    subscribe("1", "/queue/noroundrobin.test1")
+    subscribe("2", "/queue/noroundrobin.test1")
+
+    for (i <- 0 until 4) {
+      async_send("/queue/noroundrobin.test1", "message:" + i)
+    }
+
+    var sub1_counter = 0
+    var sub2_counter = 0
+
+    for (i <- 0 until 4) {
+      val (frame, ack) = receive_message()
+      if (frame.contains("subscription:1\n")) {
+        sub1_counter += 1
+      } else if (frame.contains("subscription:2\n")) {
+        sub2_counter += 1
+      }
+    }
+
+    sub2_counter should be(0)
+    sub1_counter should be(4)
+  }
+
+
   test("Queues do NOT load balance across exclusive subscribers") {
     connect("1.1")
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1422113&r1=1422112&r2=1422113&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Fri Dec
14 21:57:55 2012
@@ -392,6 +392,11 @@ freshly enqueued message.  Defaults to `
 * `persistent` : If set to false, then the queue will not persistently
 store it's message.  Defaults to true.
 
+* `round_robin` : Should the destination dispatch messages to consumers
+  using round robin distribution strategy?  Defaults to true.
+  If set to false, then messages will be dispatched to the first attached 
+  consumers until those consumers start throttling the broker.
+
 * `swap` : If set to false, then the queue will not swap messages out of 
 memory.  Defaults to true.
 



Mime
View raw message