Author: chirino
Date: Sat Dec 15 05:43:34 2012
New Revision: 1422188
URL: http://svn.apache.org/viewvc?rev=1422188&view=rev
Log:
Implements APLO-19: Support message groups
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
activemq/activemq-apollo/trunk/apollo-website/src/index.page
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
Sat Dec 15 05:43:34 2012
@@ -107,6 +107,9 @@ class AmqpMessage(private var encoded_bu
encoded_buffer
}
+
+ override def message_group = decoded.getGroupId
+
def getBodyAs[T](toType : Class[T]): T = {
if (toType == classOf[Buffer]) {
encoded
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Sat Dec 15 05:43:34 2012
@@ -121,7 +121,8 @@ trait Message extends Filterable with Re
def codec:MessageCodec
def encoded:Buffer = codec.encode(this).buffer
-
+
+ def message_group: String = null
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Sat Dec 15 05:43:34 2012
@@ -32,7 +32,7 @@ import java.util.regex.Pattern
import collection.mutable.ListBuffer
object Queue extends Log {
- val subcsription_counter = new AtomicInteger(0)
+ val subscription_counter = new AtomicInteger(0)
class MemorySpace {
var items = 0
@@ -54,6 +54,10 @@ object Queue extends Log {
import Queue._
+case class GroupBucket(sub:Subscription) {
+ override def toString: String = sub.id.toString
+}
+
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -75,6 +79,21 @@ class Queue(val router: LocalRouter, val
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
var exclusive_subscriptions = ListBuffer[Subscription]()
+ var _message_group_buckets: HashRing[GroupBucket, String] = _
+
+ def message_group_buckets = {
+ // If the queue is not using message groups, lets avoid
+ // creating the bucket hash ring.
+ if( _message_group_buckets == null ) {
+ _message_group_buckets = new HashRing[GroupBucket, String]()
+ // Create a bucket for each subscription
+ for( sub <- all_subscriptions.values if !sub.browser) {
+ _message_group_buckets.add(GroupBucket(sub), 10)
+ }
+ }
+ _message_group_buckets
+ }
+
def filter = binding.message_filter
override val dispatch_queue: DispatchQueue = createQueue(id);
@@ -215,6 +234,8 @@ class Queue(val router: LocalRouter, val
def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
+ def message_group_graceful_handoff = OptionSupport(config.message_group_graceful_handoff).getOrElse(true)
+
def configure(update:QueueSettingsDTO) = {
def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Sat Dec 15 05:43:34 2012
@@ -598,6 +598,21 @@ class QueueEntry(val queue:Queue, val se
}
var acquiringSub: Subscription = null
+
+ // Find the the first exclusive target of the message
+ var exclusive_target = queue.exclusive_subscriptions.find( _.matches(delivery) )
+
+ // Should we looks for the message group bucket?
+ if ( exclusive_target.isEmpty && delivery.message.message_group != null ) {
+ var iterator = queue.message_group_buckets.iterator(delivery.message.message_group)
+ while (exclusive_target==None && iterator.hasNext) {
+ val bucket = iterator.next();
+ if( bucket.sub.matches(delivery) ) {
+ exclusive_target = Some(bucket.sub)
+ }
+ }
+ }
+
parked.foreach{ sub=>
if( sub.browser ) {
@@ -619,48 +634,43 @@ class QueueEntry(val queue:Queue, val se
// advance: another sub already acquired this entry..
advancing += sub
} else {
- if (!sub.matches(delivery)) {
+
+ // Is the current sub not the exclusive target?
+ if( (exclusive_target.isDefined && (exclusive_target.get != sub))
+ || !sub.matches(delivery)
+ || (exclusive_target.isEmpty && delivery.message.message_group!=null)
) {
// advance: not interested.
advancing += sub
} else {
- // Find the the first exclusive target of the message
- val exclusive_target = queue.exclusive_subscriptions.find( _.matches(delivery)
)
-
- // Is the current sub not the exclusive target?
- if( exclusive_target.isDefined && (exclusive_target.get != sub) ) {
- // advance: not interested.
- advancing += sub
+ // Is the sub flow controlled?
+ if( sub.full ) {
+ // hold back: flow controlled
+ heldBack += sub
} else {
- // Is the sub flow controlled?
- if( sub.full ) {
- // hold back: flow controlled
- heldBack += sub
+ // advance: accepted...
+ if( queue.tune_round_robin ) {
+ acquiringSub = sub
} else {
- // advance: accepted...
- if( queue.tune_round_robin ) {
- acquiringSub = sub
- } else {
- advancing += sub
- }
- acquirer = sub
+ advancing += sub
+ }
+ acquirer = sub
- val acquiredQueueEntry = sub.acquire(entry)
- val acquiredDelivery = delivery.copy
- if( acquiredDelivery.sender == Nil) {
- acquiredDelivery.sender ::= queue.address
- }
+ val acquiredQueueEntry = sub.acquire(entry)
+ val acquiredDelivery = delivery.copy
+ if( acquiredDelivery.sender == Nil) {
+ acquiredDelivery.sender ::= queue.address
+ }
- acquiredDelivery.ack = (consumed, uow)=> {
- if( uow!=null ) {
- uow.retain()
- }
- queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
+ acquiredDelivery.ack = (consumed, uow)=> {
+ if( uow!=null ) {
+ uow.retain()
}
-
- val accepted = sub.offer(acquiredDelivery)
- assert(accepted, "sub should have accepted, it had reported not full earlier.")
+ queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
}
+
+ val accepted = sub.offer(acquiredDelivery)
+ assert(accepted, "sub should have accepted, it had reported not full earlier.")
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
Sat Dec 15 05:43:34 2012
@@ -39,7 +39,7 @@ class Subscription(val queue:Queue, val
def dispatch_queue = queue.dispatch_queue
- val id = Queue.subcsription_counter.incrementAndGet
+ val id = Queue.subscription_counter.incrementAndGet
var acquired = new LinkedNodeList[AcquiredQueueEntry]
var session: DeliverySession = null
var pos:QueueEntry = null
@@ -120,6 +120,43 @@ class Subscription(val queue:Queue, val
pos ::= this
queue.all_subscriptions += consumer -> this
+ if( !consumer.browser && queue._message_group_buckets != null ) {
+
+ var iterators = queue._message_group_buckets.add(GroupBucket(this), 10)
+
+ // If we are doing graceful handoffs of message groups...
+ if( queue.message_group_graceful_handoff ) {
+ import collection.JavaConversions._
+ for ( iterator <- iterators ) {
+
+ // When we add the new bucket, it's going to get assigned
+ // message groups that were previously being serviced by the next
+ // bucket. We need to suspend dispatch to both these groups
+ // until all dispatched messages get ack/drained, so that
+ // messages groups are not being concurrently being processed
+ // by two subscriptions.
+
+ var taking_over:Subscription = null
+ while ( iterator.hasNext && taking_over==null) {
+ val next = iterator.next()
+ if( next.sub != this ) {
+ taking_over = next.sub
+ }
+ }
+
+ // If we are taking over
+ if( taking_over!=null ) {
+ this.suspend
+ taking_over.suspend
+ taking_over.on_drain {
+ resume
+ taking_over.resume
+ }
+ }
+ }
+ }
+ }
+
queue.consumer_counter += 1
queue.change_consumer_capacity( consumer_buffer )
@@ -135,14 +172,32 @@ class Subscription(val queue:Queue, val
queue.check_idle
}
- var pending_close_action: ()=>Unit = _
+ var suspend_count = 0;
- def check_finish_close = {
+ def suspend = suspend_count+=1
+ def resume = {
+ suspend_count-=1
+ if( suspend_count <= 0) {
+ queue.dispatch_queue << pos.task
+ }
+ }
+
+ def on_drain(func: =>Unit) {
+ drain_watchers ::= func _
+ check_drained
+ }
+
+ var drain_watchers: List[()=>Unit] = Nil
+
+ def check_drained = {
// We can complete the closing of the sub
// once the outstanding acks are settled.
- if (pending_close_action!=null && acquired.isEmpty) {
- pending_close_action()
- pending_close_action = null
+ if (acquired.isEmpty && drain_watchers!=Nil) {
+ val t = drain_watchers
+ drain_watchers = Nil
+ for ( action <- t) {
+ action()
+ }
}
}
@@ -160,7 +215,9 @@ class Subscription(val queue:Queue, val
// The following action gets executed once all acquired messages
// ared acked or nacked.
- pending_close_action = ()=> {
+
+ consumer.release
+ on_drain {
queue.change_consumer_capacity( - consumer_buffer )
if( exclusive ) {
@@ -171,9 +228,6 @@ class Subscription(val queue:Queue, val
queue.check_idle
queue.trigger_swap
}
-
- consumer.release
- check_finish_close
} else {}
}
@@ -206,11 +260,11 @@ class Subscription(val queue:Queue, val
def tail_parked = pos eq queue.tail_entry
def matches(entry:Delivery) = consumer.matches(entry)
- def full = session.full
+ def full = suspend_count > 0 || session.full
def offer(delivery:Delivery) = try {
assert(delivery.seq > 0 )
- if( session.full ) {
+ if( full ) {
false
} else {
val accepted = session.offer(delivery)
@@ -288,7 +342,7 @@ class Subscription(val queue:Queue, val
queue.trigger_swap
next.task.run
- check_finish_close
+ check_drained
}
@@ -336,7 +390,7 @@ class Subscription(val queue:Queue, val
if( acquired.isEmpty ) {
idle_start = System.nanoTime()
}
- check_finish_close
+ check_drained
}
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
Sat Dec 15 05:43:34 2012
@@ -56,6 +56,15 @@ public class QueueSettingsDTO {
public Boolean round_robin;
/**
+ * When set to true, the queue
+ * will drain the required message group consumers of messages before
+ * allowing new messages to dispatched to messages groups which have been
+ * moved to a different consumer due to re-balancing. Defaults to true.
+ */
+ @XmlAttribute(name="message_group_graceful_handoff")
+ public Boolean message_group_graceful_handoff;
+
+ /**
* Should messages be swapped out of memory if
* no consumers need the message?
*/
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
Sat Dec 15 05:43:34 2012
@@ -35,6 +35,8 @@ class OpenwireMessage(val message:Active
def toString(buffer:AnyRef) = if (buffer==null) null else buffer.toString
+ override def message_group: String = if(message.getGroupID!=null ) message.getGroupID.toString
else null
+
def getProperty(name: String) = {
name match {
case "JMSDeliveryMode" =>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Sat Dec 15 05:43:34 2012
@@ -60,6 +60,9 @@ case class StompFrameMessage(frame:Stomp
*/
var persistent = false
+ var message_group_buffer:AsciiBuffer = null
+ override def message_group = if( message_group_buffer==null ) null else message_group_buffer.toString
+
for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
header match {
case (MESSAGE_ID, value) =>
@@ -70,10 +73,13 @@ case class StompFrameMessage(frame:Stomp
expiration = java.lang.Long.parseLong(value)
case (PERSISTENT, value) =>
persistent = java.lang.Boolean.parseBoolean(value)
+ case (MESSAGE_GROUP, value) =>
+ message_group_buffer = value
case _ =>
}
}
+
def getBodyAs[T](toType : Class[T]) = {
(frame.content match {
case x:BufferContent =>
@@ -386,6 +392,7 @@ object Stomp {
val RETAIN = ascii("retain")
val SET = ascii("set")
val REMOVE = ascii("remove")
+ val MESSAGE_GROUP = ascii("message_group")
val MESSAGE_ID = ascii("message-id")
val PRORITY = ascii("priority")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Sat Dec 15 05:43:34 2012
@@ -23,6 +23,7 @@ import org.apache.activemq.apollo.broker
import java.net.{SocketTimeoutException, InetSocketAddress}
import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
import org.fusesource.hawtdispatch._
+import collection.mutable
/**
* These tests can be run in parallel against a single Apollo broker.
@@ -614,6 +615,63 @@ class StompParallelTest extends StompTes
sub1_counter should be(4)
}
+ test("Message groups are sticky to a consumer") {
+
+ val dest = next_id("/queue/msggroups")
+ connect("1.1")
+ subscribe("1", dest)
+ subscribe("2", dest)
+
+ var actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+
+ def send_receive = {
+ for (i <- 0 until 26 ) { async_send(dest, "data", "message_group:"+('a'+i).toChar+"\n")
}
+ for (i <- 0 until 26 ) {
+ val (frame, ack) = receive_message()
+ for( sub <- List("1", "2", "3") if( frame.contains("subscription:"+sub+"\n"))
) {
+ val set = actual_mapping.getOrElseUpdate(sub, mutable.HashSet())
+ for (i <- 0 until 26 ) {
+ var c = ('a' + i).toChar
+ if( frame.contains("message_group:"+c+"\n")) {
+ set.add(c)
+ }
+ }
+ }
+ ack
+ }
+ }
+
+ send_receive
+
+ var expected_mapping = actual_mapping
+ println(expected_mapping)
+ expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should
be(true)
+
+ actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+ // Send more messages in, make sure they stay mapping to same consumers.
+ send_receive; send_receive; send_receive; send_receive
+
+ actual_mapping should be (expected_mapping)
+
+ // Add another subscriber, the groups should re-balance
+ subscribe("3", dest)
+
+ actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+ send_receive
+ expected_mapping = actual_mapping
+ println(expected_mapping)
+
+ expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should
be(true)
+ expected_mapping.get("2").get.intersect(expected_mapping.get("3").get).isEmpty should
be(true)
+ expected_mapping.get("1").get.intersect(expected_mapping.get("3").get).isEmpty should
be(true)
+
+ actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
+ // Send more messages in, make sure they stay mapping to same consumers.
+ send_receive; send_receive; send_receive; send_receive
+ actual_mapping should be (expected_mapping)
+
+ }
+
test("Queues do NOT load balance across exclusive subscribers") {
connect("1.1")
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/stomp-manual.md Sat Dec
15 05:43:34 2012
@@ -549,6 +549,38 @@ to the `SUBSCRIBE` frame. Example:
^@
+### Message Groups
+
+Message Groups are an enhancement to the Exclusive Consumer feature to provide
+
+* guaranteed ordering of the processing of related messages across a single queue
+* load balancing of the processing of messages across multiple consumers
+* high availability / auto-failover to other consumers if a JVM goes down
+
+Message Groups are logically like a parallel Exclusive Consumer. Rather
+than all messages going to a single consumer, the stomp `message_group` header
+is used to define which message group the message belongs to. The Message Group feature
+then ensures that all messages for the same message group will be sent to the same
+consumer - while that consumer stays alive. As soon as the consumer dies another
+will be chosen.
+
+Another way of explaining Message Groups is that it provides sticky load balancing
+of messages across consumers; where the message group value is kinda like a HTTP
+session ID or cookie value and the message broker is acting like a HTTP load balancer.
+
+Here is an example message with the message group set:
+
+ MESSAGE
+ destination:/queue/PO.REQUEST
+ message_group:hiram
+
+ PO145
+ ^@
+
+The broker uses consistent hashing to map message groups to consumers. When you another
+subscription to a queue, the broker will first wait for messages sent to previous subscriptions
+to be processed and then the broker rebalances the message groups across consumers.
+
### Temporary Destinations
Temporary destinations are typically used to receive response messages in
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sat Dec
15 05:43:34 2012
@@ -387,10 +387,15 @@ A `queue` element may be configured with
details. Defaults to false.
* `tail_buffer` : The amount of memory buffer space allocated for holding
-freshly enqueued message. Defaults to `640k`.
+ freshly enqueued message. Defaults to `640k`.
* `persistent` : If set to false, then the queue will not persistently
-store it's message. Defaults to true.
+ store it's message. Defaults to true.
+
+* `message_group_graceful_handoff` : When set to true, the queue
+ will drain message group consumers of messages before
+ allowing new messages to dispatched to messages groups which have been
+ moved to a different consumer due to re-balancing. Defaults to true.
* `round_robin` : Should the destination dispatch messages to consumers
using round robin distribution strategy? Defaults to true.
@@ -398,7 +403,7 @@ store it's message. Defaults to true.
consumers until those consumers start throttling the broker.
* `swap` : If set to false, then the queue will not swap messages out of
-memory. Defaults to true.
+ memory. Defaults to true.
* `swap_range_size` : The number max number of flushed queue entries to load
from the store at a time. Note that Flushed entires are just reference
Modified: activemq/activemq-apollo/trunk/apollo-website/src/index.page
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/index.page?rev=1422188&r1=1422187&r2=1422188&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/index.page (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/index.page Sat Dec 15 05:43:34 2012
@@ -31,18 +31,19 @@ multi-protocol broker and supports STOMP
## Features
-* [STOMP Protocol Support](documentation/stomp-manual.html)
-* [AMQP Protocol Support](documentation/stomp-manual.html)
+* STOMP Protocol Support
+* AMQP Protocol Support
* MQTT Protocol Support
-* [OpenWire Protocol Support](documentation/openwire-manual.html)
-* [Topics and Queues](documentation/user-manual.html#Destination_Types)
-* [Queue Browsers](documentation/user-manual.html#Browsing_Subscriptions)
-* [Durable Subscriptions on Topics](documentation/user-manual.html#Topic_Durable_Subscriptions)
+* OpenWire Protocol Support
+* [Topics and Queues](documentation/stomp-manual.html#Destination_Types)
+* [Queue Browsers](documentation/stino-manual.html#Browsing_Subscriptions)
+* [Durable Subscriptions on Topics](documentation/stomp-manual.html#Topic_Durable_Subscriptions)
* [Mirrored Queues](documentation/user-manual.html#Mirrored_Queues)
-* [Reliable Messaging](documentation/user-manual.html#Reliable_Messaging)
-* [Message Expiration](documentation/user-manual.html#Message_Expiration)
+* [Reliable Messaging](documentation/stomp-manual.html#Reliable_Messaging)
+* [Message Expiration](documentation/stomp-manual.html#Message_Expiration)
* [Message Swapping](documentation/architecture.html#Message_Swapping)
-* [Message Selectors](documentation/user-manual.html#Message_Selectors)
+* [Message Selectors](documentation/stomp-manual.html#Message_Selectors)
+* [Message Groups](documentation/stomp-manual.html#Message_Groups)
* [JAAS Authentication](documentation/user-manual.html#Authentication)
* [ACL based Authorization](documentation/user-manual.html#Authorization)
* [SSL/TLS Support](documentation/user-manual.html#Using_SSL_TLS) and Certificate based Authentication
@@ -51,8 +52,11 @@ multi-protocol broker and supports STOMP
## Documentation
- * [Getting Started Guide](documentation/getting-started.html)
- * [User Manual](documentation/user-manual.html)
- * [Migration Guide](documentation/migration-guide.html)
+* [Getting Started Guide](documentation/getting-started.html)
+* [User Manual](documentation/user-manual.html)
+* [Migration Guide](documentation/migration-guide.html)
+* [STOMP Protocol Manual](documentation/stomp-manual.html)
+* [OpenWire Protocol Manual](documentation/openwire-manual.html)
+* [AMQP Protocol Manual](documentation/amqp-manual.html)
|