Author: chirino
Date: Sun Feb 5 21:53:53 2012
New Revision: 1240838
URL: http://svn.apache.org/viewvc?rev=1240838&view=rev
Log:
Making the Router more consistent. It's expected that the caller with switch to the virtual
host's dispatch queue before accessing the router.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Sun Feb 5 21:53:53 2012
@@ -1015,21 +1015,20 @@ class LocalRouter(val virtual_host:Virtu
}
def unbind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, persistent:Boolean,
security: SecurityContext) = {
+ dispatch_queue.assertExecuting()
consumer.retain
- dispatch_queue {
- addresses.foreach { address=>
- address.domain match {
- case "topic" =>
- topic_domain.unbind(address, consumer, persistent, security)
- case "queue" =>
- queue_domain.unbind(address, consumer, persistent, security)
- case "dsub" =>
- dsub_domain.unbind(address, consumer, persistent, security)
- case _ => sys.error("Unknown domain: "+address.domain)
- }
+ addresses.foreach { address=>
+ address.domain match {
+ case "topic" =>
+ topic_domain.unbind(address, consumer, persistent, security)
+ case "queue" =>
+ queue_domain.unbind(address, consumer, persistent, security)
+ case "dsub" =>
+ dsub_domain.unbind(address, consumer, persistent, security)
+ case _ => sys.error("Unknown domain: "+address.domain)
}
- consumer.release
}
+ consumer.release
}
def connect(addresses: Array[_ <: ConnectAddress], producer: BindableDeliveryProducer,
security: SecurityContext):Option[String] = {
@@ -1073,21 +1072,20 @@ class LocalRouter(val virtual_host:Virtu
}
def disconnect(addresses:Array[_ <: ConnectAddress], producer:BindableDeliveryProducer)
= {
- dispatch_queue {
- addresses.foreach { address=>
- address.domain match {
- case "topic" =>
- topic_domain.disconnect(address, producer)
- case "queue" =>
- queue_domain.disconnect(address, producer)
- case "dsub" =>
- dsub_domain.disconnect(address, producer)
- case _ => sys.error("Unknown domain: "+address.domain)
- }
+ dispatch_queue.assertExecuting()
+ addresses.foreach { address=>
+ address.domain match {
+ case "topic" =>
+ topic_domain.disconnect(address, producer)
+ case "queue" =>
+ queue_domain.disconnect(address, producer)
+ case "dsub" =>
+ dsub_domain.disconnect(address, producer)
+ case _ => sys.error("Unknown domain: "+address.domain)
}
- producer.disconnected()
- producer.release()
}
+ producer.disconnected()
+ producer.release()
}
def create(addresses:Array[_ <: DestinationAddress], security: SecurityContext):Option[String]
= {
@@ -1158,15 +1156,11 @@ class LocalRouter(val virtual_host:Virtu
}
}
-
- def get_or_create_destination(adress: DestinationAddress, security: SecurityContext) =
dispatch_queue ! {
- _get_or_create_destination(adress, security)
- }
-
/**
* Returns the previously created queue if it already existed.
*/
- def _get_or_create_destination(address: DestinationAddress, security:SecurityContext):
Result[DomainDestination, String] = {
+ def get_or_create_destination(address: DestinationAddress, security:SecurityContext): Result[DomainDestination,
String] = {
+ dispatch_queue.assertExecuting()
address.domain match {
case "queue" => queue_domain.get_or_create_destination(address, security)
case "topic" => topic_domain.get_or_create_destination(address, security)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Sun Feb 5 21:53:53 2012
@@ -84,7 +84,7 @@ object VirtualHost extends Log {
class VirtualHost(val broker: Broker, val id:String) extends BaseService with SecuredResource
{
import VirtualHost._
- override val dispatch_queue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
+ override val dispatch_queue:DispatchQueue = createQueue("virtual-host")
var config:VirtualHostDTO = _
val router:Router = new LocalRouter(this)
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Sun Feb 5 21:53:53 2012
@@ -220,8 +220,11 @@ class OpenwireProtocolHandler extends Pr
heart_beat_monitor.stop
import collection.JavaConversions._
- producerRoutes.foreach{
- case (dests, route) => host.router.disconnect(dests.toArray, route)
+ producerRoutes.foreach{ case (dests, route) =>
+ val addresses = dests.toArray
+ host.dispatch_queue {
+ host.router.disconnect(addresses, route)
+ }
}
producerRoutes.clear
@@ -896,7 +899,9 @@ class OpenwireProtocolHandler extends Pr
}
def dettach = {
- host.router.unbind(addresses, this, false , security_context)
+ host.dispatch_queue {
+ host.router.unbind(addresses, this, false , security_context)
+ }
parent.consumers.remove(info.getConsumerId)
all_consumers.remove(info.getConsumerId)
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Sun Feb 5 21:53:53 2012
@@ -702,13 +702,18 @@ class StompProtocolHandler extends Proto
dead = true;
import collection.JavaConversions._
- producerRoutes.foreach{
- case(dests,route)=> host.router.disconnect(dests.toArray, route)
+ producerRoutes.foreach{ case(dests,route)=>
+ val addresses = dests.toArray
+ host.dispatch_queue {
+ host.router.disconnect(addresses, route)
+ }
}
producerRoutes.clear
- consumers.foreach {
- case (_,consumer)=>
- host.router.unbind(consumer.addresses, consumer, false , security_context)
+ consumers.foreach { case (_,consumer)=>
+ val addresses = consumer.addresses
+ host.dispatch_queue {
+ host.router.unbind(addresses, consumer, false , security_context)
+ }
}
consumers = Map()
security_context.logout( e => {
@@ -1281,7 +1286,9 @@ class StompProtocolHandler extends Proto
// consumer gets disposed after all producer stop sending to it...
consumer.setDisposer(^{ send_receipt(headers) })
consumers -= id
- host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+ host.dispatch_queue {
+ host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+ }
}
}
|