activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1422188 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-openw...
Date Sat, 15 Dec 2012 05:43:38 GMT
Author: chirino
Date: Sat Dec 15 05:43:34 2012
New Revision: 1422188

URL: http://svn.apache.org/viewvc?rev=1422188&view=rev
Log:
Implements APLO-19: Support message groups

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
    activemq/activemq-apollo/trunk/apollo-website/src/index.page

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
Sat Dec 15 05:43:34 2012
@@ -107,6 +107,9 @@ class AmqpMessage(private var encoded_bu
     encoded_buffer
   }
 
+
+  override def message_group = decoded.getGroupId
+
   def getBodyAs[T](toType : Class[T]): T = {
     if (toType == classOf[Buffer]) {
       encoded

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Sat Dec 15 05:43:34 2012
@@ -121,7 +121,8 @@ trait Message extends Filterable with Re
   def codec:MessageCodec
 
   def encoded:Buffer = codec.encode(this).buffer
-  
+
+  def message_group: String = null
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Sat Dec 15 05:43:34 2012
@@ -32,7 +32,7 @@ import java.util.regex.Pattern
 import collection.mutable.ListBuffer
 
 object Queue extends Log {
-  val subcsription_counter = new AtomicInteger(0)
+  val subscription_counter = new AtomicInteger(0)
 
   class MemorySpace {
     var items = 0
@@ -54,6 +54,10 @@ object Queue extends Log {
 
 import Queue._
 
+case class GroupBucket(sub:Subscription) {
+  override def toString: String = sub.id.toString
+}
+
 /**
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -75,6 +79,21 @@ class Queue(val router: LocalRouter, val
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
   var exclusive_subscriptions = ListBuffer[Subscription]()
 
+  var _message_group_buckets: HashRing[GroupBucket, String] = _
+
+  def message_group_buckets = {
+    // If the queue is not using message groups, lets avoid
+    // creating the bucket hash ring.
+    if( _message_group_buckets == null )  {
+      _message_group_buckets = new HashRing[GroupBucket, String]()
+      // Create a bucket for each subscription
+      for( sub <- all_subscriptions.values if !sub.browser) {
+        _message_group_buckets.add(GroupBucket(sub), 10)
+      }
+    }
+    _message_group_buckets
+  }
+
   def filter = binding.message_filter
 
   override val dispatch_queue: DispatchQueue = createQueue(id);
@@ -215,6 +234,8 @@ class Queue(val router: LocalRouter, val
 
   def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
 
+  def message_group_graceful_handoff = OptionSupport(config.message_group_graceful_handoff).getOrElse(true)
+
   def configure(update:QueueSettingsDTO) = {
     def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Sat Dec 15 05:43:34 2012
@@ -598,6 +598,21 @@ class QueueEntry(val queue:Queue, val se
       }
 
       var acquiringSub: Subscription = null
+
+      // Find the the first exclusive target of the message
+      var exclusive_target = queue.exclusive_subscriptions.find( _.matches(delivery) )
+
+      // Should we looks for the message group bucket?
+      if ( exclusive_target.isEmpty && delivery.message.message_group != null ) {
+        var iterator = queue.message_group_buckets.iterator(delivery.message.message_group)
+        while (exclusive_target==None && iterator.hasNext) {
+          val bucket = iterator.next();
+          if( bucket.sub.matches(delivery) ) {
+            exclusive_target = Some(bucket.sub)
+          }
+        }
+      }
+
       parked.foreach{ sub=>
 
         if( sub.browser ) {
@@ -619,48 +634,43 @@ class QueueEntry(val queue:Queue, val se
             // advance: another sub already acquired this entry..
             advancing += sub
           } else {
-            if (!sub.matches(delivery)) {
+
+            // Is the current sub not the exclusive target?
+            if( (exclusive_target.isDefined && (exclusive_target.get != sub))
+                || !sub.matches(delivery)
+                || (exclusive_target.isEmpty && delivery.message.message_group!=null)
) {
               // advance: not interested.
               advancing += sub
             } else {
 
-              // Find the the first exclusive target of the message
-              val exclusive_target = queue.exclusive_subscriptions.find( _.matches(delivery)
)
-
-              // Is the current sub not the exclusive target?
-              if( exclusive_target.isDefined && (exclusive_target.get != sub) ) {
-                // advance: not interested.
-                advancing += sub
+              // Is the sub flow controlled?
+              if( sub.full ) {
+                // hold back: flow controlled
+                heldBack += sub
               } else {
-                // Is the sub flow controlled?
-                if( sub.full ) {
-                  // hold back: flow controlled
-                  heldBack += sub
+                // advance: accepted...
+                if( queue.tune_round_robin ) {
+                  acquiringSub = sub
                 } else {
-                  // advance: accepted...
-                  if( queue.tune_round_robin ) {
-                    acquiringSub = sub
-                  } else {
-                    advancing += sub
-                  }
-                  acquirer = sub
+                  advancing += sub
+                }
+                acquirer = sub
 
-                  val acquiredQueueEntry = sub.acquire(entry)
-                  val acquiredDelivery = delivery.copy
-                  if( acquiredDelivery.sender == Nil) {
-                    acquiredDelivery.sender ::= queue.address
-                  }
+                val acquiredQueueEntry = sub.acquire(entry)
+                val acquiredDelivery = delivery.copy
+                if( acquiredDelivery.sender == Nil) {
+                  acquiredDelivery.sender ::= queue.address
+                }
 
-                  acquiredDelivery.ack = (consumed, uow)=> {
-                    if( uow!=null ) {
-                      uow.retain()
-                    }
-                    queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
+                acquiredDelivery.ack = (consumed, uow)=> {
+                  if( uow!=null ) {
+                    uow.retain()
                   }
-
-                  val accepted = sub.offer(acquiredDelivery)
-                  assert(accepted, "sub should have accepted, it had reported not full earlier.")
+                  queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
                 }
+
+                val accepted = sub.offer(acquiredDelivery)
+                assert(accepted, "sub should have accepted, it had reported not full earlier.")
               }
             }
           }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
Sat Dec 15 05:43:34 2012
@@ -39,7 +39,7 @@ class Subscription(val queue:Queue, val 
 
   def dispatch_queue = queue.dispatch_queue
 
-  val id = Queue.subcsription_counter.incrementAndGet
+  val id = Queue.subscription_counter.incrementAndGet
   var acquired = new LinkedNodeList[AcquiredQueueEntry]
   var session: DeliverySession = null
   var pos:QueueEntry = null
@@ -120,6 +120,43 @@ class Subscription(val queue:Queue, val 
     pos ::= this
 
     queue.all_subscriptions += consumer -> this
+    if( !consumer.browser && queue._message_group_buckets != null ) {
+
+      var iterators = queue._message_group_buckets.add(GroupBucket(this), 10)
+
+      // If we are doing graceful handoffs of message groups...
+      if( queue.message_group_graceful_handoff ) {
+        import collection.JavaConversions._
+        for ( iterator <- iterators ) {
+
+          // When we add the new bucket, it's going to get assigned
+          // message groups that were previously being serviced by the next
+          // bucket.  We need to suspend dispatch to both these groups
+          // until all dispatched messages get ack/drained, so that
+          // messages groups are not being concurrently being processed
+          // by two subscriptions.
+
+          var taking_over:Subscription = null
+          while ( iterator.hasNext && taking_over==null) {
+            val next = iterator.next()
+            if( next.sub != this ) {
+              taking_over = next.sub
+            }
+          }
+
+          // If we are taking over
+          if( taking_over!=null ) {
+            this.suspend
+            taking_over.suspend
+            taking_over.on_drain {
+              resume
+              taking_over.resume
+            }
+          }
+        }
+      }
+    }
+
     queue.consumer_counter += 1
     queue.change_consumer_capacity( consumer_buffer )
 
@@ -135,14 +172,32 @@ class Subscription(val queue:Queue, val 
     queue.check_idle
   }
 
-  var pending_close_action: ()=>Unit = _
+  var suspend_count = 0;
 
-  def check_finish_close = {
+  def suspend = suspend_count+=1
+  def resume = {
+    suspend_count-=1
+    if( suspend_count <= 0) {
+      queue.dispatch_queue << pos.task
+    }
+  }
+
+  def on_drain(func: =>Unit) {
+    drain_watchers ::= func _
+    check_drained
+  }
+
+  var drain_watchers: List[()=>Unit] = Nil
+
+  def check_drained = {
     // We can complete the closing of the sub
     // once the outstanding acks are settled.
-    if (pending_close_action!=null && acquired.isEmpty) {
-      pending_close_action()
-      pending_close_action = null
+    if (acquired.isEmpty && drain_watchers!=Nil) {
+      val t = drain_watchers
+      drain_watchers = Nil
+      for ( action <- t) {
+        action()
+      }
     }
   }
 
@@ -160,7 +215,9 @@ class Subscription(val queue:Queue, val 
 
       // The following action gets executed once all acquired messages
       // ared acked or nacked.
-      pending_close_action = ()=> {
+
+      consumer.release
+      on_drain {
         queue.change_consumer_capacity( - consumer_buffer )
 
         if( exclusive ) {
@@ -171,9 +228,6 @@ class Subscription(val queue:Queue, val 
         queue.check_idle
         queue.trigger_swap
       }
-
-      consumer.release
-      check_finish_close
     } else {}
   }
 
@@ -206,11 +260,11 @@ class Subscription(val queue:Queue, val 
   def tail_parked = pos eq queue.tail_entry
 
   def matches(entry:Delivery) = consumer.matches(entry)
-  def full = session.full
+  def full = suspend_count > 0 || session.full
 
   def offer(delivery:Delivery) = try {
     assert(delivery.seq > 0 )
-    if( session.full ) {
+    if( full ) {
       false
     } else {
       val accepted = session.offer(delivery)
@@ -288,7 +342,7 @@ class Subscription(val queue:Queue, val 
 
       queue.trigger_swap
       next.task.run
-      check_finish_close
+      check_drained
 
     }
 
@@ -336,7 +390,7 @@ class Subscription(val queue:Queue, val 
       if( acquired.isEmpty ) {
         idle_start = System.nanoTime()
       }
-      check_finish_close
+      check_drained
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
Sat Dec 15 05:43:34 2012
@@ -56,6 +56,15 @@ public class QueueSettingsDTO {
     public Boolean round_robin;
 
     /**
+     * When set to true, the queue
+     * will drain the required message group consumers of messages before
+     * allowing new messages to dispatched to messages groups which have been
+     * moved to a different consumer due to re-balancing. Defaults to true.
+     */
+    @XmlAttribute(name="message_group_graceful_handoff")
+    public Boolean message_group_graceful_handoff;
+
+    /**
      * Should messages be swapped out of memory if
      * no consumers need the message?
      */

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
Sat Dec 15 05:43:34 2012
@@ -35,6 +35,8 @@ class OpenwireMessage(val message:Active
 
   def toString(buffer:AnyRef) = if (buffer==null) null else buffer.toString
 
+  override def message_group: String = if(message.getGroupID!=null ) message.getGroupID.toString
else null
+
   def getProperty(name: String) = {
     name match {
       case "JMSDeliveryMode" =>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Sat Dec 15 05:43:34 2012
@@ -60,6 +60,9 @@ case class StompFrameMessage(frame:Stomp
    */
   var persistent = false
 
+  var message_group_buffer:AsciiBuffer = null
+  override def message_group = if( message_group_buffer==null ) null else message_group_buffer.toString
+
   for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
     header match {
       case (MESSAGE_ID, value) =>
@@ -70,10 +73,13 @@ case class StompFrameMessage(frame:Stomp
         expiration = java.lang.Long.parseLong(value)
       case (PERSISTENT, value) =>
         persistent = java.lang.Boolean.parseBoolean(value)
+      case (MESSAGE_GROUP, value) =>
+        message_group_buffer = value
       case _ =>
     }
   }
 
+
   def getBodyAs[T](toType : Class[T]) = {
     (frame.content match {
       case x:BufferContent =>
@@ -386,6 +392,7 @@ object Stomp {
   val RETAIN = ascii("retain")
   val SET = ascii("set")
   val REMOVE = ascii("remove")
+  val MESSAGE_GROUP = ascii("message_group")
 
   val MESSAGE_ID = ascii("message-id")
   val PRORITY = ascii("priority")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Sat Dec 15 05:43:34 2012
@@ -23,6 +23,7 @@ import org.apache.activemq.apollo.broker
 import java.net.{SocketTimeoutException, InetSocketAddress}
 import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
 import org.fusesource.hawtdispatch._
+import collection.mutable
 
 /**
  * These tests can be run in parallel against a single Apollo broker.
@@ -614,6 +615,63 @@ class StompParallelTest extends StompTes
     sub1_counter should be(4)
   }
 
+  test("Message groups are sticky to a consumer") {
+
+    val dest = next_id("/queue/msggroups")
+    connect("1.1")
+    subscribe("1", dest)
+    subscribe("2", dest)
+
+    var actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+
+    def send_receive = {
+      for (i <- 0 until 26 ) { async_send(dest, "data", "message_group:"+('a'+i).toChar+"\n")
}
+      for (i <- 0 until 26 ) {
+        val (frame, ack) = receive_message()
+        for( sub <- List("1", "2", "3") if( frame.contains("subscription:"+sub+"\n"))
) {
+          val set = actual_mapping.getOrElseUpdate(sub, mutable.HashSet())
+          for (i <- 0 until 26 ) {
+            var c = ('a' + i).toChar
+            if( frame.contains("message_group:"+c+"\n")) {
+              set.add(c)
+            }
+          }
+        }
+        ack
+      }
+    }
+
+    send_receive
+
+    var expected_mapping = actual_mapping
+    println(expected_mapping)
+    expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should
be(true)
+
+    actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+    // Send more messages in, make sure they stay mapping to same consumers.
+    send_receive; send_receive; send_receive; send_receive
+
+    actual_mapping should be (expected_mapping)
+
+    // Add another subscriber, the groups should re-balance
+    subscribe("3", dest)
+
+    actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+    send_receive
+    expected_mapping = actual_mapping
+    println(expected_mapping)
+
+    expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should
be(true)
+    expected_mapping.get("2").get.intersect(expected_mapping.get("3").get).isEmpty should
be(true)
+    expected_mapping.get("1").get.intersect(expected_mapping.get("3").get).isEmpty should
be(true)
+
+    actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+    // Send more messages in, make sure they stay mapping to same consumers.
+    send_receive; send_receive; send_receive; send_receive
+    actual_mapping should be (expected_mapping)
+
+  }
+
 
   test("Queues do NOT load balance across exclusive subscribers") {
     connect("1.1")

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md Sat Dec
15 05:43:34 2012
@@ -549,6 +549,38 @@ to the `SUBSCRIBE` frame.  Example:
     
     ^@
 
+### Message Groups
+
+Message Groups are an enhancement to the Exclusive Consumer feature to provide
+
+* guaranteed ordering of the processing of related messages across a single queue
+* load balancing of the processing of messages across multiple consumers
+* high availability / auto-failover to other consumers if a JVM goes down
+
+Message Groups are logically like a parallel Exclusive Consumer. Rather 
+than all messages going to a single consumer, the stomp `message_group` header
+is used to define which message group the message belongs to. The Message Group feature
+then ensures that all messages for the same message group will be sent to the same 
+consumer - while that consumer stays alive. As soon as the consumer dies another 
+will be chosen.
+
+Another way of explaining Message Groups is that it provides sticky load balancing 
+of messages across consumers; where the message group value is kinda like a HTTP 
+session ID or cookie value and the message broker is acting like a HTTP load balancer.
+
+Here is an example message with the message group set:
+
+    MESSAGE
+    destination:/queue/PO.REQUEST
+    message_group:hiram
+    
+    PO145
+    ^@
+
+The broker uses consistent hashing to map message groups to consumers.  When you another
+subscription to a queue, the broker will first wait for messages sent to previous subscriptions
+to be processed and then the broker rebalances the message groups across consumers.
+
 ### Temporary Destinations
 
 Temporary destinations are typically used to receive response messages in

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sat Dec
15 05:43:34 2012
@@ -387,10 +387,15 @@ A `queue` element may be configured with
   details.  Defaults to false.
 
 * `tail_buffer` : The amount of memory buffer space allocated for holding
-freshly enqueued message.  Defaults to `640k`.
+  freshly enqueued message.  Defaults to `640k`.
 
 * `persistent` : If set to false, then the queue will not persistently
-store it's message.  Defaults to true.
+  store it's message.  Defaults to true.
+
+* `message_group_graceful_handoff` : When set to true, the queue
+  will drain message group consumers of messages before
+  allowing new messages to dispatched to messages groups which have been 
+  moved to a different consumer due to re-balancing. Defaults to true.
 
 * `round_robin` : Should the destination dispatch messages to consumers
   using round robin distribution strategy?  Defaults to true.
@@ -398,7 +403,7 @@ store it's message.  Defaults to true.
   consumers until those consumers start throttling the broker.
 
 * `swap` : If set to false, then the queue will not swap messages out of 
-memory.  Defaults to true.
+  memory.  Defaults to true.
 
 * `swap_range_size` : The number max number of flushed queue entries to load
   from the store at a time. Note that Flushed entires are just reference

Modified: activemq/activemq-apollo/trunk/apollo-website/src/index.page
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/index.page?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/index.page (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/index.page Sat Dec 15 05:43:34 2012
@@ -31,18 +31,19 @@ multi-protocol broker and supports STOMP
 
 ## Features
 
-* [STOMP Protocol Support](documentation/stomp-manual.html) 
-* [AMQP Protocol Support](documentation/stomp-manual.html) 
+* STOMP Protocol Support
+* AMQP Protocol Support
 * MQTT Protocol Support
-* [OpenWire Protocol Support](documentation/openwire-manual.html)
-* [Topics and Queues](documentation/user-manual.html#Destination_Types)
-* [Queue Browsers](documentation/user-manual.html#Browsing_Subscriptions)
-* [Durable Subscriptions on Topics](documentation/user-manual.html#Topic_Durable_Subscriptions)
+* OpenWire Protocol Support
+* [Topics and Queues](documentation/stomp-manual.html#Destination_Types)
+* [Queue Browsers](documentation/stino-manual.html#Browsing_Subscriptions)
+* [Durable Subscriptions on Topics](documentation/stomp-manual.html#Topic_Durable_Subscriptions)
 * [Mirrored Queues](documentation/user-manual.html#Mirrored_Queues)
-* [Reliable Messaging](documentation/user-manual.html#Reliable_Messaging)
-* [Message Expiration](documentation/user-manual.html#Message_Expiration)
+* [Reliable Messaging](documentation/stomp-manual.html#Reliable_Messaging)
+* [Message Expiration](documentation/stomp-manual.html#Message_Expiration)
 * [Message Swapping](documentation/architecture.html#Message_Swapping)
-* [Message Selectors](documentation/user-manual.html#Message_Selectors)
+* [Message Selectors](documentation/stomp-manual.html#Message_Selectors)
+* [Message Groups](documentation/stomp-manual.html#Message_Groups)
 * [JAAS Authentication](documentation/user-manual.html#Authentication)
 * [ACL based Authorization](documentation/user-manual.html#Authorization)
 * [SSL/TLS Support](documentation/user-manual.html#Using_SSL_TLS) and Certificate based Authentication
@@ -51,8 +52,11 @@ multi-protocol broker and supports STOMP
 
 ## Documentation
 
- * [Getting Started Guide](documentation/getting-started.html)
- * [User Manual](documentation/user-manual.html)
- * [Migration Guide](documentation/migration-guide.html)
+* [Getting Started Guide](documentation/getting-started.html)
+* [User Manual](documentation/user-manual.html)
+* [Migration Guide](documentation/migration-guide.html)
+* [STOMP Protocol Manual](documentation/stomp-manual.html) 
+* [OpenWire Protocol Manual](documentation/openwire-manual.html)
+* [AMQP Protocol Manual](documentation/amqp-manual.html) 
 
 



Mime
View raw message