Author: chirino Date: Wed Jul 7 03:57:24 2010 New Revision: 961106 URL: http://svn.apache.org/viewvc?rev=961106&view=rev Log: renamed queue field to dispatchQueue for consistency Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961106&r1=961105&r2=961106&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:57:24 2010 @@ -260,7 +260,7 @@ trait QueueLifecyleListener { -object Queue { +object Queue extends Log { val maxOutboundSize = 1024*1204*5 } @@ -268,18 +268,22 @@ object Queue { * * @author Hiram Chirino */ -class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer { +class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging { + override protected def log = Queue - override val queue:DispatchQueue = createQueue("queue:"+destination); - queue.setTargetQueue(getRandomThreadQueue) + override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination); + dispatchQueue.setTargetQueue(getRandomThreadQueue) + dispatchQueue { + debug("created queue for: "+destination) + } val delivery_buffer = new DeliveryBuffer delivery_buffer.eventHandler = ^{ drain_delivery_buffer } - val session_manager = new DeliverySessionManager(delivery_buffer, queue) + val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue) setDisposer(^{ - queue.release + dispatchQueue.release session_manager.release }) @@ -289,7 +293,7 @@ class Queue(val destination:Destination) def deliver(value:Delivery):Unit = { val delivery = Delivery(value) delivery.setDisposer(^{ - ^{ completed(value) } >>:queue + ^{ completed(value) } >>:dispatchQueue }) consumer.deliver(delivery); delivery.release @@ -310,12 +314,12 @@ class Queue(val destination:Destination) def connected(consumers:List[DeliveryConsumer]) = bind(consumers) def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) { for ( consumer <- consumers ) { - val cs = new ConsumerState(consumer.open_session(queue)) + val cs = new ConsumerState(consumer.open_session(dispatchQueue)) allConsumers += consumer->cs readyConsumers.addLast(cs) } drain_delivery_buffer - } >>: queue + } >>: dispatchQueue def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) { for ( consumer <- consumers ) { @@ -328,14 +332,14 @@ class Queue(val destination:Destination) case None=> } } - } >>: queue + } >>: dispatchQueue def disconnected() = throw new RuntimeException("unsupported") def collocate(value:DispatchQueue):Unit = { - if( value.getTargetQueue ne queue.getTargetQueue ) { - println(queue.getLabel+" co-locating with: "+value.getLabel); - this.queue.setTargetQueue(value.getTargetQueue) + if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) { + println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel); + this.dispatchQueue.setTargetQueue(value.getTargetQueue) } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961106&r1=961105&r2=961106&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:57:24 2010 @@ -46,24 +46,21 @@ abstract class Connection() extends Tran import Connection._ val id = next_id val dispatchQueue = createQueue(id) - - def stopped = serviceState match { - case STOPPED => true - case x:STOPPING => true - case _ => false - } + var stopped = true var transport:Transport = null override def toString = id override protected def _start(onCompleted:Runnable) = { + stopped = false transport.setDispatchQueue(dispatchQueue); transport.setTransportListener(Connection.this); transport.start(onCompleted) } override protected def _stop(onCompleted:Runnable) = { + stopped = true transport.stop(onCompleted) } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961106&r1=961105&r2=961106&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:57:24 2010 @@ -45,7 +45,7 @@ trait DeliverySession { */ trait DeliveryConsumer extends Retained { def matches(message:Delivery) - val queue:DispatchQueue; + val dispatchQueue:DispatchQueue; def open_session(producer_queue:DispatchQueue):DeliverySession } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961106&r1=961105&r2=961106&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:57:24 2010 @@ -198,7 +198,7 @@ class Router(var queue:DispatchQueue) ex trait Route extends Retained { val destination:Destination - val queue:DispatchQueue + val dispatchQueue:DispatchQueue val metric = new AtomicLong(); def connected(targets:List[DeliveryConsumer]):Unit @@ -211,15 +211,14 @@ trait Route extends Retained { /** * @author Hiram Chirino */ -class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging { +class DeliveryProducerRoute(val destination:Destination, override val dispatchQueue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging { override protected def log = Router - protected def dispatchQueue:DispatchQueue = queue // Retain the queue while we are retained. - queue.retain + dispatchQueue.retain setDisposer(^{ - queue.release + dispatchQueue.release }) var targets = List[DeliverySession]() @@ -227,16 +226,16 @@ class DeliveryProducerRoute(val destinat def connected(targets:List[DeliveryConsumer]) = retaining(targets) { internal_bind(targets) on_connected - } >>: queue + } >>: dispatchQueue def bind(targets:List[DeliveryConsumer]) = retaining(targets) { internal_bind(targets) - } >>: queue + } >>: dispatchQueue private def internal_bind(values:List[DeliveryConsumer]) = { values.foreach{ x=> debug("producer route attaching to conusmer.") - targets = x.open_session(queue) :: targets + targets = x.open_session(dispatchQueue) :: targets } } @@ -249,7 +248,7 @@ class DeliveryProducerRoute(val destinat } rc } - } >>: queue + } >>: dispatchQueue def disconnected() = ^ { this.targets.foreach { x=> @@ -257,7 +256,7 @@ class DeliveryProducerRoute(val destinat x.close x.consumer.release } - } >>: queue + } >>: dispatchQueue protected def on_connected = {} protected def on_disconnected = {} Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961106&r1=961105&r2=961106&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:57:24 2010 @@ -37,7 +37,7 @@ import org.apache.activemq.util.{IOHelpe import scala.util.matching.Regex object BaseBrokerPerfSupport { - var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "2")) + var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "5")) var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "3000")) // Set to use tcp IO Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961106&r1=961105&r2=961106&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:57:24 2010 @@ -59,13 +59,13 @@ class StompProtocolHandler extends Proto class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer { - val queue = StompProtocolHandler.this.dispatchQueue - val session_manager = new DeliverySessionManager(outboundChannel, queue) + val dispatchQueue = StompProtocolHandler.this.dispatchQueue + val session_manager = new DeliverySessionManager(outboundChannel, dispatchQueue) - queue.retain + dispatchQueue.retain setDisposer(^{ session_manager.release - queue.release + dispatchQueue.release }) def matches(message:Delivery) = true