activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1240838 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Sun, 05 Feb 2012 21:53:54 GMT
Author: chirino
Date: Sun Feb  5 21:53:53 2012
New Revision: 1240838

URL: http://svn.apache.org/viewvc?rev=1240838&view=rev
Log:
Making the Router more consistent. It's expected that the caller will switch to the virtual
host's dispatch queue before accessing the router.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Sun Feb  5 21:53:53 2012
@@ -1015,21 +1015,20 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def unbind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
+    dispatch_queue.assertExecuting()
     consumer.retain
-    dispatch_queue {
-      addresses.foreach { address=>
-        address.domain match {
-          case "topic" =>
-            topic_domain.unbind(address, consumer, persistent, security)
-          case "queue" =>
-            queue_domain.unbind(address, consumer, persistent, security)
-          case "dsub" =>
-            dsub_domain.unbind(address, consumer, persistent, security)
-          case _ => sys.error("Unknown domain: "+address.domain)
-        }
+    addresses.foreach { address=>
+      address.domain match {
+        case "topic" =>
+          topic_domain.unbind(address, consumer, persistent, security)
+        case "queue" =>
+          queue_domain.unbind(address, consumer, persistent, security)
+        case "dsub" =>
+          dsub_domain.unbind(address, consumer, persistent, security)
+        case _ => sys.error("Unknown domain: "+address.domain)
       }
-      consumer.release
     }
+    consumer.release
   }
 
   def connect(addresses: Array[_ <: ConnectAddress], producer: BindableDeliveryProducer, security: SecurityContext):Option[String] = {
@@ -1073,21 +1072,20 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def disconnect(addresses:Array[_ <: ConnectAddress], producer:BindableDeliveryProducer) = {
-    dispatch_queue {
-      addresses.foreach { address=>
-        address.domain match {
-          case "topic" =>
-            topic_domain.disconnect(address, producer)
-          case "queue" =>
-            queue_domain.disconnect(address, producer)
-          case "dsub" =>
-            dsub_domain.disconnect(address, producer)
-          case _ => sys.error("Unknown domain: "+address.domain)
-        }
+    dispatch_queue.assertExecuting()
+    addresses.foreach { address=>
+      address.domain match {
+        case "topic" =>
+          topic_domain.disconnect(address, producer)
+        case "queue" =>
+          queue_domain.disconnect(address, producer)
+        case "dsub" =>
+          dsub_domain.disconnect(address, producer)
+        case _ => sys.error("Unknown domain: "+address.domain)
       }
-      producer.disconnected()
-      producer.release()
     }
+    producer.disconnected()
+    producer.release()
   }
 
   def create(addresses:Array[_ <: DestinationAddress], security: SecurityContext):Option[String] = {
@@ -1158,15 +1156,11 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-
-  def get_or_create_destination(adress: DestinationAddress, security: SecurityContext) = dispatch_queue ! {
-    _get_or_create_destination(adress, security)
-  }
-
   /**
    * Returns the previously created queue if it already existed.
    */
-  def _get_or_create_destination(address: DestinationAddress, security:SecurityContext): Result[DomainDestination, String] = {
+  def get_or_create_destination(address: DestinationAddress, security:SecurityContext): Result[DomainDestination, String] = {
+    dispatch_queue.assertExecuting()
     address.domain match {
       case "queue" => queue_domain.get_or_create_destination(address, security)
       case "topic" => topic_domain.get_or_create_destination(address, security)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Sun Feb  5 21:53:53 2012
@@ -84,7 +84,7 @@ object VirtualHost extends Log {
 class VirtualHost(val broker: Broker, val id:String) extends BaseService with SecuredResource {
   import VirtualHost._
   
-  override val dispatch_queue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
+  override val dispatch_queue:DispatchQueue = createQueue("virtual-host")
 
   var config:VirtualHostDTO = _
   val router:Router = new LocalRouter(this)

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Sun Feb  5 21:53:53 2012
@@ -220,8 +220,11 @@ class OpenwireProtocolHandler extends Pr
       heart_beat_monitor.stop
 
       import collection.JavaConversions._
-      producerRoutes.foreach{
-        case (dests, route) => host.router.disconnect(dests.toArray, route)
+      producerRoutes.foreach{ case (dests, route) =>
+        val addresses = dests.toArray
+        host.dispatch_queue {
+          host.router.disconnect(addresses, route)
+        }
       }
       producerRoutes.clear
 
@@ -896,7 +899,9 @@ class OpenwireProtocolHandler extends Pr
     }
 
     def dettach = {
-      host.router.unbind(addresses, this, false , security_context)
+      host.dispatch_queue {
+        host.router.unbind(addresses, this, false , security_context)
+      }
       parent.consumers.remove(info.getConsumerId)
       all_consumers.remove(info.getConsumerId)
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1240838&r1=1240837&r2=1240838&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Sun Feb  5 21:53:53 2012
@@ -702,13 +702,18 @@ class StompProtocolHandler extends Proto
       dead = true;
 
       import collection.JavaConversions._
-      producerRoutes.foreach{
-        case(dests,route)=> host.router.disconnect(dests.toArray, route)
+      producerRoutes.foreach{ case(dests,route)=>
+        val addresses = dests.toArray
+        host.dispatch_queue {
+          host.router.disconnect(addresses, route)
+        }
       }
       producerRoutes.clear
-      consumers.foreach {
-        case (_,consumer)=>
-          host.router.unbind(consumer.addresses, consumer, false , security_context)
+      consumers.foreach { case (_,consumer)=>
+        val addresses = consumer.addresses
+        host.dispatch_queue {
+          host.router.unbind(addresses, consumer, false , security_context)
+        }
       }
       consumers = Map()
       security_context.logout( e => {
@@ -1281,7 +1286,9 @@ class StompProtocolHandler extends Proto
         // consumer gets disposed after all producer stop sending to it...
         consumer.setDisposer(^{ send_receipt(headers) })
         consumers -= id
-        host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+        host.dispatch_queue {
+          host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+        }
     }
   }
 



Mime
View raw message