activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1081828 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Date Tue, 15 Mar 2011 16:07:52 GMT
Author: chirino
Date: Tue Mar 15 16:07:51 2011
New Revision: 1081828

URL: http://svn.apache.org/viewvc?rev=1081828&view=rev
Log:
Much better session management.  First off, fix a concurrency bug caused by modifying the sessions
list from different threads.  Switch to using a set to track sessions to get constant-time insert
and remove.  Don't credit sessions until the overflow sink gets drained.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1081828&r1=1081827&r2=1081828&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala	(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala	Tue Mar 15 16:07:51 2011
@@ -20,6 +20,7 @@ import _root_.org.fusesource.hawtdispatc
 import org.fusesource.hawtdispatch._
 import java.util.{LinkedList}
 import org.apache.activemq.apollo.transport.Transport
+import collection.mutable.{HashSet, ListBuffer}
 
 /**
  * <p>
@@ -49,8 +50,11 @@ trait Sink[T] {
    * when the sink is interested in receiving more deliveries.
    */
   var refiller:Runnable
+
+
 }
 
+
 /**
  * <p>
  * A delivery sink which is connected to a transport. It expects the caller's dispatch
@@ -147,9 +151,9 @@ object MapSink {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SinkMux[T](val downstream:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) {
+class SinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
 
-  var sessions = List[Session[T]]()
+  var sessions = HashSet[Session[T]]()
   var session_max_credits = 1024*32;
 
   val overflow = new OverflowSink[(Session[T],T)](MapSink(downstream){_._2}) {
@@ -162,48 +166,56 @@ class SinkMux[T](val downstream:Sink[T],
       session.credit_adder.merge(sizer.size(value));
     }
   }
-  // As messages are delivered, and we credit the sessions,
-  // that triggers the sessions to refill the overflow.  No
-  // need to have a refiller action.
-  overflow.refiller = NOOP
 
   // use a event aggregating source to coalesce multiple events from the same thread.
   // all the sessions send to the same source.
-  val source = createSource(new ListEventAggregator[(Session[T],T)](), queue)
+  val source = createSource(new ListEventAggregator[(Session[T],T)](), consumer_queue)
   source.setEventHandler(^{drain_source});
   source.resume
 
   def drain_source = {
     source.getData.foreach { event =>
       // overflow sinks can always accept more values.
+      val f1 = overflow.full
       overflow.offer(event)
+      if( !f1 && overflow.full ) {
+        // once we fill, we stop the credit adder sources
+        // this should stop them from sending us more messages.
+        sessions.foreach(_.credit_adder.suspend)
+      }
     }
   }
 
-  def open(producer_queue:DispatchQueue,allow_overflow:Boolean=false):Sink[T] = {
-    val session = createSession(producer_queue, session_max_credits)
-    sessions ::= session
-    if( allow_overflow ) {
-      new OverflowSink(session)
-    } else {
-      session
+  overflow.refiller = ^{
+    assert(getCurrentQueue eq consumer_queue)
+    // overflow is not full anymore.. lets release those credits so we can get more messages.
+    sessions.foreach(_.credit_adder.resume)
+  }
+
+  def open(producer_queue:DispatchQueue):Sink[T] = {
+    val session = new Session[T](producer_queue, 0, this)
+    consumer_queue <<| ^{
+      if( overflow.full ) {
+        session.credit_adder.suspend
+      }
+      session.credit_adder.merge(session_max_credits);
+      sessions += session
     }
+    session
   }
 
   def close(session:Sink[T]):Unit = {
-    session match {
-      case s:OverflowSink[T] => close(s.downstream)
-      case s:Session[T] =>
-        sessions = sessions.filterNot( _ == s )
-        s.producer_queue {
-          s.close
-        }
+    consumer_queue <<| ^{
+      session match {
+        case s:Session[T] =>
+          sessions -= s
+          s.producer_queue {
+            s.close
+          }
+      }
     }
   }
 
-  protected def createSession(producer_queue:DispatchQueue, capacity:Int) = new Session[T](producer_queue, capacity, this)
-
-
 }
 
 /**
@@ -223,7 +235,7 @@ class Session[T](val producer_queue:Disp
   credit_adder.resume
 
   private var closed = false
-  private var _full = false
+  private var _full = credits <= 0
 
   private def add_credits(value:Int) = {
     credits += value;



Mime
View raw message