activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r1374667 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Date: Sat, 18 Aug 2012 22:12:57 GMT
Author: chirino
Date: Sat Aug 18 22:12:57 2012
New Revision: 1374667

URL: http://svn.apache.org/viewvc?rev=1374667&view=rev
Log:
Adjust session sinks so that they credit in larger batches when the sink is under load from
multiple producers. Messages from multiple sessions are also round-robined to improve
delivery fairness.
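
In rough terms, each producer session now keeps its own overflow list; the mux drains one
message at a time and rotates the draining session to the back of the list, and while the
combined overflow is above a high-water mark the per-message credits are accumulated and
flushed in a single batch once that session's overflow empties. Below is a minimal standalone
sketch of that control flow (hypothetical names, plain Scala collections instead of the
Apollo/hawtdispatch classes):

import scala.collection.mutable

object CreditBatchingSketch {

  class Session(val id: Int) {
    val overflow = mutable.Queue[String]()   // messages waiting for the downstream sink
    var pendingCredits = 0                   // credits held back while the mux is under load
  }

  def main(args: Array[String]): Unit = {
    val highWater = 4                        // stand-in for high_overflow_size
    val sessions = mutable.Queue(new Session(1), new Session(2))
    sessions(0).overflow ++= Seq("a1", "a2", "a3")
    sessions(1).overflow ++= Seq("b1", "b2")
    var overflowSize = sessions.map(_.overflow.size).sum

    // delivered(): while the combined overflow is above the high-water mark,
    // hold the credit back instead of returning it to the producer right away.
    def delivered(s: Session): Unit =
      if (overflowSize >= highWater && s.overflow.nonEmpty) s.pendingCredits += 1
      else println(s"session ${s.id}: credit 1 message immediately")

    // drain loop: take one message from the head session, then rotate that
    // session to the tail so every producer gets a fair share of the sink.
    while (sessions.nonEmpty) {
      val s = sessions.dequeue()
      val msg = s.overflow.dequeue()
      overflowSize -= 1
      println(s"deliver $msg from session ${s.id}")
      delivered(s)
      if (s.overflow.isEmpty) {
        if (s.pendingCredits != 0) {         // flush the batched credits in one go
          println(s"session ${s.id}: credit ${s.pendingCredits} messages in one batch")
          s.pendingCredits = 0
        }
      } else {
        sessions.enqueue(s)                  // round-robin
      }
    }
  }
}

In the actual Sink.scala change the overflow lists live on the consumer dispatch queue and
credits travel back to the producer through the session's credit_adder source; the sketch only
illustrates the round-robin draining and credit-batching control flow.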

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1374667&r1=1374666&r2=1374667&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Sat Aug 18 22:12:57 2012
@@ -19,7 +19,8 @@ package org.apache.activemq.apollo.broke
 import org.fusesource.hawtdispatch._
 import java.util.LinkedList
 import org.fusesource.hawtdispatch.transport.Transport
-import collection.mutable.HashSet
+import collection.mutable.{ListBuffer, HashSet}
+import org.apache.activemq.apollo.util.list.{LinkedNodeList, LinkedNode}
 
 /**
  * <p>
@@ -343,31 +344,14 @@ trait SessionSinkFilter[T] extends Sessi
 class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
 
   var sessions = HashSet[Session[T]]()
-  val overflow = new OverflowSink[(Session[T],T)](downstream)
-
-  def delivered(session:Session[Delivery], size:Int) = {
-    consumer_queue.assertExecuting()
-    session.credit_adder.merge((1, size));
-  }
-
-  // use a event aggregating source to coalesce multiple events from the same thread.
-  // all the sessions send to the same source.
-  val source = createSource(new ListEventAggregator[(Session[T],T)](), consumer_queue)
-  source.setEventHandler(^{drain_source});
-  source.resume
-
-  def drain_source = {
-    source.getData.foreach { event =>
-      // overflow sinks can always accept more values.
-      val f1 = overflow.full
-      overflow.offer(event)
-    }
-  }
+  var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
+  var overflow_size = 0L
+  var high_overflow_size = 64*1024
 
   def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
     val session = new Session[T](this, producer_queue)
     consumer_queue <<| ^{
-      session.credit_adder.merge((delivery_credits, size_credits));
+      session.credit(delivery_credits, size_credits);
       sessions += session
     }
     session
@@ -378,25 +362,81 @@ class SessionSinkMux[T](val downstream:S
       session match {
         case s:Session[T] =>
           sessions -= s
-          s.producer_queue {
-            s.close(rejection_handler)
-          }
+          s.close(rejection_handler)
       }
     }
   }
 
   def time_stamp = 0L
+
+  downstream.refiller = ^{ drain_overflow }
+
+  def drain_overflow:Unit = {
+    while( !overflowed_sessions.isEmpty) {
+      val session = overflowed_sessions.getHead.session
+      val value = session.overflow.getFirst()
+      if( downstream.offer((session, value)) ) {
+        session.overflow.removeFirst()
+        overflow_size -= sizer.size(value)
+        if( session.overflow.isEmpty ) {
+          session.overflow_node.unlink()
+          session.on_overflow_drain()
+          if( session.pending_delivery_credits!=0 || session.pending_size_credits!=0 ) {
+            session.credit(session.pending_delivery_credits, session.pending_size_credits)
+            session.pending_delivery_credits = 0
+            session.pending_size_credits = 0
+          }
+        } else {
+          // to fairly consume values from all sessions.
+          overflowed_sessions.rotate()
+        }
+      } else {
+        return
+      }
+    }
+  }
+
+  def delivered(session:Session[Delivery], size:Int) = {
+    if( overflow_size >= high_overflow_size && !session.overflow.isEmpty) {
+      session.pending_delivery_credits += 1
+      session.pending_size_credits += size
+    } else {
+      session.credit(1, size)
+    }
+  }
+
 }
 
+case class SessionLinkedNode[T](session:Session[T]) extends LinkedNode[SessionLinkedNode[T]]
+
 /**
  * tracks one producer to consumer session / credit window.
  */
 class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
 
+  val overflow = new LinkedList[T]()
+  var pending_delivery_credits = 0
+  var pending_size_credits = 0
+
+  // use a event aggregating source to coalesce multiple events from the same thread.
+  val overflow_source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
+  overflow_source.setEventHandler(^{
+    for( value <- overflow_source.getData ) {
+      if( overflow.isEmpty ) {
+        mux.overflowed_sessions.addLast(overflow_node);
+      }
+      overflow.add(value)
+      mux.overflow_size += sizer.size(value)
+    }
+    mux.drain_overflow
+  });
+  overflow_source.resume
+
   var refiller:Task = NOOP
+  var rejection_handler: (T)=>Unit = _
+  val overflow_node = SessionLinkedNode[T](this)
 
   private def sizer = mux.sizer
-  private def downstream = mux.source
   var delivery_credits = 0
   var size_credits = 0
 
@@ -407,6 +447,11 @@ class Session[T](mux:SessionSinkMux[T], 
   @volatile
   var enqueue_ts = mux.time_stamp
 
+  def credit(delivery_credits:Int, size_credits:Int) = {
+    mux.consumer_queue.assertExecuting()
+    credit_adder.merge((delivery_credits, size_credits))
+  }
+
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
     def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
@@ -428,8 +473,7 @@ class Session[T](mux:SessionSinkMux[T], 
   }
   credit_adder.resume
 
-  private var rejection_handler: (T)=>Unit = _
-  
+
   private def add_credits(count:Int, size:Int) = {
     delivery_credits += count
     size_credits += size
@@ -464,17 +508,19 @@ class Session[T](mux:SessionSinkMux[T], 
         enqueue_ts = mux.time_stamp
   
         add_credits(-1, -size)
-        downstream.merge((this, value))
+        overflow_source.merge(value)
       }
       true
     }
   }
 
+  var on_overflow_drain = ()=>{}
+
   def close(rejection_handler:(T)=>Unit) = {
-    producer_queue.assertExecuting()
-    assert(this.rejection_handler==null)
-    this.rejection_handler=rejection_handler
-    refiller.run
+    producer_queue {
+      this.rejection_handler=rejection_handler
+      refiller.run
+    }
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1374667&r1=1374666&r2=1374667&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Sat Aug 18 22:12:57 2012
@@ -201,8 +201,9 @@ class StompParallelTest extends StompTes
     client.socket.setSoTimeout(1 * 1000)
     var block_count = 0
     try {
+      receipt_counter.set(0L)
       while (true) {
-        sync_send("/queue/quota.assured1", "%01024d".format(block_count))
+        sync_send("/queue/quota.assured1", "%01024d".format(block_count), "message-id:"+block_count+"\n")
         block_count += 1
       }
     } catch {
@@ -213,9 +214,11 @@ class StompParallelTest extends StompTes
     // Send 5 more messages which do not fit in the queue, they will be
     // held in the producer connection's delivery session buffer..
     connect("1.1")
-    for (i <- 0 until (block_count + 5)) {
-      async_send("/queue/quota.assured2", "%01024d".format(i))
+    receipt_counter.set(0L)
+    for (i <- 0 until (block_count-1)) {
+      sync_send("/queue/quota.assured2", "%01024d".format(i), "message-id:"+i+"\n")
     }
+    async_send("/queue/quota.assured2", "%01024d".format(block_count-1))
 
     // Even though we disconnect, those 5 that did not fit should still
     // get delivered once the queue unblocks..
@@ -224,7 +227,7 @@ class StompParallelTest extends StompTes
     // Lets make sure non of the messages were dropped.
     connect("1.1")
     subscribe("0", "/queue/quota.assured2")
-    for (i <- 0 until (block_count + 5)) {
+    for (i <- 0 until block_count) {
       assert_received("%01024d".format(i))
     }
 
@@ -242,8 +245,9 @@ class StompParallelTest extends StompTes
     client.socket.setSoTimeout(1 * 1000)
     var block_count = 0
     try {
+      receipt_counter.set(0L)
       while (true) {
-        sync_send("/topic/quota.assured1", "%01024d".format(block_count))
+        sync_send("/topic/quota.assured1", "%01024d".format(block_count), "message-id:"+block_count+"\n")
         block_count += 1
       }
     } catch {
@@ -253,21 +257,23 @@ class StompParallelTest extends StompTes
     close(consumer)
 
     connect("1.1", consumer)
-    subscribe("0", "/topic/quota.assured2", "client", headers = "credit:1,0\n", c = consumer)
+    subscribe("1", "/topic/quota.assured2", "client", headers = "credit:1,0\n", c = consumer)
 
     // Send 5 more messages which do not fit in the consumer buffer, they will be
     // held in the producer connection's delivery session buffer..
     connect("1.1")
-    for (i <- 0 until (block_count + 5)) {
-      async_send("/topic/quota.assured2", "%01024d".format(i))
+    receipt_counter.set(0L)
+    for (i <- 0 until (block_count-1)) {
+      sync_send("/topic/quota.assured2", "%01024d".format(i), "message-id:"+i+"\n")
     }
+    async_send("/topic/quota.assured2", "%01024d".format(block_count-1))
 
     // Even though we disconnect, those 5 that did not fit should still
     // get delivered once the queue unblocks..
     disconnect()
 
     // Lets make sure non of the messages were dropped.
-    for (i <- 0 until (block_count + 5)) {
+    for (i <- 0 until block_count) {
       assert_received("%01024d".format(i), c = consumer)(true)
     }
 


