activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1086622 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Date Tue, 29 Mar 2011 15:06:07 GMT
Author: chirino
Date: Tue Mar 29 15:06:07 2011
New Revision: 1086622

URL: http://svn.apache.org/viewvc?rev=1086622&view=rev
Log:
- Renamed host to virtual_host.
- Added a Dispatched interface.
- Replaced the MapSink object with a map method on the Sink trait.
- Added SinkFilter and MutableSink helper classes.

Added:
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Tue Mar 29 15:06:07 2011
@@ -32,7 +32,10 @@ import java.util.concurrent.TimeUnit
 trait DomainDestination {
 
   def id:Long
-  def name:String
+  def virtual_host:VirtualHost
+
+  def destination_dto:DestinationDTO
+  def name:String = destination_dto.name
 
   def bind (destination:DestinationDTO, consumer:DeliveryConsumer)
   def unbind (consumer:DeliveryConsumer, persistent:Boolean)
@@ -91,14 +94,14 @@ object LocalRouter extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class LocalRouter(val host:VirtualHost) extends BaseService with Router {
+class LocalRouter(val virtual_host:VirtualHost) extends BaseService with Router with Dispatched
{
   import LocalRouter._
 
-  def dispatch_queue:DispatchQueue = host.dispatch_queue
+  def dispatch_queue:DispatchQueue = virtual_host.dispatch_queue
 
   def auto_create_destinations = {
     import OptionSupport._
-    host.config.auto_create_destinations.getOrElse(true)
+    virtual_host.config.auto_create_destinations.getOrElse(true)
   }
 
   private val ALL = new Path({
@@ -300,14 +303,14 @@ class LocalRouter(val host:VirtualHost) 
       import collection.JavaConversions._
       import DestinationParser.default._
       import AsciiBuffer._
-      host.config.topics.find( x=> parseFilter(ascii(x.name)).matches(name) ).getOrElse(new
TopicDTO)
+      virtual_host.config.topics.find( x=> parseFilter(ascii(x.name)).matches(name) ).getOrElse(new
TopicDTO)
     }
 
     def can_create_ds(config:DurableSubscriptionDTO, security:SecurityContext) = {
-      if( host.authorizer==null || security==null) {
+      if( virtual_host.authorizer==null || security==null) {
         true
       } else {
-        host.authorizer.can_create(security, host, config)
+        virtual_host.authorizer.can_create(security, virtual_host, config)
       }
     }
 
@@ -344,27 +347,27 @@ class LocalRouter(val host:VirtualHost) 
       // A new destination is being created...
       val dto = topic_config(path)
 
-      if(  host.authorizer!=null && security!=null && !host.authorizer.can_create(security,
host, dto)) {
+      if(  virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_create(security,
virtual_host, dto)) {
         return new Failure("Not authorized to create the destination")
       }
 
       val id = topic_id_counter.incrementAndGet
-      val topic = new Topic(LocalRouter.this, DestinationParser.encode_path(path), dto, id)
+      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO],
dto, id)
       add_destination(path, topic)
       Success(topic)
     }
 
     def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
= {
       val config = topic_config(path)
-      val authorizer = host.authorizer
-      if( authorizer!=null && security!=null && !authorizer.can_receive_from(security,
host, config) ) {
+      val authorizer = virtual_host.authorizer
+      if( authorizer!=null && security!=null && !authorizer.can_receive_from(security,
virtual_host, config) ) {
         return false;
       }
 
       destination match {
         case destination:DurableSubscriptionDestinationDTO=>
           // So the user can subscribe to the topic.. but can he create durable sub??
-          val qc = DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
+          val qc = DurableSubscriptionQueueBinding.create(destination).config(virtual_host).asInstanceOf[DurableSubscriptionDTO]
           if( !can_create_ds(qc, security) ) {
              return false;
           }
@@ -375,8 +378,8 @@ class LocalRouter(val host:VirtualHost) 
 
     def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Boolean = {
       val config = topic_config(path)
-      val authorizer = host.authorizer
-      !(authorizer!=null && security!=null && !authorizer.can_send_to(security,
host, config) )
+      val authorizer = virtual_host.authorizer
+      !(authorizer!=null && security!=null && !authorizer.can_send_to(security,
virtual_host, config) )
     }
 
   }
@@ -385,10 +388,10 @@ class LocalRouter(val host:VirtualHost) 
   class QueueDomain extends Domain[Queue] {
 
     def can_create_queue(config:QueueDTO, security:SecurityContext) = {
-      if( host.authorizer==null || security==null) {
+      if( virtual_host.authorizer==null || security==null) {
         true
       } else {
-        host.authorizer.can_create(security, host, config)
+        virtual_host.authorizer.can_create(security, virtual_host, config)
       }
     }
 
@@ -423,7 +426,7 @@ class LocalRouter(val host:VirtualHost) 
       dto.name = DestinationParser.encode_path(path)
 
       val binding = QueueDomainQueueBinding.create(dto)
-      val config = binding.config(host)
+      val config = binding.config(virtual_host)
       if( can_create_queue(config, security) ) {
         Success(_create_queue(binding))
       } else {
@@ -434,14 +437,14 @@ class LocalRouter(val host:VirtualHost) 
 
     def can_bind_one(path:Path, dto:DestinationDTO, consumer:DeliveryConsumer, security:
SecurityContext):Boolean = {
       val binding = QueueDomainQueueBinding.create(dto)
-      val config = binding.config(host)
-      if(  host.authorizer!=null && security!=null ) {
+      val config = binding.config(virtual_host)
+      if(  virtual_host.authorizer!=null && security!=null ) {
         if( consumer.browser ) {
-          if( !host.authorizer.can_receive_from(security, host, config) ) {
+          if( !virtual_host.authorizer.can_receive_from(security, virtual_host, config) )
{
             return false;
           }
         } else {
-          if( !host.authorizer.can_consume_from(security, host, config) ) {
+          if( !virtual_host.authorizer.can_consume_from(security, virtual_host, config) )
{
             return false
           }
         }
@@ -451,9 +454,9 @@ class LocalRouter(val host:VirtualHost) 
 
     def can_connect_one(path:Path, dto:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Boolean = {
       val binding = QueueDomainQueueBinding.create(dto)
-      val config = binding.config(host)
-      val authorizer = host.authorizer
-      !( authorizer!=null && security!=null && !authorizer.can_send_to(security,
host, config) )
+      val config = binding.config(virtual_host)
+      val authorizer = virtual_host.authorizer
+      !( authorizer!=null && security!=null && !authorizer.can_send_to(security,
virtual_host, config) )
     }
 
   }
@@ -465,21 +468,21 @@ class LocalRouter(val host:VirtualHost) 
   /////////////////////////////////////////////////////////////////////////////
 
   protected def _start(on_completed: Runnable) = {
-    val tracker = new LoggingTracker("router startup", host.console_log, dispatch_queue)
-    if( host.store!=null ) {
+    val tracker = new LoggingTracker("router startup", virtual_host.console_log, dispatch_queue)
+    if( virtual_host.store!=null ) {
       val task = tracker.task("list_queues")
-      host.store.list_queues { queue_keys =>
+      virtual_host.store.list_queues { queue_keys =>
         for( queue_key <- queue_keys) {
           val task = tracker.task("load queue: "+queue_key)
           // Use a global queue to so we concurrently restore
           // the queues.
           globalQueue {
-            host.store.get_queue(queue_key) { x =>
+            virtual_host.store.get_queue(queue_key) { x =>
               x match {
                 case Some(record)=>
                   if( record.binding_kind == TempQueueBinding.TEMP_KIND ) {
                     // Drop temp queues on restart..
-                    host.store.remove_queue(queue_key){x=> task.run}
+                    virtual_host.store.remove_queue(queue_key){x=> task.run}
                   } else {
                     dispatch_queue {
                       _create_queue(QueueBinding.create(record.binding_kind, record.binding_data),
queue_key)
@@ -496,7 +499,7 @@ class LocalRouter(val host:VirtualHost) 
     }
 
     import OptionSupport._
-    if(host.config.regroup_connections.getOrElse(false)) {
+    if(virtual_host.config.regroup_connections.getOrElse(false)) {
       schedule_connection_regroup
     }
 
@@ -504,7 +507,7 @@ class LocalRouter(val host:VirtualHost) 
   }
 
   protected def _stop(on_completed: Runnable) = {
-    val tracker = new LoggingTracker("router shutdown", host.console_log, dispatch_queue)
+    val tracker = new LoggingTracker("router shutdown", virtual_host.console_log, dispatch_queue)
     queues_by_id.valuesIterator.foreach { queue=>
       tracker.stop(queue)
     }
@@ -568,7 +571,6 @@ class LocalRouter(val host:VirtualHost) 
 
   def domain(destination: DestinationDTO):Domain[_ <: DomainDestination] = destination
match {
     case x:TopicDestinationDTO => topic_domain
-    case x:DurableSubscriptionDestinationDTO => topic_domain
     case x:QueueDestinationDTO => queue_domain
     case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
   }
@@ -673,10 +675,10 @@ class LocalRouter(val host:VirtualHost) 
 
     var qid = id
     if( qid == -1 ) {
-      qid = host.queue_id_counter.incrementAndGet
+      qid = virtual_host.queue_id_counter.incrementAndGet
     }
 
-    val config = binding.config(host)
+    val config = binding.config(virtual_host)
 
     val queue = new Queue(this, qid, binding, config)
     if( queue.tune_persistent && id == -1 ) {
@@ -686,7 +688,7 @@ class LocalRouter(val host:VirtualHost) 
       record.binding_data = binding.binding_data
       record.binding_kind = binding.binding_kind
 
-      host.store.add_queue(record) { rc => Unit }
+      virtual_host.store.add_queue(record) { rc => Unit }
 
     }
 
@@ -731,7 +733,7 @@ class LocalRouter(val host:VirtualHost) 
   def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
 
     if( security!=null && queue.config.acl!=null ) {
-      if( !host.authorizer.can_destroy(security, host, queue.config) ) {
+      if( !virtual_host.authorizer.can_destroy(security, virtual_host, queue.config) ) {
         return Failure("Not authorized to destroy")
       }
     }
@@ -742,7 +744,7 @@ class LocalRouter(val host:VirtualHost) 
     queue.stop
     if( queue.tune_persistent ) {
       queue.dispatch_queue {
-        host.store.remove_queue(queue.id){x=> Unit}
+        virtual_host.store.remove_queue(queue.id){x=> Unit}
       }
     }
     Success(Zilch)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Tue Mar 29 15:06:07 2011
@@ -44,9 +44,9 @@ import Queue._
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val router: LocalRouter, val id:Long, val binding:QueueBinding, var config:QueueDTO)
extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService
with DomainDestination {
+class Queue(val router: LocalRouter, val id:Long, val binding:QueueBinding, var config:QueueDTO)
extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService
with DomainDestination with Dispatched {
 
-  def host = router.host
+  def virtual_host = router.virtual_host
 
   var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
@@ -55,12 +55,15 @@ class Queue(val router: LocalRouter, val
   val filter = binding.message_filter
 
   override val dispatch_queue: DispatchQueue = createQueue(binding.label);
-  host.broker.init_dispatch_queue(dispatch_queue)
+  virtual_host.broker.init_dispatch_queue(dispatch_queue)
+
+  def destination_dto: DestinationDTO = binding.binding_dto
 
   dispatch_queue {
     debug("created queue for: " + binding.label)
   }
 
+
   override def dispose: Unit = {
     ack_source.cancel
   }
@@ -123,7 +126,7 @@ class Queue(val router: LocalRouter, val
 
   def configure(c:QueueDTO) = {
     config = c
-    tune_persistent = host.store !=null && config.persistent.getOrElse(true)
+    tune_persistent = virtual_host.store !=null && config.persistent.getOrElse(true)
     tune_swap = tune_persistent && config.swap.getOrElse(true)
     tune_swap_range_size = config.swap_range_size.getOrElse(10000)
     tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024)
@@ -187,7 +190,7 @@ class Queue(val router: LocalRouter, val
 
     if( tune_persistent ) {
 
-      host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
+      virtual_host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
         dispatch_queue {
           if( ranges!=null && !ranges.isEmpty ) {
 
@@ -502,13 +505,13 @@ class Queue(val router: LocalRouter, val
   def connected() = {}
 
   def bind(value: DeliveryConsumer, security:SecurityContext): Result[Zilch, String] = {
-    if(  host.authorizer!=null && security!=null ) {
+    if(  virtual_host.authorizer!=null && security!=null ) {
       if( value.browser ) {
-        if( !host.authorizer.can_receive_from(security, host, config) ) {
+        if( !virtual_host.authorizer.can_receive_from(security, virtual_host, config) ) {
           return new Failure("Not authorized to browse the queue")
         }
       } else {
-        if( !host.authorizer.can_consume_from(security, host, config) ) {
+        if( !virtual_host.authorizer.can_consume_from(security, virtual_host, config) ) {
           return new Failure("Not authorized to consume from the queue")
         }
       }
@@ -561,8 +564,6 @@ class Queue(val router: LocalRouter, val
     producer.unbind(this::Nil)
   }
 
-  def name: String = binding.label
-
   override def connection:Option[BrokerConnection] = None
 
 
@@ -946,7 +947,7 @@ class QueueEntry(val queue:Queue, val se
               // Are swapping out a non-persistent message?
               if( delivery.storeKey == -1 ) {
                 
-                delivery.uow = queue.host.store.create_uow
+                delivery.uow = queue.virtual_host.store.create_uow
                 val uow = delivery.uow
                 delivery.storeKey = uow.store(delivery.createMessageRecord)
                 store
@@ -959,7 +960,7 @@ class QueueEntry(val queue:Queue, val se
               } else {
                   
                 if( asap ) {
-                  queue.host.store.flush_message(message_key) {
+                  queue.virtual_host.store.flush_message(message_key) {
                     queue.swap_out_completes_source.merge(this)
                   }
                 }
@@ -1138,7 +1139,7 @@ class QueueEntry(val queue:Queue, val se
         // start swapping in...
         swapping_in = true
         queue.swapping_in_size += size
-        queue.host.store.load_message(message_key) { delivery =>
+        queue.virtual_host.store.load_message(message_key) { delivery =>
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
@@ -1242,7 +1243,7 @@ class QueueEntry(val queue:Queue, val se
     override def swap_in() = {
       if( !swapping_in ) {
         swapping_in = true
-        queue.host.store.list_queue_entries(queue.id, seq, last) { records =>
+        queue.virtual_host.store.list_queue_entries(queue.id, seq, last) { records =>
           if( !records.isEmpty ) {
             queue.dispatch_queue {
 
@@ -1501,7 +1502,7 @@ class Subscription(val queue:Queue, val 
       total_ack_count += 1
       if (entry.messageKey != -1) {
         val storeBatch = if( sb == null ) {
-          queue.host.store.create_uow
+          queue.virtual_host.store.create_uow
         } else {
           sb
         }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Tue Mar 29 15:06:07 2011
@@ -31,7 +31,7 @@ import ReporterLevel._
  */
 trait Router extends Service {
 
-  def host:VirtualHost
+  def virtual_host:VirtualHost
 
   def get_queue(dto:Long):Option[Queue] @suspendable
 
@@ -197,7 +197,7 @@ abstract class DeliveryProducerRoute(val
 
           if( copy.storeKey == -1L && target.consumer.is_persistent && copy.message.persistent
) {
             if( copy.uow==null ) {
-              copy.uow = router.host.store.create_uow
+              copy.uow = router.virtual_host.store.create_uow
             } else {
               copy.uow.retain
             }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Tue Mar 29 15:06:07 2011
@@ -49,24 +49,44 @@ trait Sink[T] {
    * Sets a refiller on the sink.  The refiller is executed
    * when the sink is interested in receiving more deliveries.
    */
-  var refiller:Runnable
+  def refiller:Runnable
+  def refiller_=(value:Runnable)
 
+  def map[Y](func: Y=>T ):Sink[Y] = {
+    def outer = Sink.this
+    new Sink[Y]() with SinkFilter {
+      def downstream = outer
+      def offer(value:Y) = {
+        if( full ) {
+          false
+        } else {
+          outer.offer(func(value))
+        }
+      }
+    }
+  }
 
 }
 
+trait SinkFilter {
+  def downstream:Sink[_]
+  def refiller:Runnable = downstream.refiller
+  def refiller_=(value:Runnable) { downstream.refiller=value }
+  def full: Boolean = downstream.full
+}
 
 /**
  * <p>
  * A delivery sink which is connected to a transport. It expects the caller's dispatch
- * queue to be the same as the transport's/
+ * queue to be the same as the transport's
  * <p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class TransportSink(val transport:Transport) extends Sink[AnyRef] {
+  var refiller:Runnable = NOOP
   def full:Boolean = transport.full
   def offer(value:AnyRef) =  transport.offer(value)
-  var refiller:Runnable = null
 }
 
 /**
@@ -78,23 +98,24 @@ class TransportSink(val transport:Transp
  */
 class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
 
-  private var overflow = new LinkedList[T]()
-  var refiller: Runnable = null
+  var refiller:Runnable = NOOP
+
+  private var overflow = collection.mutable.Queue[T]()
 
   def overflowed = !overflow.isEmpty
 
   def full = overflowed || downstream.full
 
+  def clear = overflow.clear
+
   downstream.refiller = ^{ drain }
 
   protected def drain:Unit = {
     while( overflowed ) {
-      val delivery = overflow.removeFirst
-      if( !downstream.offer(delivery) ) {
-        overflow.addFirst(delivery)
+      if( !downstream.offer(overflow.front) ) {
         return
       } else {
-        onDelivered(delivery)
+        onDelivered(overflow.dequeue)
       }
     }
     // request a refill once the overflow is empty...
@@ -107,7 +128,7 @@ class OverflowSink[T](val downstream:Sin
    */
   def offer(value:T) = {
     if( overflowed || !downstream.offer(value)) {
-      overflow.addLast(value)
+      overflow.enqueue(value)
     } else {
       onDelivered(value)
     }
@@ -122,24 +143,37 @@ class OverflowSink[T](val downstream:Sin
 
 }
 
-object MapSink {
-  def apply[X,Y](downstream:Sink[X])(func: Y=>X ) = {
-    new Sink[Y] {
-      def refiller = downstream.refiller
-      def refiller_=(value:Runnable) = downstream.refiller=value
 
-      def full = downstream.full
-      def offer(value:Y) = {
-        if( full ) {
-          false
-        } else {
-          downstream.offer(func(value))
-        }
-      }
+/**
+ * A sink whose downstream sink is optional and can be
+ * replaced after construction.
+ */
+class MutableSink[T] extends Sink[T] {
+
+  var refiller:Runnable = NOOP
+  private var _downstream:Option[Sink[T]] =_
+
+  def downstream_=(value: Option[Sink[T]]) {
+    _downstream.foreach(d => d.refiller = NOOP )
+    _downstream = value
+    _downstream.foreach{d =>
+      d.refiller = refiller
+      refiller.run
     }
   }
+
+  def downstream = _downstream
+
+  def full = _downstream.map(_.full).getOrElse(true)
+
+  /**
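+   * @return the negation of the downstream sink's offer result, or true when
+   *         no downstream sink is set. NOTE(review): this class keeps no
+   *         overflow list and the negation looks inverted - confirm intent.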
+   */
+  def offer(value:T) = ! _downstream.map(_.offer(value)).getOrElse(false)
 }
 
+
 /**
  *  <p>
  * A SinkMux multiplexes access to a target sink so that multiple
@@ -156,8 +190,7 @@ class SinkMux[T](val downstream:Sink[T],
   var sessions = HashSet[Session[T]]()
   var session_max_credits = 1024*32;
 
-  val overflow = new OverflowSink[(Session[T],T)](MapSink(downstream){_._2}) {
-
+  val overflow = new OverflowSink[(Session[T],T)](downstream.map(_._2)) {
     // Once a value leaves the overflow, then we can credit the
     // session so that more messages can be accepted.
     override protected def onDelivered(event:(Session[T],T)) = {
@@ -223,6 +256,8 @@ class SinkMux[T](val downstream:Sink[T],
  */
 class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends
Sink[T] {
 
+  var refiller:Runnable = NOOP
+
   private def session_max_credits = mux.session_max_credits
   private def sizer = mux.sizer
   private def downstream = mux.source
@@ -255,7 +290,6 @@ class Session[T](val producer_queue:Disp
   // producer serial dispatch queue
   ///////////////////////////////////////////////////
 
-  var refiller:Runnable = ^{}
 
   override def full = {
     assert(getCurrentQueue eq producer_queue)
@@ -311,11 +345,12 @@ trait Sizer[T] {
  */
 class QueueSink[T](val sizer:Sizer[T], var maxSize:Int=1024*32) extends Sink[T] {
 
+  var refiller:Runnable = NOOP
+
   var buffer = new LinkedList[T]()
   private var size = 0
 
   var drainer: Runnable = null
-  var refiller: Runnable = null
 
   def full = size >= maxSize
   def poll = buffer.poll

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Tue Mar 29 15:06:07 2011
@@ -30,7 +30,7 @@ import collection.mutable.{HashMap, List
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Topic(val router:LocalRouter, val name:String, val config:TopicDTO, val id:Long) extends
DomainDestination {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, val config:TopicDTO,
val id:Long) extends DomainDestination {
 
   var producers = ListBuffer[BindableDeliveryProducer]()
   var consumers = ListBuffer[DeliveryConsumer]()
@@ -39,6 +39,8 @@ class Topic(val router:LocalRouter, val 
 
   import OptionSupport._
 
+  def virtual_host: VirtualHost = router.virtual_host
+
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
   def is_same_ds(sub1:DurableSubscriptionDestinationDTO, sub2:DurableSubscriptionDestinationDTO)
= {
@@ -55,6 +57,23 @@ class Topic(val router:LocalRouter, val 
           r.bind(list)
         })
 
+      case destination:DurableSubscriptionDestinationDTO=>
+
+        val queue = router.topic_domain.get_or_create_durable_subscription(destination)
+        if( !durable_subscriptions.contains(queue) ) {
+          durable_subscriptions += queue
+          val list = List(queue)
+          producers.foreach({ r=>
+            r.bind(list)
+          })
+        }
+
+        // Typically durable subs are only consumed by one connection at a time. So collocate
the
+        // queue onto the consumer's dispatch queue.
+        queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+        queue.bind(destination, consumer)
+        consumer_queues += consumer->queue
+
       case destination:TopicDestinationDTO=>
         var target = consumer
         slow_consumer_policy match {
@@ -78,22 +97,6 @@ class Topic(val router:LocalRouter, val 
           r.bind(list)
         })
 
-      case destination:DurableSubscriptionDestinationDTO=>
-
-        val queue = router.topic_domain.get_or_create_durable_subscription(destination)
-        if( !durable_subscriptions.contains(queue) ) {
-          durable_subscriptions += queue
-          val list = List(queue)
-          producers.foreach({ r=>
-            r.bind(list)
-          })
-        }
-
-        // Typically durable subs are only consumed by on connection at a time. So collocate
the
-        // queue onto the consumer's dispatch queue.
-        queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
-        queue.bind(destination, consumer)
-        consumer_queues += consumer->queue
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Tue Mar 29 15:06:07 2011
@@ -400,7 +400,7 @@ class StompProtocolHandler extends Proto
 
   override def on_transport_connected() = {
 
-    session_manager = new SinkMux[StompFrame]( MapSink(connection.transport_sink){x=>
+    session_manager = new SinkMux[StompFrame]( connection.transport_sink.map {x=>
       trace("sending frame: %s", x)
       x
     }, dispatchQueue, StompFrame)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
Tue Mar 29 15:06:07 2011
@@ -23,100 +23,108 @@ import _root_.org.fusesource.hawtbuf.{By
 import java.io._
 import org.apache.activemq.apollo.broker.{KeyStorage, ProtocolException}
 import javax.net.ssl.{SSLSocket, SSLContext}
+import org.scalatest.matchers.ShouldMatchers
 
 /**
  * A simple Stomp client used for testing purposes
  */
-  class StompClient {
+class StompClient extends ShouldMatchers {
 
-    var socket:Socket = new Socket
-    var out:OutputStream = null
-    var in:InputStream = null
-    val bufferSize = 64*1204
-    var key_storeage:KeyStorage=null
-
-    def open(host: String, port: Int) = {
-
-      socket = if( key_storeage!=null ) {
-        val context = SSLContext.getInstance("TLS")
-        context.init(key_storeage.create_key_managers, key_storeage.create_trust_managers, null)
-        context.getSocketFactory().createSocket()
-        // socket.asInstanceOf[SSLSocket].setEnabledCipherSuites(Array("SSL_RSA_WITH_RC4_128_MD5"))
-        // socket
-      } else {
-        new Socket
+  var socket:Socket = new Socket
+  var out:OutputStream = null
+  var in:InputStream = null
+  val bufferSize = 64*1204
+  var key_storeage:KeyStorage=null
+
+  def open(host: String, port: Int) = {
+
+    socket = if( key_storeage!=null ) {
+      val context = SSLContext.getInstance("TLS")
+      context.init(key_storeage.create_key_managers, key_storeage.create_trust_managers, null)
+      context.getSocketFactory().createSocket()
+      // socket.asInstanceOf[SSLSocket].setEnabledCipherSuites(Array("SSL_RSA_WITH_RC4_128_MD5"))
+      // socket
+    } else {
+      new Socket
+    }
+    socket.connect(new InetSocketAddress(host, port))
+    socket.setSoLinger(true, 0)
+    out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
+    in = new BufferedInputStream(socket.getInputStream, bufferSize)
+  }
+
+  def close() = {
+    socket.close
+  }
+
+  def write(frame:String) = {
+    out.write(frame.getBytes("UTF-8"))
+    out.write(0)
+    out.write('\n')
+    out.flush
+  }
+
+  def write(frame:Array[Byte]) = {
+    out.write(frame)
+    out.write(0)
+    out.write('\n')
+    out.flush
+  }
+
+  def skip():Unit = {
+    var c = in.read
+    while( c >= 0 ) {
+      if( c==0 ) {
+        return
       }
-      socket.connect(new InetSocketAddress(host, port))
-      socket.setSoLinger(true, 0)
-      out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
-      in = new BufferedInputStream(socket.getInputStream, bufferSize)
+      c = in.read()
     }
+    throw new EOFException()
+  }
 
-    def close() = {
-      socket.close
-    }
-
-    def write(frame:String) = {
-      out.write(frame.getBytes("UTF-8"))
-      out.write(0)
-      out.write('\n')
-      out.flush
-    }
-
-    def write(frame:Array[Byte]) = {
-      out.write(frame)
-      out.write(0)
-      out.write('\n')
-      out.flush
-    }
-
-    def skip():Unit = {
-      var c = in.read
-      while( c >= 0 ) {
-        if( c==0 ) {
-          return
-        }
-        c = in.read()
+  def receive():String = {
+    var start = true;
+    val buffer = new BAOS()
+    var c = in.read
+    while( c >= 0 ) {
+      if( c==0 ) {
+        return new String(buffer.toByteArray, "UTF-8")
       }
-      throw new EOFException()
-    }
-
-    def receive():String = {
-      var start = true;
-      val buffer = new BAOS()
-      var c = in.read
-      while( c >= 0 ) {
-        if( c==0 ) {
-          return new String(buffer.toByteArray, "UTF-8")
-        }
-        if( !start || c!= Stomp.NEWLINE) {
-          start = false
-          buffer.write(c)
-        }
-        c = in.read()
+      if( !start || c!= Stomp.NEWLINE) {
+        start = false
+        buffer.write(c)
       }
-      throw new EOFException()
+      c = in.read()
     }
+    throw new EOFException()
+  }
 
-    def receiveAscii():AsciiBuffer = {
-      val buffer = new BAOS()
-      var c = in.read
-      while( c >= 0 ) {
-        if( c==0 ) {
-          return buffer.toBuffer.ascii
-        }
-        buffer.write(c)
-        c = in.read()
+  def wait_for_receipt(id:String): Unit = {
+    val frame = receive()
+    frame should startWith("RECEIPT\n")
+    frame should include("receipt-id:"+id+"\n")
+  }
+
+
+  def receiveAscii():AsciiBuffer = {
+    val buffer = new BAOS()
+    var c = in.read
+    while( c >= 0 ) {
+      if( c==0 ) {
+        return buffer.toBuffer.ascii
       }
-      throw new EOFException()
+      buffer.write(c)
+      c = in.read()
     }
+    throw new EOFException()
+  }
 
-    def receive(expect:String):String = {
-      val rc = receive()
-      if( !rc.startsWith(expect) ) {
-        throw new ProtocolException("Expected "+expect)
-      }
-      rc
+  def receive(expect:String):String = {
+    val rc = receive()
+    if( !rc.startsWith(expect) ) {
+      throw new ProtocolException("Expected "+expect)
     }
+    rc
+  }
 
-  }
\ No newline at end of file
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1086622&r1=1086621&r2=1086622&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Tue Mar 29 15:06:07 2011
@@ -35,7 +35,7 @@ class StompRemoteConsumer extends Remote
   var outboundSink: OverflowSink[StompFrame] = null
 
   def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
+    outboundSink = new OverflowSink[StompFrame](transport_sink.map(x=>x))
     outboundSink.refiller = ^ {}
 
     val stompDestination = destination match {
@@ -145,7 +145,7 @@ class StompRemoteProducer extends Remote
   }
 
   override def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
+    outboundSink = new OverflowSink[StompFrame](transport_sink.map(x=>x))
     outboundSink.refiller = ^ {drain}
 
     stompDestination = destination match {

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1086622&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala Tue Mar 29 15:06:07 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+import org.fusesource.hawtdispatch._
+
+/**
+ * <p>
+ * Trait that exposes the {@link DispatchQueue} used to guard
+ * mutable access to the state of the object implementing this interface.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Dispatched {
+  def dispatch_queue:DispatchQueue
+
+  protected def assert_dispatched = assert( getCurrentQueue == dispatch_queue )
+}
\ No newline at end of file



Mime
View raw message