activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1305912 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/br...
Date Tue, 27 Mar 2012 17:29:25 GMT
Author: chirino
Date: Tue Mar 27 17:29:24 2012
New Revision: 1305912

URL: http://svn.apache.org/viewvc?rev=1305912&view=rev
Log:
Switch from using Runnables to using Tasks since they are slightly more efficient.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/web/WebServerFactory.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Run.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileCache.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/JavaBaseService.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceStopper.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceSupport.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Tue Mar 27 17:29:24 2012
@@ -65,7 +65,7 @@ class BDBStore(var config:BDBStoreDTO) e
     }
   }
 
-  protected def _start(on_completed: Runnable) = {
+  protected def _start(on_completed: Task) = {
     write_executor = Executors.newFixedThreadPool(1, new ThreadFactory(){
       def newThread(r: Runnable) = {
         val rc = new Thread(r, "bdb store io write")
@@ -90,7 +90,7 @@ class BDBStore(var config:BDBStoreDTO) e
     }
   }
 
-  protected def _stop(on_completed: Runnable) = {
+  protected def _stop(on_completed: Task) = {
     new Thread() {
       override def run = {
         write_executor.shutdown

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue Mar 27 17:29:24 2012
@@ -276,7 +276,7 @@ class Broker() extends BaseService with 
   /**
    * Validates and then applies the configuration.
    */
-  def update(config: BrokerDTO, on_completed:Runnable) = dispatch_queue {
+  def update(config: BrokerDTO, on_completed:Task) = dispatch_queue {
     dispatch_queue.assertExecuting()
     this.config = config
 
@@ -287,7 +287,7 @@ class Broker() extends BaseService with 
     tracker.callback(on_completed)
   }
 
-  override def _start(on_completed:Runnable) = {
+  override def _start(on_completed:Task) = {
 
     // create the runtime objects from the config
     init_logs
@@ -304,7 +304,7 @@ class Broker() extends BaseService with 
 
   }
 
-  def _stop(on_completed:Runnable): Unit = {
+  def _stop(on_completed:Task): Unit = {
     val tracker = new LoggingTracker("broker shutdown", console_log, SERVICE_TIMEOUT)
 
     // Stop the services...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Tue Mar 27 17:29:24 2012
@@ -52,7 +52,7 @@ abstract class Connection() extends Base
   var transport:Transport = null
   var transport_sink:TransportSink = null
 
-  override protected def _start(on_completed:Runnable) = {
+  override protected def _start(on_completed:Task) = {
     stopped = false
     transport_sink = new TransportSink(transport)
     transport.setDispatchQueue(dispatch_queue);
@@ -66,7 +66,7 @@ abstract class Connection() extends Base
     transport.start(on_completed)
   }
 
-  override protected def _stop(on_completed:Runnable) = {
+  override protected def _stop(on_completed:Task) = {
     stopped = true
     transport.stop(on_completed)
   }
@@ -112,12 +112,12 @@ class BrokerConnection(var connector: Co
   
   override def toString = "id: "+id.toString
 
-  protected override  def _start(on_completed:Runnable) = {
+  protected override  def _start(on_completed:Task) = {
     protocol_handler.set_connection(this);
     super._start(on_completed)
   }
 
-  protected override def _stop(on_completed:Runnable) = {
+  protected override def _stop(on_completed:Task) = {
     connector.stopped(this)
     super._stop(on_completed)
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Tue Mar 27 17:29:24 2012
@@ -44,7 +44,7 @@ trait Connector extends BaseService with
   def config:ConnectorTypeDTO
   def accepted:LongCounter
   def connected:LongCounter
-  def update(config: ConnectorTypeDTO, on_complete:Runnable):Unit
+  def update(config: ConnectorTypeDTO, on_complete:Task):Unit
   def socket_address:SocketAddress
   def status:ServiceStatusDTO
   def resource_kind = SecuredResource.ConnectorKind
@@ -153,7 +153,7 @@ class AcceptingConnector(val broker:Brok
 
       broker.connections.put(connection.id, connection)
       try {
-        connection.start()
+        connection.start(NOOP)
       } catch {
         case e1: Exception => {
           onAcceptError(e1)
@@ -174,7 +174,7 @@ class AcceptingConnector(val broker:Brok
 
   /**
    */
-  def update(config: ConnectorTypeDTO, on_completed:Runnable) = dispatch_queue {
+  def update(config: ConnectorTypeDTO, on_completed:Task) = dispatch_queue {
     if ( !service_state.is_started || this.config == config ) {
       this.config = config.asInstanceOf[AcceptingConnectorDTO]
       on_completed.run
@@ -189,7 +189,7 @@ class AcceptingConnector(val broker:Brok
   }
 
 
-  override def _start(on_completed:Runnable) = {
+  override def _start(on_completed:Task) = {
     assert(config!=null, "Connector must be configured before it is started.")
 
     accepted.set(0)
@@ -220,7 +220,7 @@ class AcceptingConnector(val broker:Brok
   }
 
 
-  override def _stop(on_completed:Runnable): Unit = {
+  override def _stop(on_completed:Task): Unit = {
     transport_server.stop(^{
       broker.console_log.info("Stopped connector at: "+config.bind)
       transport_server = null

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Mar 27 17:29:24 2012
@@ -129,7 +129,7 @@ trait DomainDestination extends SecuredR
   def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer):Unit
   def disconnect (producer:BindableDeliveryProducer):Unit
 
-  def update(on_completed:Runnable):Unit
+  def update(on_completed:Task):Unit
 
 }
 
@@ -843,7 +843,7 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  protected def _start(on_completed: Runnable) = {
+  protected def _start(on_completed: Task) = {
     val tracker = new LoggingTracker("router startup", virtual_host.console_log)
     if( virtual_host.store!=null ) {
       val task = tracker.task("list_queues")
@@ -924,10 +924,10 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  protected def _stop(on_completed: Runnable) = {
+  protected def _stop(on_completed: Task) = {
 //    val tracker = new LoggingTracker("router shutdown", virtual_host.console_log, dispatch_queue)
     queues_by_store_id.valuesIterator.foreach { queue=>
-      queue.stop
+      queue.stop(NOOP)
 //      tracker.stop(queue)
     }
 //    tracker.callback(on_completed)
@@ -1231,7 +1231,7 @@ class LocalRouter(val virtual_host:Virtu
       virtual_host.store.add_queue(record) { rc => Unit }
     }
 
-    queue.start
+    queue.start(NOOP)
 //    queues_by_binding.put(binding, queue)
     queues_by_store_id.put(qid, queue)
 
@@ -1294,7 +1294,7 @@ class LocalRouter(val virtual_host:Virtu
     })
   }
 
-  def apply_update(on_completed:Runnable) = {
+  def apply_update(on_completed:Task) = {
     val tracker = new LoggingTracker("domain update", virtual_host.broker.console_log)
     local_topic_domain.apply_update(tracker)
     local_queue_domain.apply_update(tracker)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Mar 27 17:29:24 2012
@@ -366,7 +366,7 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
-  def update(on_completed:Runnable) = dispatch_queue {
+  def update(on_completed:Task) = dispatch_queue {
 
     val prev_persistent = tune_persistent
 
@@ -425,7 +425,7 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  protected def _start(on_completed: Runnable) = {
+  protected def _start(on_completed: Task) = {
     restore_from_store {
 
 
@@ -440,14 +440,14 @@ class Queue(val router: LocalRouter, val
       // kick off dispatching to the consumers.
       check_idle
       trigger_swap
-      dispatch_queue << head_entry
+      dispatch_queue << head_entry.task
 
     }
   }
 
-  var stop_listener_waiting_for_flush:Runnable = _
+  var stop_listener_waiting_for_flush:Task = _
 
-  protected def _stop(on_completed: Runnable) = {
+  protected def _stop(on_completed: Task) = {
 
     // Now that we are stopping the queue will no longer be 'full'
     // draining will nack all enqueue attempts.
@@ -505,7 +505,7 @@ class Queue(val router: LocalRouter, val
 
   object messages extends Sink[Delivery] {
 
-    var refiller: Runnable = null
+    var refiller: Task = null
 
     def is_quota_exceeded = (tune_quota >= 0 && queue_size > tune_quota)
     def is_enqueue_throttled = (enqueues_remaining!=null && enqueues_remaining.get() <= 0)
@@ -1046,7 +1046,7 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
-  val swap_out_completes_source = createSource(new ListEventAggregator[Runnable](), dispatch_queue)
+  val swap_out_completes_source = createSource(new ListEventAggregator[Task](), dispatch_queue)
   swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
   swap_out_completes_source.resume
 
@@ -1070,7 +1070,7 @@ class Queue(val router: LocalRouter, val
 
     data.foreach { case (swapped,_) =>
       if( swapped.entry.hasSubs ) {
-        swapped.entry.run
+        swapped.entry.task.run
       }
     }
   }
@@ -1081,9 +1081,11 @@ object QueueEntry extends Sizer[QueueEnt
   def size(value: QueueEntry): Int = value.size
 }
 
-class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] {
   import QueueEntry._
 
+
+
   // Subscriptions waiting to dispatch this entry.
   var parked:List[Subscription] = Nil
 
@@ -1134,15 +1136,17 @@ class QueueEntry(val queue:Queue, val se
    * Dispatches this entry to the consumers and continues dispatching subsequent
    * entries as long as the dispatch results in advancing in their dispatch position.
    */
-  def run() = {
-    queue.assert_executing
-    var cur = this;
-    while( cur!=null && cur.isLinked ) {
-      val next = cur.getNext
-      cur = if( cur.dispatch ) {
-        next
-      } else {
-        null
+  final val task = new Task {
+    def run() {
+      queue.assert_executing
+      var cur = QueueEntry.this;
+      while( cur!=null && cur.isLinked ) {
+        val next = cur.getNext
+        cur = if( cur.dispatch ) {
+          next
+        } else {
+          null
+        }
       }
     }
   }
@@ -2029,7 +2033,7 @@ class Subscription(val queue:Queue, val 
         check_consumer_stall
       }
       if( pos!=null ) {
-        pos.run
+        pos.task.run
       }
     }
     pos ::= this
@@ -2045,7 +2049,7 @@ class Subscription(val queue:Queue, val 
     if( queue.service_state.is_started ) {
       // kick off the initial dispatch.
       refill_prefetch
-      queue.dispatch_queue << pos
+      queue.dispatch_queue << pos.task
     }
     queue.check_idle
   }
@@ -2117,7 +2121,7 @@ class Subscription(val queue:Queue, val 
     value ::= this
     pos = value
     check_load_stall
-    queue.dispatch_queue << value // queue up the entry to get dispatched..
+    queue.dispatch_queue << value.task // queue up the entry to get dispatched..
   }
 
   def tail_parked = pos eq queue.tail_entry
@@ -2251,7 +2255,7 @@ class Subscription(val queue:Queue, val 
       entry.remove // entry size changes to 0
 
       queue.trigger_swap
-      next.run
+      next.task.run
       check_finish_close
       
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Tue Mar 27 17:29:24 2012
@@ -175,7 +175,7 @@ trait Router extends Service {
 
   def create(destinations:Array[_ <: DestinationAddress], security:SecurityContext): Option[String]
 
-  def apply_update(on_completed:Runnable):Unit
+  def apply_update(on_completed:Task):Unit
 
   def remove_temp_destinations(active_connections:scala.collection.Set[String]):Unit
 }
@@ -269,7 +269,7 @@ abstract class DeliveryProducerRoute(rou
   var pendingAck: (DeliveryResult, StoreUOW)=>Unit = null
   var overflow:Delivery=null
   var overflowSessions = List[DeliverySession]()
-  var refiller:Runnable=null
+  var refiller:Task=null
 
   def full = overflow!=null
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Mar 27 17:29:24 2012
@@ -48,8 +48,8 @@ trait Sink[T] {
    * Sets a refiller on the sink.  The refiller is executed
    * when the sink is interested in receiving more deliveries.
    */
-  def refiller:Runnable
-  def refiller_=(value:Runnable)
+  def refiller:Task
+  def refiller_=(value:Task)
 
   def map[Y](func: Y=>T ):Sink[Y] = new SinkMapper[Y,T] {
     def passing(value: Y) = func(value)
@@ -75,8 +75,8 @@ trait Sink[T] {
 
 trait SinkFilter[T] {
   def downstream:Sink[T]
-  def refiller:Runnable = downstream.refiller
-  def refiller_=(value:Runnable) { downstream.refiller=value }
+  def refiller:Task = downstream.refiller
+  def refiller_=(value:Task) { downstream.refiller=value }
   def full: Boolean = downstream.full
 }
 
@@ -100,7 +100,7 @@ trait SinkMapper[T,X] extends Sink[T] wi
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class TransportSink(val transport:Transport) extends Sink[AnyRef] {
-  var refiller:Runnable = NOOP
+  var refiller:Task = NOOP
   def full:Boolean = transport.full
   def offer(value:AnyRef) =  transport.offer(value)
 }
@@ -114,7 +114,7 @@ class TransportSink(val transport:Transp
  */
 class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
 
-  var refiller:Runnable = NOOP
+  var refiller:Task = NOOP
 
   val overflow = new LinkedList[T]()
 
@@ -186,7 +186,7 @@ class OverflowSink[T](val downstream:Sin
  */
 class MutableSink[T] extends Sink[T] {
 
-  var refiller:Runnable = NOOP
+  var refiller:Task = NOOP
   private var _downstream:Option[Sink[T]] = None
 
   def downstream_=(value: Option[Sink[T]]) {
@@ -227,7 +227,7 @@ class SinkMux[T](val downstream:Sink[T])
   class ManagedSink extends Sink[T] {
 
     var rejection_handler:(T)=>Unit = _
-    var refiller:Runnable = NOOP
+    var refiller:Task = NOOP
 
     def full = downstream.full && rejection_handler==null
 
@@ -394,7 +394,7 @@ class SessionSinkMux[T](val downstream:S
  */
 class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SessionSinkMux[T]) extends SessionSink[T] {
 
-  var refiller:Runnable = NOOP
+  var refiller:Task = NOOP
 
   private def sizer = mux.sizer
   private def downstream = mux.source
@@ -495,12 +495,12 @@ trait Sizer[T] {
  */
 class QueueSink[T](val sizer:Sizer[T], var maxSize:Int=1024*32) extends Sink[T] {
 
-  var refiller:Runnable = NOOP
+  var refiller:Task = NOOP
 
   var buffer = new LinkedList[T]()
   private var size = 0
 
-  var drainer: Runnable = null
+  var drainer: Task = null
 
   def full = size >= maxSize
   def poll = buffer.poll

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Mar 27 17:29:24 2012
@@ -86,7 +86,7 @@ class Topic(val router:LocalRouter, val 
     var enqueue_ts = 0L
     var enqueue_size_counter = 0L
     var enqueue_item_counter = 0L
-    var refiller:Runnable = null
+    var refiller:Task = null
 
 
     def offer(value: Delivery) = {
@@ -294,7 +294,7 @@ class Topic(val router:LocalRouter, val 
   }
 
 
-  def update(on_completed:Runnable) = {
+  def update(on_completed:Task) = {
     refresh_config
     on_completed.run
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Tue Mar 27 17:29:24 2012
@@ -146,7 +146,7 @@ class VirtualHost(val broker: Broker, va
   /**
    * Validates and then applies the configuration.
    */
-  def update(config: VirtualHostDTO, on_completed:Runnable) = dispatch_queue {
+  def update(config: VirtualHostDTO, on_completed:Task) = dispatch_queue {
     if ( !service_state.is_started ) {
       this.config = config
       on_completed.run
@@ -194,7 +194,7 @@ class VirtualHost(val broker: Broker, va
     }
   }
 
-  override protected def _start(on_completed:Runnable):Unit = {
+  override protected def _start(on_completed:Task):Unit = {
     apply_update
 
     if ( Option(config.heap_bypass).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(0) > 0 ) {
@@ -257,7 +257,7 @@ class VirtualHost(val broker: Broker, va
   }
 
 
-  override protected def _stop(on_completed:Runnable):Unit = {
+  override protected def _stop(on_completed:Task):Unit = {
 
     val tracker = new LoggingTracker("virtual host shutdown", console_log)
     tracker.stop(router);

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala Tue Mar 27 17:29:24 2012
@@ -139,7 +139,7 @@ class JettyWebServer(val broker:Broker) 
   val dispatch_queue = createQueue()
   var web_admins = List[WebAdminDTO]()
 
-  protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+  protected def _start(on_completed: Task) = Broker.BLOCKABLE_THREAD_POOL {
     this.synchronized {
       import OptionSupport._
       import FileSupport._
@@ -260,7 +260,7 @@ class JettyWebServer(val broker:Broker) 
     }
   }
 
-  protected def _stop(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+  protected def _stop(on_completed: Task) = Broker.BLOCKABLE_THREAD_POOL {
     this.synchronized {
       if( server!=null ) {
         server.stop
@@ -270,7 +270,7 @@ class JettyWebServer(val broker:Broker) 
     }
   }
 
-  def update(on_complete: Runnable) = dispatch_queue {
+  def update(on_complete: Task) = dispatch_queue {
     import collection.JavaConversions._
     val new_list = broker.config.web_admins.toList
     if( new_list != web_admins ) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Tue Mar 27 17:29:24 2012
@@ -80,7 +80,10 @@ object WebSocketTransportFactory extends
 
     def dispatch_queue = dispatchQueue
 
-    protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+    def start(on_completed: Runnable):Unit = super.start(new TaskWrapper(on_completed))
+    def stop(on_completed: Runnable):Unit = super.stop(new TaskWrapper(on_completed))
+
+    protected def _start(on_completed: Task) = Broker.BLOCKABLE_THREAD_POOL {
       this.synchronized {
 
         IntrospectionSupport.setProperties(this, URISupport.parseParamters(uri));
@@ -142,7 +145,7 @@ object WebSocketTransportFactory extends
       }
     }
 
-    def _stop(on_complete: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+    def _stop(on_complete: Task) = Broker.BLOCKABLE_THREAD_POOL {
       this.synchronized {
         if (server != null) {
           try {
@@ -231,7 +234,10 @@ object WebSocketTransportFactory extends
 
     def dispatch_queue = dispatchQueue
 
-    protected def _start(on_completed: Runnable) = {
+    def start(on_completed: Runnable):Unit = super.start(new TaskWrapper(on_completed))
+    def stop(on_completed: Runnable):Unit = super.stop(new TaskWrapper(on_completed))
+
+    protected def _start(on_completed: Task) = {
       inbound_dispatch_queue = dispatchQueue.createQueue(null);
       inbound_dispatch_queue.suspend();
       drain_outbound_events.setTargetQueue(dispatchQueue)
@@ -245,7 +251,7 @@ object WebSocketTransportFactory extends
       on_completed.run()
     }
   
-    protected def _stop(on_completed: Runnable) = {
+    protected def _stop(on_completed: Task) = {
       inbound_dispatch_queue.resume()
       outbound_executor {
         // Wakes up any blocked reader thread..

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Tue Mar 27 17:29:24 2012
@@ -204,7 +204,7 @@ class AnyProtocolHandler extends Protoco
 
   def assert_discriminated = {
     if( connection.service_state.is_started && !discriminated ) {
-      connection.stop
+      connection.stop(NOOP)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Tue Mar 27 17:29:24 2012
@@ -18,7 +18,7 @@ package org.apache.activemq.apollo.broke
 
 import java.io.IOException
 import org.apache.activemq.apollo.broker.store.MessageRecord
-import org.fusesource.hawtdispatch.transport._
+import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.dto.ConnectionStatusDTO
 import org.apache.activemq.apollo.util.{Log, ClassFinder}
 import org.apache.activemq.apollo.broker.{Broker, Message, BrokerConnection}
@@ -79,7 +79,7 @@ trait ProtocolHandler {
 
   def on_transport_failure(error:IOException) = {
     trace(error)
-    connection.stop()
+    connection.stop(NOOP)
   }
 
   def on_transport_disconnected = {}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Tue Mar 27 17:29:24 2012
@@ -185,7 +185,7 @@ class UdpProtocolHandler extends Protoco
     } catch {
       case x =>
         warn(x)
-        connection.stop()
+        connection.stop(NOOP)
         new DefaultUdpDecoder
     }
     buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
@@ -199,7 +199,7 @@ class UdpProtocolHandler extends Protoco
         connection.transport.resumeRead()
         if(host==null) {
           warn("Could not find default virtual host")
-          connection.stop()
+          connection.stop(NOOP)
         }
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala Tue Mar 27 17:29:24 2012
@@ -16,18 +16,19 @@
  */
 package org.apache.activemq.apollo.broker.transport
 
-import _root_.java.io.IOException
-import _root_.java.net.URI
-import _root_.java.util.concurrent.atomic.AtomicBoolean
-import _root_.java.util.concurrent.atomic.AtomicInteger
+import java.io.IOException
+import java.net.URI
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 
-import _root_.org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.broker._
 
-import _root_.scala.collection.JavaConversions._
+import scala.collection.JavaConversions._
 import org.fusesource.hawtdispatch.transport._
 import org.apache.activemq.apollo.util._
 import java.lang.String
 import org.apache.activemq.apollo.dto.AcceptingConnectorDTO
+import org.fusesource.hawtdispatch._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -79,7 +80,7 @@ class VMTransportFactory extends Logging
 
     def stopBroker() = {
       try {
-        this.broker.stop();
+        this.broker.stop(NOOP);
         unbind(this);
       } catch {
         case e: Exception =>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/web/WebServerFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/web/WebServerFactory.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/web/WebServerFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/web/WebServerFactory.scala Tue Mar 27 17:29:24 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo.broke
 
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.Broker
+import org.fusesource.hawtdispatch.Task
 
 /**
  * <p>
@@ -26,7 +27,7 @@ import org.apache.activemq.apollo.broker
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait WebServer extends Service {
-  def update(on_complete:Runnable):Unit
+  def update(on_complete:Task):Unit
 }
 
 trait WebServerFactory {

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Run.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Run.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Run.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Run.scala Tue Mar 27 17:29:24 2012
@@ -115,7 +115,7 @@ class Run extends Action {
       }
 
       broker.tmp = tmp
-      broker.start()
+      broker.start(NOOP)
 
       val broker_config_monitor = new FileMonitor(conf,broker.dispatch_queue {
         broker.console_log.info("Reloading configuration file '%s'.".format(conf))
@@ -137,9 +137,9 @@ class Run extends Action {
         null
       }
       
-      if(jul_config_monitor!=null) jul_config_monitor.start
-      log4j_config_monitor.start
-      broker_config_monitor.start
+      if(jul_config_monitor!=null) jul_config_monitor.start(NOOP)
+      log4j_config_monitor.start(NOOP)
+      broker_config_monitor.start(NOOP)
 
       Runtime.getRuntime.addShutdownHook(new Thread(){
         override def run: Unit = {

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Tue Mar 27 17:29:24 2012
@@ -62,13 +62,15 @@ abstract public class DestinationDTO {
         DestinationDTO that = (DestinationDTO) o;
 
         if (!name.equals(that.name)) return false;
+        if (path != null ? !path.equals(that.path) : that.path != null) return false;
 
         return true;
     }
 
     @Override
     public int hashCode() {
-        int result = name.hashCode();
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (path != null ? path.hashCode() : 0);
         return result;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java Tue Mar 27 17:29:24 2012
@@ -40,23 +40,6 @@ public class QueueDestinationDTO extends
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        QueueDestinationDTO that = (QueueDestinationDTO) o;
-
-        if (path != null ? !path.equals(that.path) : that.path != null) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return path != null ? path.hashCode() : 0;
-    }
-
-    @Override
     public String toString() {
         return "queue:"+name;
     }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Tue Mar 27 17:29:24 2012
@@ -84,7 +84,7 @@ class LevelDBStore(val config: LevelDBSt
     }
   }
 
-  protected def _start(on_completed: Runnable) = {
+  protected def _start(on_completed: Task) = {
     try {
       client = create_client
       write_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -123,7 +123,7 @@ class LevelDBStore(val config: LevelDBSt
     }
   }
 
-  protected def _stop(on_completed: Runnable) = {
+  protected def _stop(on_completed: Task) = {
     new Thread() {
       override def run = {
         write_executor.shutdown

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala Tue Mar 27 17:29:24 2012
@@ -18,9 +18,7 @@ package org.apache.activemq.apollo.broke
  */
 
 import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
-import org.apache.activemq.apollo.broker.store.leveldb.LevelDBStore
 import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO
-import org.apache.activemq.apollo.util.FileSupport._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Mar 27 17:29:24 2012
@@ -186,7 +186,7 @@ class OpenwireProtocolHandler extends Pr
     if (!connection.stopped) {
       suspend_read("shutdown")
       connection_log.info(error, "Shutting connection '%s'  down due to: %s", security_context.remote_address, error)
-      connection.stop
+      connection.stop(NOOP)
     }
   }
 
@@ -275,7 +275,7 @@ class OpenwireProtocolHandler extends Pr
           case info:ConnectionInfo=> on_connection_info(info)
           case info:RemoveInfo=> on_remove_info(info)
           case info:KeepAliveInfo=> ack(info)
-          case info:ShutdownInfo=> ack(info); connection.stop
+          case info:ShutdownInfo=> ack(info); connection.stop(NOOP)
           case info:FlushCommand=> ack(info)
           case info:DestinationInfo=> on_destination_info(info)
 
@@ -372,7 +372,7 @@ class OpenwireProtocolHandler extends Pr
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
-        connection.stop()
+        connection.stop(NOOP)
       }
       fail(msg, actual)
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala Tue Mar 27 17:29:24 2012
@@ -20,9 +20,8 @@ package org.apache.activemq.apollo.openw
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.BeforeAndAfterEach
 import java.lang.String
-import org.apache.activemq.apollo.broker.{KeyStorage, Broker, BrokerFactory}
-import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl}
-import FileSupport._
+import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
+import org.apache.activemq.apollo.util.{Logging, FunSuiteSupport, ServiceControl}
 import javax.jms.Connection
 import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
@@ -47,7 +46,7 @@ class OpenwireTestSupport extends FunSui
   var connections = List[Connection]()
 
   override protected def afterAll() {
-    broker.stop()
+    ServiceControl.stop(broker)
   }
 
   override protected def afterEach() {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Mar 27 17:29:24 2012
@@ -703,7 +703,7 @@ class StompProtocolHandler extends Proto
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
-        connection.stop()
+        connection.stop(NOOP)
       }
     }
     throw new Break()
@@ -804,12 +804,12 @@ class StompProtocolHandler extends Proto
                 on_transport_disconnected
                 if( delay ) {
                   queue.after(die_delay, TimeUnit.MILLISECONDS) {
-                    connection.stop()
+                    connection.stop(NOOP)
                   }
                 } else {
                   // no point in delaying the connection shutdown
                   // if the client does not want a receipt..
-                  connection.stop()
+                  connection.stop(NOOP)
                 }
 
               case _ =>

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala Tue Mar 27 17:29:24 2012
@@ -58,9 +58,6 @@ trait BaseService extends Service with D
   protected class STOPPING extends State { override def is_stopping = true  }
   protected class STOPPED extends State { override def is_stopped = true  }
 
-  final def start() = start(null)
-  final def stop() = stop(null)
-
   @volatile
   protected var _service_state:State = new CREATED
 
@@ -70,10 +67,10 @@ trait BaseService extends Service with D
   protected var _serviceFailure:Exception = null
   def serviceFailure = _serviceFailure
 
-  private val pending_actions = ListBuffer[Runnable]()
+  private val pending_actions = ListBuffer[Task]()
 
-  final def start(on_completed:Runnable) = {
-    def start_task:Runnable = ^{
+  final def start(on_completed:Task):Unit = {
+    def start_task:Task = ^{
       def done = {
         pending_actions.foreach(dispatch_queue.execute _)
         pending_actions.clear()
@@ -119,8 +116,8 @@ trait BaseService extends Service with D
     start_task >>: dispatch_queue
   }
 
-  final def stop(on_completed:Runnable) = {
-    def stop_task:Runnable = ^{
+  final def stop(on_completed:Task):Unit = {
+    def stop_task:Task = ^{
       def done = {
         val tmp = pending_actions.toArray
         pending_actions.clear
@@ -162,7 +159,7 @@ trait BaseService extends Service with D
     stop_task >>: dispatch_queue
   }
 
-  protected def _start(on_completed:Runnable)
-  protected def _stop(on_completed:Runnable)
+  protected def _start(on_completed:Task)
+  protected def _stop(on_completed:Task)
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileCache.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileCache.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileCache.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileCache.scala Tue Mar 27 17:29:24 2012
@@ -38,12 +38,12 @@ class FileMonitor(file:File, change_list
   var last_modified = 0L
   var state_ver = 0
 
-  protected def _stop(on_completed: Runnable) = {
+  protected def _stop(on_completed: Task) = {
     state_ver+=1
     on_completed.run()
   }
 
-  protected def _start(on_completed: Runnable) = {
+  protected def _start(on_completed: Task) = {
     last_data = file.read_bytes
     last_modified = file.lastModified()
     state_ver+=1

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/JavaBaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/JavaBaseService.java?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/JavaBaseService.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/JavaBaseService.java Tue Mar 27 17:29:24 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.util;
 
 import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
 
 import java.util.LinkedList;
 
@@ -39,16 +40,16 @@ public abstract class JavaBaseService im
     }
 
     static class CallbackSupport extends State {
-        LinkedList<Runnable> callbacks = new LinkedList<Runnable>();
+        LinkedList<Task> callbacks = new LinkedList<Task>();
 
-        void add(Runnable r) {
+        void add(Task r) {
             if (r != null) {
                 callbacks.add(r);
             }
         }
 
         void done() {
-            for (Runnable callback : callbacks) {
+            for (Task callback : callbacks) {
                 callback.run();
             }
         }
@@ -70,23 +71,15 @@ public abstract class JavaBaseService im
 
     protected State _serviceState = CREATED;
 
-    final public void start() {
-        start(null);
-    }
-
-    final public void stop() {
-        stop(null);
-    }
-
-    final public void start(final Runnable onCompleted) {
-        getDispatchQueue().execute(new Runnable() {
+    final public void start(final Task onCompleted) {
+        getDispatchQueue().execute(new Task() {
             public void run() {
                 if (_serviceState == CREATED ||
                         _serviceState == STOPPED) {
                     final STARTING state = new STARTING();
                     state.add(onCompleted);
                     _serviceState = state;
-                    _start(new Runnable() {
+                    _start(new Task() {
                         public void run() {
                             _serviceState = STARTED;
                             state.done();
@@ -108,14 +101,14 @@ public abstract class JavaBaseService im
         });
     }
 
-    final public void stop(final Runnable onCompleted) {
-        getDispatchQueue().execute(new Runnable() {
+    final public void stop(final Task onCompleted) {
+        getDispatchQueue().execute(new Task() {
             public void run() {
                 if (_serviceState == STARTED) {
                     final STOPPING state = new STOPPING();
                     state.add(onCompleted);
                     _serviceState = state;
-                    _stop(new Runnable() {
+                    _stop(new Task() {
                         public void run() {
                             _serviceState = STOPPED;
                             state.done();
@@ -151,8 +144,8 @@ public abstract class JavaBaseService im
 
     abstract protected DispatchQueue getDispatchQueue();
 
-    abstract protected void _start(Runnable onCompleted);
+    abstract protected void _start(Task onCompleted);
 
-    abstract protected void _stop(Runnable onCompleted);
+    abstract protected void _stop(Task onCompleted);
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Service.java Tue Mar 27 17:29:24 2012
@@ -17,6 +17,8 @@
 package org.apache.activemq.apollo.util;
 
 
+import org.fusesource.hawtdispatch.Task;
+
 /**
  * The core lifecyle interface for ActiveMQ components.
  *
@@ -25,29 +27,17 @@ package org.apache.activemq.apollo.util;
 public interface Service {
 
     /**
-     * Starts the service.  No guarantee is given that the service has fully started
-     * by the time this method returns.
-     */
-    void start() throws Exception;
-
-    /**
      * Starts the service.  Executes the onComplete runnable once the service has fully started up.
      *
      * @param onComplete my be set to null if not interested in a callback.
      */
-    void start(Runnable onComplete) throws Exception;
-
-    /**
-     * Stops the service.  No guarantee is given that the service has fully stopped
-     * by the time this method returns.
-     */
-    void stop() throws Exception;
+    void start(Task onComplete) throws Exception;
 
     /**
      * Stops the service.  Executes the onComplete runnable once the service has fully stopped.
      *
      * @param onComplete my be set to null if not interested in a callback.
      */
-    void stop(Runnable onComplete) throws Exception;
+    void stop(Task onComplete) throws Exception;
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala Tue Mar 27 17:29:24 2012
@@ -17,7 +17,6 @@
 
 package org.apache.activemq.apollo.util
 
-import collection.mutable.ListBuffer
 
 /*
   Simple trait to cut down on the code necessary to manage BaseService instances
@@ -55,5 +54,12 @@ object ServiceControl {
     controlService(false, service, action)
   }
 
+  def start(service: Service) = {
+    controlService(true, service, "Starting "+service)
+  }
+
+  def stop(service: Service) = {
+    controlService(false, service, "Stopping "+service)
+  }
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceStopper.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceStopper.java?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceStopper.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceStopper.java Tue Mar 27 17:29:24 2012
@@ -16,16 +16,16 @@
  */
 package org.apache.activemq.apollo.util;
 
-import java.util.Iterator;
-import java.util.List;
-
+import org.fusesource.hawtdispatch.internal.util.RunnableCountDownLatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * A helper class used to stop a bunch of services, catching and logging any
  * exceptions and then throwing the first exception when everything is stoped.
- * 
+ *
  * @version $Revision: 1.1 $
  */
 public class ServiceStopper {
@@ -37,7 +37,9 @@ public class ServiceStopper {
     public void stop(Service service) {
         try {
             if (service != null) {
-                service.stop();
+                RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
+                service.stop(latch);
+                latch.await();
             }
         } catch (Exception e) {
             onException(service, e);
@@ -59,10 +61,18 @@ public class ServiceStopper {
     /**
      * Stops a list of services
      */
-    public void stopServices(List services) {
-        for (Iterator iter = services.iterator(); iter.hasNext();) {
-            Service service = (Service)iter.next();
-            stop(service);
+    public void stopServices(List<Service> services) throws Exception {
+        RunnableCountDownLatch latch = new RunnableCountDownLatch(services.size());
+        for (int i = 0; i < services.size(); i++) {
+            Service service =  services.get(i);
+            try {
+                service.stop(latch);
+            } catch (Exception e) {
+                onException(service, e);
+            }
+        }
+        if( firstException==null ) {
+            latch.await();
         }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceSupport.java?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceSupport.java Tue Mar 27 17:29:24 2012
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.fusesource.hawtdispatch.internal.util.RunnableCountDownLatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,9 @@ public abstract class ServiceSupport imp
 
     public static void dispose(Service service) {
         try {
-            service.stop();
+            RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
+            service.stop(latch);
+            latch.await();
         } catch (Exception e) {
             LOG.debug("Could not stop service: " + service + ". Reason: " + e, e);
         }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1305912&r1=1305911&r2=1305912&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Tue Mar 27 17:29:24 2012
@@ -687,7 +687,7 @@ case class BrokerResource() extends Reso
   def post_connector_stop(@PathParam("id") id : String):Unit = unwrap_future_result {
     with_connector(id) { connector =>
       admining(connector.broker) {
-        connector.stop
+        connector.stop(NOOP)
       }
     }
     result(strip_resolve(".."))
@@ -697,7 +697,7 @@ case class BrokerResource() extends Reso
   def post_connector_start(@PathParam("id") id : String):Unit = unwrap_future_result {
     with_connector(id) { connector =>
       admining(connector.broker) {
-        connector.start
+        connector.start(NOOP)
       }
     }
     result(strip_resolve(".."))
@@ -762,7 +762,7 @@ case class BrokerResource() extends Reso
   def connection_delete(@PathParam("id") id : Long):Unit = unwrap_future_result {
     with_connection(id){ connection=>
       admining(connection.connector.broker) {
-        connection.stop
+        connection.stop(NOOP)
       }
     }
   }



Mime
View raw message