activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961115 - in /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker: BrokerDatabase.scala Delivery.scala Queue.scala Sink.scala VirtualHost.scala
Date Wed, 07 Jul 2010 03:59:25 GMT
Author: chirino
Date: Wed Jul  7 03:59:25 2010
New Revision: 961115

URL: http://svn.apache.org/viewvc?rev=961115&view=rev
Log:
Beefed up queue implementation to support browsers and consumers consuming at different positions
in the queue.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala Wed Jul  7 03:59:25 2010
@@ -23,6 +23,9 @@ import org.apache.activemq.util.TreeMap
 import java.util.concurrent.atomic.{AtomicLong}
 import java.util.{HashSet}
 import collection.JavaConversions
+import org.fusesource.hawtdispatch.{BaseRetained, Retained}
+
+case class StoredMessageRef(id:Long) extends BaseRetained
 
 class Record
 case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String) extends Record
@@ -78,15 +81,15 @@ class BrokerDatabase(host:VirtualHost) {
   }
 
   private val msg_id_generator = new AtomicLong
+
   def createMessageRecord(msgId:AsciiBuffer, encoding:AsciiBuffer, message:Buffer) = {
     val record = new MessageRecord(msg_id_generator.incrementAndGet, msgId, encoding, message)
     messages.add(record)
+    StoredMessageRef(record.id)
   }
 
-
-
   case class QueueData(val record:QueueRecord) {
-    var messges:List[Long] = Nil
+    var messges = new TreeMap[Long, Long]()
   }
 
   private object queues {
@@ -116,7 +119,7 @@ class BrokerDatabase(host:VirtualHost) {
         if( qd.messges.isEmpty ) {
           QueueInfo(qd.record, -1, -1, 0)
         } else {
-          QueueInfo(qd.record, qd.messges.head, qd.messges.last, qd.messges.size)
+          QueueInfo(qd.record, qd.messges.firstKey, qd.messges.lastKey, qd.messges.size)
         }
       )
     }
@@ -133,4 +136,14 @@ class BrokerDatabase(host:VirtualHost) {
     }
   } >>: queues.dispatchQueue
 
+  def addMessageToQueue(queue:Long, seq:Long, msg:StoredMessageRef) = using(msg) {
+    val qd = queues.records.get(queue);
+    qd.messges.put(seq, msg.id)
+  } >>: queues.dispatchQueue
+
+  def removeMessageFromQueue(queue:Long, seq:Long, retained:Retained) = using(retained) {
+    val qd = queues.records.get(queue);
+    qd.messges.remove(seq)
+  } >>: queues.dispatchQueue
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:59:25 2010
@@ -46,7 +46,7 @@ trait DeliveryProducer {
  */
 trait DeliveryConsumer extends Retained {
   def dispatchQueue:DispatchQueue;
-  def matches(message:Delivery)
+  def matches(message:Delivery):Boolean
   def connect(producer:DeliveryProducer):Session
 }
 
@@ -110,8 +110,6 @@ trait Message {
 
 }
 
-case class StoredMessageRef(id:Long) extends BaseRetained
-
 /**
  * <p>
  * A new Delivery object is created every time a message is transfered between a producer and

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 03:59:25 2010
@@ -17,53 +17,54 @@
 package org.apache.activemq.apollo.broker
 
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import java.util.{LinkedList}
+import org.apache.activemq.util.TreeMap
 import collection.{SortedMap}
-import java.util.LinkedList
-import org.fusesource.hawtdispatch.{EventAggregators, DispatchQueue, BaseRetained}
+import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait QueueLifecyleListener {
 
-    /**
-     * A destination has bean created
-     *
-     * @param queue
-     */
-    def onCreate(queue:Queue);
-
-    /**
-     * A destination has bean destroyed
-     *
-     * @param queue
-     */
-    def onDestroy(queue:Queue);
+  /**
+   * A destination has bean created
+   *
+   * @param queue
+   */
+  def onCreate(queue: Queue);
 
-}
+  /**
+   * A destination has bean destroyed
+   *
+   * @param queue
+   */
+  def onDestroy(queue: Queue);
 
-object QueueEntry extends Sizer[QueueEntry] {
-  def size(value:QueueEntry):Int = value.delivery.size
 }
-class QueueEntry(val seq:Long, val delivery:Delivery)
+
 
 object Queue extends Log {
-  val maxOutboundSize = 1024*1204*5
+  val maxOutboundSize = 1024 * 1204 * 5
 }
 
 /**
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination, val storeId: Long) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
   override protected def log = Queue
 
-  override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
+  import Queue._
+
+  var consumerSubs = Map[DeliveryConsumer, Subscription]()
+
+  override val dispatchQueue: DispatchQueue = createQueue("queue:" + destination);
   dispatchQueue.setTargetQueue(getRandomThreadQueue)
   dispatchQueue {
-    debug("created queue for: "+destination)
+    debug("created queue for: " + destination)
   }
-  setDisposer(^{
+  setDisposer(^ {
     dispatchQueue.release
     session_manager.release
   })
@@ -71,62 +72,106 @@ class Queue(val destination:Destination,
   // sequence numbers.. used to track what's in the store.
   var first_seq = -1L
   var last_seq = -1L
-  var message_seq_counter=0L
+  var message_seq_counter = 0L
   var count = 0
 
-  val pending = new QueueSink[QueueEntry](QueueEntry)
-  val session_manager = new SinkMux[Delivery](MapSink(pending){ x=>accept(x) }, dispatchQueue, Delivery)
-  pending.drainer = ^{ drain }
+  var maxSize: Int = 1024 * 32
 
+  val headEntry = new PagedEntry(this)
+  headEntry.seq = -1
+  var tailEntry = new PagedEntry(this)
 
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Implementation of the Route trait.  Allows consumers to bind/unbind
-  // from this queue so that it can send messages to them.
-  //
-  /////////////////////////////////////////////////////////////////////
+  object messages extends Sink[PagedEntry] {
+    var counter = 0
+    val entries = new TreeMap[Long, PagedEntry]()
+    entries.put(headEntry.seq, headEntry)
+
+    private var size = 0
+    var refiller: Runnable = null
 
-  class ConsumerState(val session:Session) extends Runnable {
-    session.refiller = this
-    var bound=true
-    var ready=true
-
-    def run() = {
-      if( bound && !ready ) {
-        ready = true
-        readyConsumers.addLast(this)
-        drain
+    def full = size >= maxSize
+
+    def offer(value: PagedEntry): Boolean = {
+
+      if (full) {
+        false
+      } else {
+
+        val ref = value.delivery.ref
+        if (ref != null) {
+          host.database.addMessageToQueue(storeId, value.seq, ref)
+          ref.release
+        }
+
+        size += value.delivery.size
+        entries.put(value.seq, value)
+        counter += 1;
+
+        if( !value.isEmpty ) {
+          value.dispatch
+        }
+        true
       }
     }
-  }
 
-  var allConsumers = Map[DeliveryConsumer,ConsumerState]()
-  val readyConsumers = new LinkedList[ConsumerState]()
-
-  def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
-  def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
-      for ( consumer <- consumers ) {
-        val cs = new ConsumerState(consumer.connect(Queue.this))
-        allConsumers += consumer->cs
-        readyConsumers.addLast(cs)
-      }
-      drain
-    } >>: dispatchQueue
-
-  def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
-      for ( consumer <- consumers ) {
-        allConsumers.get(consumer) match {
-          case Some(cs)=>
-            cs.bound = false
-            cs.session.close
-            allConsumers -= consumer
-            readyConsumers.remove(cs)
-          case None=>
-        }
+    def ack(value: PagedEntry) = {
+
+      if (value.delivery.ref != null) {
+        host.database.removeMessageFromQueue(storeId, value.seq, null)
       }
-    } >>: dispatchQueue
 
-  def disconnected() = throw new RuntimeException("unsupported")
+      counter -= 1
+      size -= value.delivery.size
+
+      value.delivery = null
+
+      // acked entries turn into a tombstone entry..  adjacent tombstones
+      // aggregate into a single entry.
+      var current = entries.getEntry(value.seq)
+      assert(current != null)
+
+      // Merge /w previous if possible
+      var adj = current.previous
+      if (adj.getValue.mergeTomestone(current.getValue)) {
+        entries.removeEntry(current)
+        current = adj
+      }
+
+      // Merge /w next if possible
+      adj = current.next
+      if (adj != null && current.getValue.mergeTomestone(adj.getValue)) {
+        entries.removeEntry(adj)
+      }
+
+
+      if (counter == 0) {
+        refiller.run
+      }
+    }
+
+
+    def nack(values: List[PagedEntry]) = {
+      for (v <- values) {
+        v.unaquire;
+        // TODO: re-dispatch em.
+      }
+    }
+
+  }
+
+  val session_manager = new SinkMux[Delivery](MapSink(messages) {x => accept(x)}, dispatchQueue, Delivery)
+
+  val ack_source = createSource(new ListEventAggregator[(Subscription, Delivery)](), dispatchQueue)
+  ack_source.setEventHandler(^ {drain_acks});
+  ack_source.resume
+
+  def drain_acks = {
+    ack_source.getData.foreach {
+      event =>
+        event._1._ack(event._2)
+    }
+  }
+
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -135,12 +180,13 @@ class Queue(val destination:Destination,
   //
   /////////////////////////////////////////////////////////////////////
 
-  def matches(message:Delivery) = { true }
+  def matches(message: Delivery) = {true}
 
-  def connect(p:DeliveryProducer) = new Session {
+  def connect(p: DeliveryProducer) = new Session {
     retain
 
     override def consumer = Queue.this
+
     override def producer = p
 
     val session = session_manager.open(producer.dispatchQueue)
@@ -152,8 +198,9 @@ class Queue(val destination:Destination,
 
     // Delegate all the flow control stuff to the session
     def full = session.full
-    def offer(value:Delivery) = {
-      if( session.full ) {
+
+    def offer(value: Delivery) = {
+      if (session.full) {
         false
       } else {
         val rc = session.offer(sent(value))
@@ -161,28 +208,41 @@ class Queue(val destination:Destination,
         true
       }
     }
-    
+
     def refiller = session.refiller
-    def refiller_=(value:Runnable) = { session.refiller=value }
-  }
 
+    def refiller_=(value: Runnable) = {session.refiller = value}
+  }
 
   /////////////////////////////////////////////////////////////////////
   //
-  // Implementation of the DeliveryProducer trait.
-  // It mainly deals with handling message acks from bound consumers.
+  // Implementation of the Route trait.  Allows consumers to bind/unbind
+  // from this queue so that it can send messages to them.
   //
   /////////////////////////////////////////////////////////////////////
 
-  val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
-  ack_source.setEventHandler(^{drain_acks});
-  ack_source.resume
-  def drain_acks = {
-    pending.ack(ack_source.getData.intValue)
-  }
-  override def ack(ack:Delivery) = {
-    ack_source.merge(ack.size)
-  }
+  def connected(consumers: List[DeliveryConsumer]) = bind(consumers)
+
+  def bind(consumers: List[DeliveryConsumer]) = retaining(consumers) {
+    for (consumer <- consumers) {
+      val subscription = new Subscription(this)
+      subscription.connect(consumer)
+      consumerSubs += consumer -> subscription
+    }
+  } >>: dispatchQueue
+
+  def unbind(consumers: List[DeliveryConsumer]) = releasing(consumers) {
+    for (consumer <- consumers) {
+      consumerSubs.get(consumer) match {
+        case Some(cs) =>
+          cs.close
+          consumerSubs -= consumer
+        case None =>
+      }
+    }
+  } >>: dispatchQueue
+
+  def disconnected() = throw new RuntimeException("unsupported")
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -195,8 +255,10 @@ class Queue(val destination:Destination,
    * processed by the queues' thread.. therefore we don't
    * yet know the order of the delivery in the queue.
    */
-  private def sent(delivery:Delivery) = {
-    if( delivery.ref !=null ) {
+  private def sent(delivery: Delivery) = {
+    if (delivery.ref != null) {
+      // retain the persistent ref so that the delivery is not
+      // considered completed until this queue stores it
       delivery.ref.retain
     }
     delivery
@@ -206,30 +268,16 @@ class Queue(val destination:Destination,
    * Called from the queue thread.  At this point we
    * know the order.  Converts the delivery to a QueueEntry
    */
-  private def accept(delivery:Delivery) = {
+  private def accept(delivery: Delivery) = {
     val d = delivery.copy
     d.ack = true
-    new QueueEntry(next_message_seq, d)
+    val rc = tailEntry
+    tailEntry = new PagedEntry(this)
+    rc.seq = next_message_seq
+    rc.delivery = d
+    rc
   }
 
-  /**
-   * Dispatches as many messages to ready consumers
-   * as possible.
-   */
-  private def drain: Unit = {
-    while (!readyConsumers.isEmpty && !pending.isEmpty) {
-      val cs = readyConsumers.removeFirst
-      val queueEntry = pending.poll
-      if( cs.session.offer(queueEntry.delivery) ) {
-        // consumer was not full.. keep him in the ready list
-        readyConsumers.addLast(cs)
-      } else {
-        // consumer full.
-        cs.ready = false
-        pending.unpoll(queueEntry)
-      }
-    }
-  }
 
   private def next_message_seq = {
     val rc = message_seq_counter
@@ -239,3 +287,242 @@ class Queue(val destination:Destination,
 
 
 }
+
+object PagedEntry extends Sizer[PagedEntry] {
+  def size(value: PagedEntry): Int = value.delivery.size
+
+}
+class PagedEntry(val queue:Queue) extends Comparable[PagedEntry] with Runnable {
+  def compareTo(o: PagedEntry) = {
+    (seq - o.seq).toInt
+  }
+
+  var delivery: Delivery = null
+  var seq: Long = -1
+  var count: Long = 1
+  var aquired = false
+  var competing:List[Subscription] = Nil
+  var browsing:List[Subscription] = Nil
+
+  def aquire() = {
+    if (aquired) {
+      false
+    } else {
+      aquired = true
+      true
+    }
+  }
+
+  def unaquire() = {
+    assert(aquired)
+    aquired = false
+  }
+
+
+  def mergeTomestone(next: PagedEntry): Boolean = {
+    if (tomestone && next.tomestone && seq + count == next.seq) {
+      count += next.count
+      if( next.browsing!=Nil || next.competing!=Nil ){
+        addBrowsing(next.browsing)
+        addCompeting(next.competing)
+        next.browsing = Nil
+        next.competing = Nil
+      }
+      true
+    } else {
+      false
+    }
+  }
+
+  def tomestone = {
+    delivery == null
+  }
+
+  def isEmpty = competing == Nil && browsing == Nil
+
+
+  def run() = {
+    var next = dispatch()
+    while( next!=null ) {
+      next = next.dispatch
+    }
+  }
+
+  def dispatch():PagedEntry = {
+
+    if( this == queue.tailEntry ) {
+
+      // The tail entry does not hold data..
+      null
+
+    } else if( this == queue.headEntry ) {
+
+      // The head entry does not hold any data.. so just move
+      // any assigned subs to the next entry.
+      
+      val p = nextEntry
+      p.addBrowsing(browsing)
+      p.addCompeting(competing)
+      browsing = Nil
+      competing = Nil
+      p
+
+    } else {
+
+      var browsingSlowSubs:List[Subscription] = Nil
+      var browsingFastSubs:List[Subscription] = Nil
+      var competingSlowSubs:List[Subscription] = Nil
+      var competingFastSubs:List[Subscription] = Nil
+
+      if( browsing!=Nil ) {
+        browsing.foreach { sub =>
+          if (sub.matches(this)) {
+            if (sub.offer(this)) {
+              browsingFastSubs ::= sub
+            } else {
+              browsingSlowSubs ::= sub
+            }
+          } else {
+            browsingFastSubs ::= sub
+          }
+        }
+      }
+
+      if( competing!=Nil ) {
+        if (!this.aquired) {
+          this.aquire()
+
+          var picked: Subscription = null
+          var remaining = competing
+          while( remaining!=Nil && picked == null ) {
+            val sub = remaining.head
+            remaining = remaining.drop(1)
+
+            if (sub.matches(this)) {
+              competingSlowSubs = competingSlowSubs ::: sub :: Nil
+              if (sub.offer(this)) {
+                picked = sub
+              }
+            } else {
+              competingFastSubs = competingFastSubs ::: sub :: Nil
+            }
+          }
+
+          if (picked == null) {
+            this.unaquire()
+          } else {
+            competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
+            competingSlowSubs = Nil
+          }
+        } else {
+          competingFastSubs = competing
+        }
+      }
+
+      // The slow subs stay on this entry..
+      browsing = browsingSlowSubs
+      competing = competingSlowSubs
+
+      // the fast subs move on to the next entry...
+      if ( browsingFastSubs!=null &&  competingFastSubs!=null) {
+        val p = nextEntry
+        p.addBrowsing(browsingFastSubs)
+        p.addCompeting(competingFastSubs)
+        p
+      } else {
+        null
+      }
+    }
+  }
+
+  def addBrowsing(l:List[Subscription]) = {
+    l.foreach(x=>x.position(this))
+    browsing :::= l
+  }
+
+  def addCompeting(l:List[Subscription]) = {
+    l.foreach(x=>x.position(this))
+    competing :::= l
+  }
+
+  def removeBrowsing(s:Subscription) = {
+    s.position(null)
+    browsing = browsing.filterNot(_ == s)
+  }
+
+  def removeCompeting(s:Subscription) = {
+    s.position(null)
+    competing = competing.filterNot(_ == s)
+  }
+
+  def nextEntry():PagedEntry = {
+    var entry = queue.messages.entries.get(this.seq + 1)
+    if (entry == null) {
+      entry = queue.tailEntry
+    }
+    entry
+  }
+
+}
+
+class Subscription(queue:Queue) extends DeliveryProducer {
+
+  def dispatchQueue = queue.dispatchQueue
+
+  var dispatched = List[PagedEntry]()
+  var session: Session = null
+  var pos:PagedEntry = null
+
+  def position(value:PagedEntry):Unit = {
+    pos = value
+    session.refiller = pos
+  }
+
+  def connect(consumer: DeliveryConsumer) = {
+    session = consumer.connect(this)
+    queue.headEntry.addCompeting(this :: Nil)
+    queue.dispatchQueue << queue.headEntry
+  }
+
+  def close() = {
+    pos.removeCompeting(this)
+    session.close
+    session = null
+    queue.messages.nack(dispatched)
+  }
+
+  def matches(entry:PagedEntry) = session.consumer.matches(entry.delivery)
+
+  def offer(entry:PagedEntry) = {
+    if (session.offer(entry.delivery)) {
+      dispatched = dispatched ::: entry :: Nil
+      true
+    } else {
+      false
+    }
+  }
+
+  // called from the consumer thread.. send it to the ack_source
+  // do that it calls _ack from the queue thread.
+  override def ack(delivery: Delivery) = {
+    queue.ack_source.merge((this, delivery))
+  }
+
+  def _ack(delivery: Delivery): Unit = {
+    assert(!dispatched.isEmpty)
+    val entry = if (dispatched.head.delivery == delivery) {
+      // this should be the common case...
+      val rc = dispatched.head
+      dispatched = dispatched.drop(1)
+      rc
+    } else {
+      // but lets also handle the case where we get an ack out of order.
+      val rc = dispatched.partition(_.delivery == delivery)
+      assert(rc._1.size == 1)
+      dispatched = rc._2
+      rc._1.head
+    }
+    queue.messages.ack(entry)
+  }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul  7 03:59:25 2010
@@ -124,7 +124,11 @@ object MapSink {
 
       def full = downstream.full
       def offer(value:Y) = {
-        downstream.offer(func(value))
+        if( full ) {
+          false
+        } else {
+          downstream.offer(func(value))
+        }
       }
     }
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:59:25 2010
@@ -120,7 +120,7 @@ class VirtualHost(val broker: Broker) ex
               val dest = DestinationParser.parse(info.record.name, destination_parser_options)
               if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
 
-                val queue = new Queue(dest, id)
+                val queue = new Queue(this, dest, id)
                 queue.first_seq = info.first
                 queue.last_seq = info.last
                 queue.message_seq_counter = info.last+1
@@ -185,7 +185,7 @@ class VirtualHost(val broker: Broker) ex
       rc match {
         case Some(id) =>
           dispatchQueue ^ {
-            val queue = new Queue(dest, id)
+            val queue = new Queue(this, dest, id)
             queues.put(name, queue)
             cb(queue)
           }



Mime
View raw message