activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1341729 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/...
Date Wed, 23 May 2012 02:25:29 GMT
Author: chirino
Date: Wed May 23 02:25:29 2012
New Revision: 1341729

URL: http://svn.apache.org/viewvc?rev=1341729&view=rev
Log:
Fixes APLO-206: Load balance of job queues when 'credit:1,0' is used on the consumer.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed May 23 02:25:29 2012
@@ -529,7 +529,7 @@ class Queue(val router: LocalRouter, val
     consumer_swapped_in.size_max += amount
   }
 
-  object messages extends Sink[Delivery] {
+  object messages extends Sink[(Session[Delivery], Delivery)] {
 
     var refiller: Task = null
 
@@ -550,11 +550,12 @@ class Queue(val router: LocalRouter, val
       false
     }
 
-    def offer(delivery: Delivery): Boolean = {
+    def offer(event: (Session[Delivery], Delivery)): Boolean = {
       if (full) {
         false
       } else {
-
+        val (session, delivery) = event
+        session_manager.delivered(session, delivery.size)
         // We may need to drop this enqueue or head entries due
         // to the drop policy.
         var drop = false
@@ -1067,7 +1068,7 @@ class Queue(val router: LocalRouter, val
     override def consumer = Queue.this
 
     val session_max = producer.send_buffer_size
-    val downstream = session_manager.open(producer.dispatch_queue, session_max)
+    val downstream = session_manager.open(producer.dispatch_queue, Integer.MAX_VALUE, session_max)
 
     dispatch_queue {
       inbound_sessions += this

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed May 23 02:25:29 2012
@@ -266,8 +266,8 @@ class SinkMux[T](val downstream:Sink[T])
 
 class CreditWindowFilter[T](val downstream:Sink[T], val sizer:Sizer[T]) extends SinkMapper[T,T] {
 
-  var byte_credits = 0
   var delivery_credits = 0
+  var byte_credits = 0
   var disabled = true
 
   override def full: Boolean = downstream.full || ( disabled && byte_credits <= 0 && delivery_credits <= 0 )
@@ -278,12 +278,12 @@ class CreditWindowFilter[T](val downstre
   }
 
   def passing(value: T) = {
-    byte_credits -= sizer.size(value)
     delivery_credits -= 1
+    byte_credits -= sizer.size(value)
     value
   }
 
-  def credit(byte_credits:Int, delivery_credits:Int) = {
+  def credit(delivery_credits:Int, byte_credits:Int) = {
     this.byte_credits += byte_credits
     this.delivery_credits += delivery_credits
     if( !full ) {
@@ -322,10 +322,6 @@ trait SessionSinkFilter[T] extends Sessi
   def remaining_capacity = downstream.remaining_capacity
 }
 
-object SessionSinkMux {
-  val default_session_max_credits = System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
-}
-
 /**
  *  <p>
  * A SinkMux multiplexes access to a target sink so that multiple
@@ -337,18 +333,14 @@ object SessionSinkMux {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SessionSinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
+class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
 
   var sessions = HashSet[Session[T]]()
+  val overflow = new OverflowSink[(Session[T],T)](downstream)
 
-  val overflow = new OverflowSink[(Session[T],T)](downstream.map(_._2)) {
-    // Once a value leaves the overflow, then we can credit the
-    // session so that more messages can be accepted.
-    override protected def onDelivered(event:(Session[T],T)) = {
-      val session = event._1
-      val value = event._2
-      session.credit_adder.merge(sizer.size(value));
-    }
+  def delivered(session:Session[Delivery], size:Int) = {
+    consumer_queue.assertExecuting()
+    session.credit_adder.merge((1, size));
   }
 
   // use a event aggregating source to coalesce multiple events from the same thread.
@@ -365,10 +357,10 @@ class SessionSinkMux[T](val downstream:S
     }
   }
 
-  def open(producer_queue:DispatchQueue, credits:Int=SessionSinkMux.default_session_max_credits):SessionSink[T] = {
-    val session = new Session[T](producer_queue, 0, this)
+  def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
+    val session = new Session[T](this, producer_queue)
     consumer_queue <<| ^{
-      session.credit_adder.merge(credits);
+      session.credit_adder.merge((delivery_credits, size_credits));
       sessions += session
     }
     session
@@ -392,12 +384,14 @@ class SessionSinkMux[T](val downstream:S
 /**
  * tracks one producer to consumer session / credit window.
  */
-class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SessionSinkMux[T]) extends SessionSink[T] {
+class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
 
   var refiller:Task = NOOP
 
   private def sizer = mux.sizer
   private def downstream = mux.source
+  var delivery_credits = 0
+  var size_credits = 0
 
   @volatile
   var enqueue_item_counter = 0L
@@ -407,19 +401,31 @@ class Session[T](val producer_queue:Disp
   var enqueue_ts = mux.time_stamp
 
   // create a source to coalesce credit events back to the producer side...
-  val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+  val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
+    def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
+      if( previous == null ) {
+        event
+      } else {
+        mergeEvents(previous, event)
+      }
+    }
+    def mergeEvents(previous:(Int, Int), event:(Int, Int)) = (previous._1+event._1, previous._2+event._2)
+  }, producer_queue)
+
   credit_adder.onEvent{
-    add_credits(credit_adder.getData.intValue)
+    val (count, size) = credit_adder.getData
+    add_credits(count, size)
+    if( (size > 0 || count>0) && !_full ) {
+      refiller.run
+    }
   }
   credit_adder.resume
 
   private var rejection_handler: (T)=>Unit = _
   
-  private def add_credits(value:Int) = {
-    credits += value;
-    if( value > 0 && !_full ) {
-      refiller.run
-    }
+  private def add_credits(count:Int, size:Int) = {
+    delivery_credits += count
+    size_credits += size
   }
 
   ///////////////////////////////////////////////////
@@ -427,14 +433,14 @@ class Session[T](val producer_queue:Disp
   // producer serial dispatch queue
   ///////////////////////////////////////////////////
 
-  def remaining_capacity = credits
+  def remaining_capacity = size_credits
 
   override def full = {
     producer_queue.assertExecuting()
     _full
   }
   
-  def _full = credits <= 0 && rejection_handler == null
+  def _full = ( size_credits <= 0 || delivery_credits<=0 ) && rejection_handler == null
 
   override def offer(value: T) = {
     producer_queue.assertExecuting()
@@ -450,7 +456,7 @@ class Session[T](val producer_queue:Disp
         enqueue_size_counter += size
         enqueue_ts = mux.time_stamp
   
-        add_credits(-size)
+        add_credits(-1, -size)
         downstream.merge((this, value))
       }
       true

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed May 23 02:25:29 2012
@@ -62,6 +62,10 @@ object OpenwireProtocolHandler extends L
   DEFAULT_WIREFORMAT_SETTINGS.setMaxFrameSize(OpenWireFormat.DEFAULT_MAX_FRAME_SIZE);
 
   val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+
+  object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+    def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+  }
 }
 
 /**
@@ -818,7 +822,9 @@ class OpenwireProtocolHandler extends Pr
     var addresses:Array[_ <: BindAddress] = _
 
     val consumer_sink = sink_manager.open()
-    val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
+    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
+      val (session, delivery) = event
+      session_manager.delivered(session, delivery.size)
       val dispatch = new MessageDispatch
       dispatch.setConsumerId(info.getConsumerId)
       if( delivery.message eq EndOfBrowseMessage ) {
@@ -832,11 +838,11 @@ class OpenwireProtocolHandler extends Pr
       }
       messages_sent += 1
       dispatch
-    }, Delivery)
+    }, SessionDeliverySizer)
 
-    credit_window_filter.credit(0, info.getPrefetchSize)
+    credit_window_filter.credit(info.getPrefetchSize, 0)
 
-    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
+    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
       override def time_stamp = broker.now
     }
 
@@ -943,7 +949,7 @@ class OpenwireProtocolHandler extends Pr
       producer.dispatch_queue.assertExecuting()
       retain
 
-      val downstream = session_manager.open(producer.dispatch_queue, buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), buffer_size)
       var closed = false
 
       def consumer = ConsumerContext.this
@@ -1018,7 +1024,7 @@ class OpenwireProtocolHandler extends Pr
     val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
     ack_source.setEventHandler(^ {
       val data = ack_source.getData
-      credit_window_filter.credit(0, data)
+      credit_window_filter.credit(data, 0)
     });
     ack_source.resume
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed May 23 02:25:29 2012
@@ -69,6 +69,11 @@ object StompProtocolHandler extends Log 
   var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
 
   val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+
+  object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+    def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+  }
+
 }
 
 /**
@@ -129,6 +134,8 @@ class StompProtocolHandler extends Proto
     message.asInstanceOf[StompFrameMessage].id
   }
 
+  case class InitialCreditWindow(count:Int,size:Int,auto_credit:Boolean)
+
   class StompConsumer (
 
     val subscription_id:Option[AsciiBuffer],
@@ -137,7 +144,7 @@ class StompProtocolHandler extends Proto
     val selector:(String, BooleanExpression),
     override val browser:Boolean,
     override val exclusive:Boolean,
-    val initial_credit_window:(Int,Int, Boolean),
+    val initial_credit_window:InitialCreditWindow,
     val include_seq:Option[AsciiBuffer],
     val from_seq:Long,
     override val close_on_drain:Boolean
@@ -194,7 +201,7 @@ class StompProtocolHandler extends Proto
     credit_window_source.resume
 
     trait AckHandler {
-      def track(delivery:Delivery):Unit
+      def track(event:(Session[Delivery], Delivery)):Unit
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
       def close:Unit
@@ -205,7 +212,9 @@ class StompProtocolHandler extends Proto
 
       def close = { closed  = true}
 
-      def track(delivery:Delivery) = {
+      def track(event:(Session[Delivery], Delivery)) = {
+        val (session, delivery) = event
+        session_manager.delivered(session, delivery.size)
         if( closed ) {
           if( delivery.ack!=null ) {
             delivery.ack(Undelivered, null)
@@ -215,7 +224,7 @@ class StompProtocolHandler extends Proto
             delivery.ack(Consumed, null)
           }
           if( !dead ) {
-            credit_window_source.merge((delivery.size, 1))
+            credit_window_source.merge((1, delivery.size))
           }
         }
       }
@@ -229,7 +238,7 @@ class StompProtocolHandler extends Proto
 
     }
 
-    class TrackedAck(var credit:Option[Int], val ack:(DeliveryResult, StoreUOW)=>Unit)
+    class TrackedAck(var credit:Option[(Session[Delivery], Int)], val ack:(DeliveryResult, StoreUOW)=>Unit)
 
     class SessionAckHandler extends AckHandler{
       var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
@@ -244,8 +253,9 @@ class StompProtocolHandler extends Proto
         consumer_acks = null
       }
 
-      def track(delivery:Delivery) = {
+      def track(event:(Session[Delivery], Delivery)) = {
         queue.assertExecuting()
+        val (session, delivery) = event
         if( consumer_acks == null ) {
           // It can happen if we get closed.. but destination is still sending data..
           if( delivery.ack!=null ) {
@@ -256,13 +266,17 @@ class StompProtocolHandler extends Proto
             // register on the connection since 1.0 acks may not include the subscription id
             connection_ack_handlers += ( id(delivery.message) -> this )
           }
-          consumer_acks += id(delivery.message) -> new TrackedAck(Some(delivery.size), delivery.ack )
+          if( initial_credit_window.auto_credit ) {
+            consumer_acks += id(delivery.message) -> new TrackedAck(Some((session, delivery.size)), delivery.ack )
+          } else {
+            session_manager.delivered(session, delivery.size)
+          }
         }
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
         queue.assertExecuting()
-        if( initial_credit_window._3 ) {
+        if( initial_credit_window.auto_credit ) {
           var found = false
           val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
             if( id == msgid ) {
@@ -275,7 +289,8 @@ class StompProtocolHandler extends Proto
 
           for( (id, delivery) <- acked ) {
             for( credit <- delivery.credit ) {
-              credit_window_source.merge((credit, 1))
+              session_manager.delivered(credit._1, credit._2)
+              credit_window_source.merge((1, credit._2))
               delivery.credit = None
             }
           }
@@ -332,8 +347,9 @@ class StompProtocolHandler extends Proto
         consumer_acks = null
       }
 
-      def track(delivery:Delivery) = {
+      def track(event:(Session[Delivery], Delivery)) = {
         queue.assertExecuting();
+        val (session, delivery) = event
         if( consumer_acks == null ) {
           // It can happen if we get closed.. but destination is still sending data..
           if( delivery.ack!=null ) {
@@ -344,16 +360,21 @@ class StompProtocolHandler extends Proto
             // register on the connection since 1.0 acks may not include the subscription id
             connection_ack_handlers += ( id(delivery.message) -> this )
           }
-          consumer_acks += id(delivery.message) -> new TrackedAck(Some(delivery.size), delivery.ack)
+          if( initial_credit_window.auto_credit ) {
+            consumer_acks += id(delivery.message) -> new TrackedAck(Some((session, delivery.size)), delivery.ack)
+          } else {
+            session_manager.delivered(session, delivery.size)
+          }
         }
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
         queue.assertExecuting()
-        if( initial_credit_window._3 ) {
+        if( initial_credit_window.auto_credit ) {
           for( delivery <- consumer_acks.get(msgid)) {
             for( credit <- delivery.credit ) {
-              credit_window_source.merge((credit,1))
+              session_manager.delivered(credit._1, credit._2)
+              credit_window_source.merge((1, credit._2))
               delivery.credit = None
             }
           }
@@ -391,8 +412,9 @@ class StompProtocolHandler extends Proto
     }
 
     val consumer_sink = sink_manager.open()
-    val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
-      ack_handler.track(delivery)
+    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
+      ack_handler.track(event)
+      val (_, delivery) = event
 
       val message = delivery.message
       var frame = if( message.protocol eq StompProtocol ) {
@@ -423,11 +445,11 @@ class StompProtocolHandler extends Proto
       }
       messages_sent += 1
       frame
-    }, Delivery)
+    }, SessionDeliverySizer)
 
-    credit_window_filter.credit(initial_credit_window._1, initial_credit_window._2)
+    credit_window_filter.credit(initial_credit_window.count, initial_credit_window.size)
 
-    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
+    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
       override def time_stamp = broker.now
     }
 
@@ -479,7 +501,7 @@ class StompProtocolHandler extends Proto
       producer.dispatch_queue.assertExecuting()
       retain
 
-      val downstream = session_manager.open(producer.dispatch_queue, buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue, initial_credit_window.count.max(1), buffer_size)
 
       override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress
 
@@ -613,9 +635,7 @@ class StompProtocolHandler extends Proto
     config.die_delay.getOrElse(DEFAULT_DIE_DELAY)
   }
 
-  def buffer_size = {
-    MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
-  }
+  lazy val buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
 
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
@@ -1276,15 +1296,16 @@ class StompProtocolHandler extends Proto
       case Some(value) =>
         value.toString.split(",").toList match {
           case x :: Nil =>
-            (buffer_size, x.toInt, true)
+            InitialCreditWindow(x.toInt, buffer_size, true)
           case x :: y :: Nil =>
-            (y.toInt, x.toInt, true)
+            InitialCreditWindow(x.toInt, y.toInt, true)
           case x :: y :: z :: _ =>
-            (y.toInt, x.toInt, z.toBoolean)
-          case _ => (buffer_size, 1, true)
+            InitialCreditWindow(x.toInt, y.toInt, z.toBoolean)
+          case _ =>
+            InitialCreditWindow(buffer_size, buffer_size, true)
         }
       case None =>
-        (buffer_size, 1, true)
+        InitialCreditWindow(buffer_size, buffer_size, true)
     }
 
     val selector = get(headers, SELECTOR) match {
@@ -1399,9 +1420,9 @@ class StompProtocolHandler extends Proto
       case Some(value) =>
         value.toString.split(",").toList match {
           case x :: Nil =>
-            (0, x.toInt)
+            (x.toInt, 0)
           case x :: y :: _ =>
-            (y.toInt, x.toInt)
+            (x.toInt, y.toInt)
           case _ => (0,0)
         }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed May 23 02:25:29 2012
@@ -570,6 +570,27 @@ class StompPersistentQueueTest extends S
 
 class StompDestinationTest extends StompTestSupport {
 
+  test("APLO-206 - Load balance of job queues using small consumer credit windows") {
+    connect("1.1")
+
+    for( i <- 1 to 4) {
+      async_send("/queue/load-balanced2", i)
+    }
+
+    subscribe("1", "/queue/load-balanced2", "client", false, "credit:1,0\n")
+    val ack1 = assert_received(1, "1")
+
+    subscribe("2", "/queue/load-balanced2", "client", false, "credit:1,0\n")
+    val ack2 = assert_received(2, "2")
+
+    // Ok lets ack now..
+    ack1(true)
+    val ack3 = assert_received(3, "1")
+
+    ack2(true)
+    val ack4 = assert_received(4, "2")
+  }
+
   test("Browsing queues does not cause AssertionError.  Reported in APLO-156") {
     connect("1.1")
     subscribe("0", "/queue/TOOL.DEFAULT")
@@ -881,33 +902,11 @@ class StompDestinationTest extends Stomp
 
   test("Queues load balance across subscribers") {
     connect("1.1")
-
-    // Connect to subscribers
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/queue/load-balanced\n" +
-      "id:1\n" +
-      "\n")
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/queue/load-balanced\n" +
-      "receipt:0\n"+
-      "id:2\n" +
-      "\n")
-
-    wait_for_receipt("0")
-
-    def put(id:Int) = {
-      client.write(
-        "SEND\n" +
-        "destination:/queue/load-balanced\n" +
-        "\n" +
-        "message:"+id+"\n")
-    }
+    subscribe("1", "/queue/load-balanced")
+    subscribe("2", "/queue/load-balanced")
 
     for( i <- 0 until 4) {
-      put(i)
+      async_send("/queue/load-balanced", "message:"+i)
     }
 
     var sub1_counter=0



Mime
View raw message