activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1233181 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apol...
Date Thu, 19 Jan 2012 05:16:15 GMT
Author: chirino
Date: Thu Jan 19 05:16:15 2012
New Revision: 1233181

URL: http://svn.apache.org/viewvc?rev=1233181&view=rev
Log:
Fixes APLO-127: Refactored the LocalRouter implementation so that the durable sub logic in the topic domain is now maintained in a dsub domain.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Thu Jan 19 05:16:15 2012
@@ -144,11 +144,11 @@ class QueueDomainQueueBinding(val bindin
   def binding_kind = POINT_TO_POINT_KIND
 
   def unbind(node: LocalRouter, queue: Queue) = {
-    node.queue_domain.unbind(queue)
+    node.local_queue_domain.unbind(queue)
   }
 
   def bind(node: LocalRouter, queue: Queue) = {
-    node.queue_domain.bind(queue)
+    node.local_queue_domain.bind(queue)
   }
 
   val id = binding_dto.name(LocalRouter.destination_parser.path_separator)
@@ -215,17 +215,17 @@ object DurableSubscriptionQueueBinding e
 class DurableSubscriptionQueueBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionDestinationDTO) extends Binding {
   import DurableSubscriptionQueueBinding._
 
-  val destination = LocalRouter.destination_parser.decode_path(binding_dto.path)
+  val destination = Path(binding_dto.subscription_id)
 
   def binding_kind = DURABLE_SUB_KIND
 
 
   def unbind(router: LocalRouter, queue: Queue) = {
-    router.topic_domain.unbind_dsub(queue)
+    router.local_dsub_domain.unbind(queue)
   }
 
   def bind(router: LocalRouter, queue: Queue) = {
-    router.topic_domain.bind_dsub(queue)
+    router.local_dsub_domain.bind(queue)
   }
 
   def id = binding_dto.subscription_id

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Thu Jan 19 05:16:15 2012
@@ -131,7 +131,7 @@ class DestinationParser extends PathPars
       return new TopicDestinationDTO(parts(name))
     } else if (dsub_prefix != null && value.startsWith(dsub_prefix)) {
       var name = value.substring(dsub_prefix.length)
-      return new DurableSubscriptionDestinationDTO(name)
+      return new DurableSubscriptionDestinationDTO(name).direct();
     } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
       var name = value.substring(temp_topic_prefix.length)
       return new TopicDestinationDTO(parts(name)).temp(true)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Thu Jan 19 05:16:15 2012
@@ -23,12 +23,12 @@ import path._
 import path.PathParser.PathException
 import java.util.concurrent.TimeUnit
 import scala.Array
-import org.apache.activemq.apollo.dto._
 import java.util.{Arrays, ArrayList}
 import collection.mutable.{LinkedHashMap, HashMap}
 import collection.{Iterable, JavaConversions}
 import security.SecuredResource.{TopicKind, QueueKind}
 import security.{SecuredResource, SecurityContext}
+import org.apache.activemq.apollo.dto._
 
 object DestinationMetricsSupport {
 
@@ -220,7 +220,7 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  trait Domain[D <: DomainDestination] {
+  trait Domain[D <: DomainDestination, DTO <: DestinationDTO] {
 
     // holds all the destinations in the domain by id
     var destination_by_id = LinkedHashMap[String, D]()
@@ -244,10 +244,13 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
-    def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String]
+    def auto_create_on_connect = auto_create_destinations
+    def auto_create_on_bind = auto_create_destinations
+
+    def can_create_destination(path:Path, destination:DTO, security:SecurityContext):Option[String]
+    def create_destination(path:Path, destination:DTO, security:SecurityContext):Result[D,String]
 
-    def get_or_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String] = {
+    def get_or_create_destination(path:Path, destination:DTO, security:SecurityContext):Result[D,String] = {
       Option(destination_by_path.chooseValue(path)).
       map(Success(_)).
       getOrElse( create_destination(path, destination, security))
@@ -276,7 +279,7 @@ class LocalRouter(val virtual_host:Virtu
       destination_by_id.remove(dest.id)
     }
 
-    def can_destroy_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String] = {
+    def can_destroy_destination(path:Path, destination:DTO, security:SecurityContext):Option[String] = {
       if( security==null ) {
         return None
       }
@@ -297,11 +300,11 @@ class LocalRouter(val virtual_host:Virtu
       }
       None
     }
-    def destroy_destination(path:Path, destination:DestinationDTO, security: SecurityContext):Unit
+    def destroy_destination(path:Path, destination:DTO, security: SecurityContext):Unit
 
     def bind_action(consumer:DeliveryConsumer):String
 
-    def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
+    def can_bind_all(path:Path, destination:DTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
       if( security==null ) {
         return None
       }
@@ -325,7 +328,7 @@ class LocalRouter(val virtual_host:Virtu
 
       // Should we attempt to auto create the destination?
       if( !wildcard ) {
-        if ( matches.isEmpty && auto_create_destinations ) {
+        if ( matches.isEmpty && auto_create_on_bind ) {
           val rc = create_destination(path, destination, security)
           if( rc.failed ) {
             return Some(rc.failure)
@@ -346,7 +349,7 @@ class LocalRouter(val virtual_host:Virtu
       None
     }
 
-    def bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
+    def bind(path:Path, destination:DTO, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
       var matches = get_destination_matches(path)
       matches.foreach { dest=>
         if( authorizer.can(security, bind_action(consumer), dest) ) {
@@ -360,7 +363,7 @@ class LocalRouter(val virtual_host:Virtu
       consumers_by_path.put(path, new ConsumerContext(destination, consumer, security))
     }
 
-    def unbind(destination:DestinationDTO, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
+    def unbind(destination:DTO, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
       val path = destination_parser.decode_path(destination.path)
       if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
         get_destination_matches(path).foreach{ dest=>
@@ -373,7 +376,7 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_connect_all(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Option[String] = {
+    def can_connect_all(path:Path, destination:DTO, producer:BindableDeliveryProducer, security:SecurityContext):Option[String] = {
 
       val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
@@ -387,7 +390,7 @@ class LocalRouter(val virtual_host:Virtu
       } else {
 
         // Should we attempt to auto create the destination?
-        if ( matches.isEmpty && auto_create_destinations ) {
+        if ( matches.isEmpty && auto_create_on_connect ) {
           val rc = create_destination(path, destination, security)
           if( rc.failed ) {
             return Some(rc.failure)
@@ -411,7 +414,7 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
+    def connect(path:Path, destination:DTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
       get_destination_matches(path).foreach { dest=>
         if( authorizer.can(security, "send", dest) ) {
           dest.connect(destination, producer)
@@ -423,7 +426,7 @@ class LocalRouter(val virtual_host:Virtu
       producers_by_path.put(path, new ProducerContext(destination, producer, security))
     }
 
-    def disconnect(destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+    def disconnect(destination:DTO, producer:BindableDeliveryProducer) = {
       val path = destination_parser.decode_path(destination.path)
       producers_by_path.remove(path, new ProducerContext(destination, producer, null))
       get_destination_matches(path).foreach { dest=>
@@ -435,34 +438,9 @@ class LocalRouter(val virtual_host:Virtu
     }
 
   }
-  val topic_domain = new TopicDomain
-  class TopicDomain extends Domain[Topic] {
 
-    // Stores durable subscription queues.
-    val durable_subscriptions_by_path = new PathMap[Queue]()
-    val durable_subscriptions_by_id = HashMap[String, Queue]()
 
-    def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
-      val key = destination.subscription_id
-      durable_subscriptions_by_id.get( key ).getOrElse {
-        val queue = _create_queue(BindingFactory.create(destination))
-        durable_subscriptions_by_id.put(key, queue)
-        queue
-      }
-    }
-
-    def destroy_durable_subscription(queue:Queue):Unit = {
-      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
-      if( durable_subscriptions_by_id.remove( destination.subscription_id ).isDefined ) {
-        val path = queue.binding.destination
-        durable_subscriptions_by_path.remove(path, queue)
-        var matches = get_destination_matches(path)
-        matches.foreach( _.unbind_durable_subscription(destination, queue) )
-        _destroy_queue(queue)
-      }
-    }
-
-    def dsub_config(subid:String) = DurableSubscriptionQueueBinding.dsub_config(virtual_host, subid)
+  class TopicDomain extends Domain[Topic, TopicDestinationDTO] {
 
     def topic_config(name:Path):TopicDTO = {
       import collection.JavaConversions._
@@ -472,36 +450,7 @@ class LocalRouter(val virtual_host:Virtu
       }.getOrElse(new TopicDTO)
     }
 
-    override def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
-      destination match {
-        case destination:DurableSubscriptionDestinationDTO =>
-
-          // Connects a producer directly to a durable subscription..
-          durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
-            dest.connect(destination, producer)
-            for( l <- router_listeners) {
-              l.on_connect(dest, producer, security)
-            }
-          }
-
-        case _ => super.connect(path, destination, producer, security)
-      }
-    }
-
-    override def disconnect(destination:DestinationDTO, producer:BindableDeliveryProducer) = {
-      destination match {
-        case destination:DurableSubscriptionDestinationDTO =>
-          durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
-            dest.disconnect(producer)
-            for( l <- router_listeners) {
-              l.on_disconnect(dest, producer)
-            }
-          }
-        case _ => super.disconnect(destination, producer)
-      }
-    }
-
-    def destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Unit = {
+    def destroy_destination(path:Path, destination: TopicDestinationDTO, security: SecurityContext): Unit = {
       val matches = get_destination_matches(path)
       matches.foreach { dest =>
         for( l <- router_listeners) {
@@ -533,7 +482,7 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String] = {
+    def can_create_destination(path:Path, destination:TopicDestinationDTO, security:SecurityContext):Option[String] = {
       if (security==null) {
         return None;
       }
@@ -553,7 +502,7 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[Topic,String] = {
+    def create_destination(path:Path, destination:TopicDestinationDTO, security:SecurityContext):Result[Topic,String] = {
       // We can't create a wild card destination.. only wild card subscriptions.
       assert( !PathParser.containsWildCards(path) )
 
@@ -579,181 +528,209 @@ class LocalRouter(val virtual_host:Virtu
 
     def bind_action(consumer:DeliveryConsumer):String = "receive"
 
-    def bind_dsub(queue:Queue) = {
-      assert_executing
-      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
-      val path = queue.binding.destination
-      val wildcard = PathParser.containsWildCards(path)
-      var matches = get_destination_matches(path)
+  }
 
-      // We may need to create the topic...
-      if( !wildcard && matches.isEmpty ) {
-        create_destination(path, destination, null)
-        matches = get_destination_matches(path)
-      }
+  class DsubDomain extends Domain[Queue, DurableSubscriptionDestinationDTO] {
 
-      durable_subscriptions_by_path.put(path, queue)
-      durable_subscriptions_by_id.put(destination.subscription_id, queue)
+    override def auto_create_on_connect = false
 
-      matches.foreach( _.bind_durable_subscription(destination, queue) )
-    }
+    def dsub_config(subid:String) = DurableSubscriptionQueueBinding.dsub_config(virtual_host, subid)
 
-    def unbind_dsub(queue:Queue) = {
+    def bind(queue:Queue) = {
       assert_executing
-      val destination = queue.destination_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
       val path = queue.binding.destination
-      var matches = get_destination_matches(path)
 
-      durable_subscriptions_by_path.remove(path, queue)
-      durable_subscriptions_by_id.remove(destination.subscription_id)
-
-      matches.foreach( _.unbind_durable_subscription(destination, queue) )
-    }
-
-    override def bind(path: Path, destination: DestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) {
-      destination match {
-        case destination:DurableSubscriptionDestinationDTO =>
-
-          val key = destination.subscription_id
-          val queue = durable_subscriptions_by_id.get( key ) match {
-            case Some(queue) =>
-              // We may need to update the bindings...
-              if( !destination.path.isEmpty && queue.destination_dto != destination) {
-
-                val binding = BindingFactory.create(destination)
-                if( queue.tune_persistent && queue.store_id == -1 ) {
-                  val record = QueueRecord(queue.store_id, binding.binding_kind, binding.binding_data)
-                  // Update the bindings
-                  virtual_host.store.add_queue(record) { rc => Unit }
-                }
-
-                // and then rebind the queue in the router.
-                unbind_dsub(queue)
-                queue.binding = binding
-                bind_dsub(queue)
-
-                // Make sure the update is visible in the queue's thread context..
-                queue.dispatch_queue {
-                  queue.binding = binding
-                }
-              }
-              queue
-            case None =>
-              _create_queue(BindingFactory.create(destination))
-          }
-
-
-          // Typically durable subs are only consumed by one connection at a time. So collocate the
-          // queue onto the consumer's dispatch queue.
-          queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
-          queue.bind(destination, consumer)
-
-          for( l <- router_listeners) {
-            l.on_bind(queue, consumer, security)
-          }
+      add_destination(path, queue)
 
-        case _ =>
-          super.bind(path, destination, consumer, security)
+      import collection.JavaConversions._
+      destination.topics.foreach { topic =>
+        val path = destination_parser.decode_path(topic.path)
+        val wildcard = PathParser.containsWildCards(path)
+        var matches = local_topic_domain.get_destination_matches(path)
+
+        // We may need to create the topic...
+        if( !wildcard && matches.isEmpty ) {
+          local_topic_domain.create_destination(path, topic, null)
+          matches = local_topic_domain.get_destination_matches(path)
+        }
+        matches.foreach( _.bind_durable_subscription(destination, queue) )
       }
     }
 
-    override def unbind(destination: DestinationDTO, consumer: DeliveryConsumer, persistent: Boolean, security: SecurityContext) = {
-      destination match {
-        case destination:DurableSubscriptionDestinationDTO =>
-          durable_subscriptions_by_id.get(destination.subscription_id).foreach { queue =>
-            queue.unbind(consumer, persistent)
-            if( persistent ) {
-              _destroy_queue(queue, security)
-            }
-            for( l <- router_listeners) {
-              l.on_unbind(queue, consumer, persistent)
-            }
-          }
-        case _ =>
-          super.unbind( destination, consumer, persistent, security)
-      }
-    }
+    def unbind(queue:Queue) = {
+      assert_executing
+      val destination = queue.destination_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
 
-    override def can_bind_all(path: Path, destination: DestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) = {
-      destination match {
-        case destination:DurableSubscriptionDestinationDTO =>
-          if( !path.parts.isEmpty ) {
-            super.can_bind_all(path, destination, consumer, security) orElse {
-              if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
-                can_create_dsub(destination, security)
-              } else {
-                None
-              } orElse {
-                can_bind_dsub(destination, consumer, security)
-              }
-            }
-          } else {
-            // User is trying to directly receive from a durable subscription.. has to allready exist.
-            if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
-              Some("Durable subscription does not exist")
-            } else {
-              can_bind_dsub(destination, consumer, security)
-            }
-          }
-        case _ =>
-          super.can_bind_all(path, destination, consumer, security)
+      import collection.JavaConversions._
+      destination.topics.foreach { topic =>
+        val path = destination_parser.decode_path(topic.path)
+        var matches = local_topic_domain.get_destination_matches(path)
+        matches.foreach( _.unbind_durable_subscription(destination, queue) )
       }
-    }
 
+      val path = destination_parser.decode_path(destination.path)
+      remove_destination(path, queue)
+    }
 
-    override def can_connect_all(path: Path, destination: DestinationDTO, producer: BindableDeliveryProducer, security: SecurityContext) = {
-      destination match {
-        case destination:DurableSubscriptionDestinationDTO =>
-            // User is trying to directly send to a durable subscription.. has to allready exist.
-          if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
-            Some("Durable subscription does not exist")
-          } else {
-            can_connect_dsub(destination, security)
-          }
-        case _ =>
-          super.can_connect_all(path, destination, producer, security)
+    def destroy_destination(path:Path, destination: DurableSubscriptionDestinationDTO, security: SecurityContext): Unit = {
+      destination_by_id.get(destination.subscription_id).foreach { sub=>
+        for( l <- router_listeners) {
+          l.on_destroy(sub, security)
+        }
+        _destroy_queue(sub)
       }
     }
 
-
     def get_dsub_secured_resource(config: DurableSubscriptionDestinationDTO):SecuredResource = {
-      durable_subscriptions_by_id.get(config.subscription_id).getOrElse(new SecuredResource() {
+      destination_by_id.get(config.subscription_id).getOrElse(new SecuredResource() {
         def resource_kind = SecuredResource.DurableSubKind
         def id = config.subscription_id
       })
     }
 
-    def can_create_dsub(config:DurableSubscriptionDestinationDTO, security:SecurityContext) = {
-      val resource = get_dsub_secured_resource(config)
-      if( !authorizer.can(security, "create", resource) ) {
-        Some("Not authorized to create the durable subscription '%s'. Principals=%s".format(resource.id, security.principal_dump))
+    def can_create_destination(path:Path, destination:DurableSubscriptionDestinationDTO, security:SecurityContext):Option[String] = {
+      // It's it's directly addressed durable sub, then it must already exist.
+      if( destination.is_direct && !destination_by_id.contains(destination.subscription_id)) {
+        Some("Durable subscription does not exist")
       } else {
-        None
+        val resource = get_dsub_secured_resource(destination)
+        if( !authorizer.can(security, "create", resource)) {
+          Some("Not authorized to create the dsub '%s'. Principals=%s".format(resource.id, security.principal_dump))
+        } else {
+          None
+        }
       }
     }
 
-    def can_connect_dsub(config:DurableSubscriptionDestinationDTO, security:SecurityContext):Option[String] = {
-      val resource = get_dsub_secured_resource(config)
-      if( !authorizer.can(security, "send", resource) ) {
-        Some("Not authorized to send to durable subscription '%s'. Principals=%s".format(resource.id, security.principal_dump))
-      } else {
-        None
+    def create_destination(path:Path, destination:DurableSubscriptionDestinationDTO, security:SecurityContext):Result[Queue,String] = {
+      can_create_destination(path, destination, security).map(Failure(_)).getOrElse {
+        val dsub = _create_queue(BindingFactory.create(destination))
+        add_destination(path, dsub)
+        for( l <- router_listeners) {
+          l.on_create(dsub, security)
+        }
+        Success(dsub)
       }
     }
 
-    def can_bind_dsub(config:DurableSubscriptionDestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
-      val resource = get_dsub_secured_resource(config)
-      val action = if ( consumer.browser ) "receive" else "consume"
-      if( !authorizer.can(security, action, resource) ) {
-        Some("Not authorized to %s from durable subscription '%s'. Principals=%s".format(action, resource.id, security.principal_dump))
-      } else {
-        None
+    def bind_action(consumer:DeliveryConsumer):String = if(consumer.browser) {
+      "receive"
+    } else {
+      "consume"
+    }
+
+
+//    override def connect(path:Path, destination:DurableSubscriptionDestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
+//      // Connects a producer directly to a durable subscription..
+//      durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
+//        dest.connect(destination, producer)
+//        for( l <- router_listeners) {
+//          l.on_connect(dest, producer, security)
+//        }
+//      }
+//    }
+
+//    override def disconnect(destination:DurableSubscriptionDestinationDTO, producer:BindableDeliveryProducer) = {
+//      durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
+//        dest.disconnect(producer)
+//        for( l <- router_listeners) {
+//          l.on_disconnect(dest, producer)
+//        }
+//      }
+//    }
+
+    override def bind(path: Path, destination: DurableSubscriptionDestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) {
+      destination_by_id.get(destination.subscription_id).foreach { queue =>
+        // We may need to update the bindings...
+        if( !destination.is_direct() && queue.destination_dto != destination && authorizer.can(security, "consume", queue)) {
+
+          val binding = BindingFactory.create(destination)
+          if( queue.tune_persistent && queue.store_id == -1 ) {
+            val record = QueueRecord(queue.store_id, binding.binding_kind, binding.binding_data)
+            // Update the bindings
+            virtual_host.store.add_queue(record) { rc => Unit }
+          }
+
+          // and then rebind the queue in the router.
+          unbind(queue)
+          queue.binding = binding
+          bind(queue)
+
+          // Make sure the update is visible in the queue's thread context..
+          queue.dispatch_queue {
+            queue.binding = binding
+          }
+        }
+
+        if( authorizer.can(security, bind_action(consumer), queue) ) {
+          queue.bind(destination, consumer)
+          for( l <- router_listeners) {
+            l.on_bind(queue, consumer, security)
+          }
+        }
+      }
+    }
+
+    override def unbind(destination: DurableSubscriptionDestinationDTO, consumer: DeliveryConsumer, persistent: Boolean, security: SecurityContext) = {
+      destination_by_id.get(destination.subscription_id).foreach { queue =>
+        queue.unbind(consumer, persistent)
+        if( persistent ) {
+          _destroy_queue(queue, security)
+        }
+        for( l <- router_listeners) {
+          l.on_unbind(queue, consumer, persistent)
+        }
       }
     }
+
+//    override def can_connect_all(path: Path, destination: DurableSubscriptionDestinationDTO, producer: BindableDeliveryProducer, security: SecurityContext) = {
+//      // User is trying to directly send to a durable subscription.. has to already exist.
+//      if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
+//        Some("Durable subscription does not exist")
+//      } else {
+//        can_connect_dsub(destination, security)
+//      }
+//    }
+//
+//    def can_connect_dsub(config:DurableSubscriptionDestinationDTO, security:SecurityContext):Option[String] = {
+//      val resource = get_dsub_secured_resource(config)
+//      if( !authorizer.can(security, "send", resource) ) {
+//        Some("Not authorized to send to durable subscription '%s'. Principals=%s".format(resource.id, security.principal_dump))
+//      } else {
+//        None
+//      }
+//    }
+
+//    override def can_bind_all(path: Path, destination: DurableSubscriptionDestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) = {
+//      super.can_bind_all(path, destination, consumer, security) orElse {
+//        if( !destination_by_id.contains(destination.subscription_id) ) {
+//          val resource = get_dsub_secured_resource(config)
+//          if( !authorizer.can(security, "create", resource) ) {
+//            Some("Not authorized to create the durable subscription '%s'. Principals=%s".format(resource.id, security.principal_dump))
+//          } else {
+//            None
+//          }
+//        } else {
+//          None
+//        } orElse {
+//          can_bind_dsub(destination, consumer, security)
+//        }
+//      }
+//    }
+//
+//    def can_bind_dsub(config:DurableSubscriptionDestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
+//      val resource = get_dsub_secured_resource(config)
+//      val action = if ( consumer.browser ) "receive" else "consume"
+//      if( !authorizer.can(security, action, resource) ) {
+//        Some("Not authorized to %s from durable subscription '%s'. Principals=%s".format(action, resource.id, security.principal_dump))
+//      } else {
+//        None
+//      }
+//    }
   }
 
-  val queue_domain = new QueueDomain
-  class QueueDomain extends Domain[Queue] {
+  class QueueDomain extends Domain[Queue, QueueDestinationDTO] {
 
     def bind(queue:Queue) = {
       val path = queue.binding.destination
@@ -764,7 +741,7 @@ class LocalRouter(val virtual_host:Virtu
       if( queue.config.unified.getOrElse(false) ) {
         // hook up the queue to be a subscriber of the topic.
 
-        val topic = topic_domain.get_or_create_destination(path, new TopicDestinationDTO(queue.binding.binding_dto.path), null).success
+        val topic = local_topic_domain.get_or_create_destination(path, new TopicDestinationDTO(queue.binding.binding_dto.path), null).success
         topic.bind(null, queue)
       }
     }
@@ -776,12 +753,12 @@ class LocalRouter(val virtual_host:Virtu
       import OptionSupport._
       if( queue.config.unified.getOrElse(false) ) {
         // unhook the queue from the topic
-        val topic = topic_domain.get_or_create_destination(path, new TopicDestinationDTO(queue.binding.binding_dto.path), null).success
+        val topic = local_topic_domain.get_or_create_destination(path, new TopicDestinationDTO(queue.binding.binding_dto.path), null).success
         topic.unbind(queue, false)
       }
     }
 
-    def destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Unit = {
+    def destroy_destination(path:Path, destination: QueueDestinationDTO, security: SecurityContext): Unit = {
       val matches = get_destination_matches(path)
       matches.foreach { queue =>
         for( l <- router_listeners) {
@@ -791,7 +768,7 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_create_destination(path: Path, destination:DestinationDTO, security: SecurityContext):Option[String] = {
+    def can_create_destination(path: Path, destination:QueueDestinationDTO, security: SecurityContext):Option[String] = {
       val resource = new SecuredResource() {
         def resource_kind = QueueKind
         def id = destination_parser.encode_path(path)
@@ -803,7 +780,7 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def create_destination(path: Path, destination:DestinationDTO, security: SecurityContext) = {
+    def create_destination(path: Path, destination:QueueDestinationDTO, security: SecurityContext) = {
       val dto = new QueueDestinationDTO
       dto.path.addAll(destination.path)
       val binding = QueueDomainQueueBinding.create(dto)
@@ -831,6 +808,9 @@ class LocalRouter(val virtual_host:Virtu
 
   }
 
+
+
+
   /////////////////////////////////////////////////////////////////////////////
   //
   // life cycle methods.
@@ -839,7 +819,7 @@ class LocalRouter(val virtual_host:Virtu
 
   protected def create_configure_destinations {
     import collection.JavaConversions._
-    def create_configured_dests(list: ArrayList[_ <: StringIdDTO], d: Domain[_], f: (Array[String]) => DestinationDTO) = {
+    def create_configured_dests[DTO <:DestinationDTO](list: ArrayList[_ <: StringIdDTO], d: Domain[_, DTO], f: (Array[String]) => DTO) = {
       list.foreach { dto =>
         if (dto.id != null) {
           try {
@@ -854,21 +834,24 @@ class LocalRouter(val virtual_host:Virtu
         }
       }
     }
-    create_configured_dests(virtual_host.config.queues, queue_domain, (parts) => new QueueDestinationDTO(parts))
-    create_configured_dests(virtual_host.config.topics, topic_domain, (parts) => new TopicDestinationDTO(parts))
+    create_configured_dests(virtual_host.config.queues, local_queue_domain, (parts) => new QueueDestinationDTO(parts))
+    create_configured_dests(virtual_host.config.topics, local_topic_domain, (parts) => new TopicDestinationDTO(parts))
 
     virtual_host.config.dsubs.foreach { dto =>
-      if (dto.id != null && dto.topic!=null ) {
+      if (dto.id != null && ( dto.topic!=null || !dto.topics.isEmpty) ) {
 
         // We will create the durable sub if it does not exist yet..
-        if( !topic_domain.durable_subscriptions_by_id.contains(dto.id) ) {
-          val destination = new DurableSubscriptionDestinationDTO()
-          destination.subscription_id = dto.id
-          destination.path = Arrays.asList(destination_parser.parts(dto.topic) : _ *)
+        if( !local_dsub_domain.destination_by_id.contains(dto.id) ) {
+          val destination = new DurableSubscriptionDestinationDTO(dto.id)
           destination.selector = dto.selector
+          if( dto.topic!=null ) {
+            destination.topics.add(new TopicDestinationDTO(destination_parser.parts(dto.topic)))
+          }
+          dto.topics.foreach { n =>
+            destination.topics.add(new TopicDestinationDTO(destination_parser.parts(n)))
+          }
           _create_queue(BindingFactory.create(destination))
         }
-
       }
     }
   }
@@ -933,7 +916,7 @@ class LocalRouter(val virtual_host:Virtu
     val min_create_time = virtual_host.broker.now - 1000;
 
     // Auto delete temp destinations..
-    queue_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { queue=>
+    local_queue_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { queue=>
       val owner = temp_owner(queue.destination_dto).get
       if( owner._1==virtual_host.broker.id // are we the broker that owns the temp destination?
           && !active_connections.contains(owner._2) // Has the connection not around?
@@ -942,13 +925,13 @@ class LocalRouter(val virtual_host:Virtu
         _destroy_queue(queue)
       }
     }
-    topic_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { topic =>
+    local_topic_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { topic =>
       val owner = temp_owner(topic.destination_dto).get
       if( owner._1==virtual_host.broker.id // are we the broker that owns the temp destination?
           && !active_connections.contains(owner._2) // Has the connection not around?
           && topic.created_at < min_create_time // It's not a recently created destination?
       ) {
-        topic_domain.destroy_destination(topic.path, topic.destination_dto, null)
+        local_topic_domain.destroy_destination(topic.path, topic.destination_dto, null)
       }
     }
   }
@@ -982,7 +965,7 @@ class LocalRouter(val virtual_host:Virtu
 
 
     // For the topics, just collocate the producers onto the first consumer's thread.
-    topic_domain.destinations.foreach { node =>
+    local_topic_domain.destinations.foreach { node =>
 
       node.consumers.keys.headOption.foreach{ consumer =>
         node.producers.keys.foreach { r=>
@@ -992,7 +975,7 @@ class LocalRouter(val virtual_host:Virtu
     }
 
 
-    queue_domain.destinations.foreach { queue=>
+    local_queue_domain.destinations.foreach { queue=>
       queue.dispatch_queue {
 
         // Collocate the queue's with the first consumer
@@ -1017,35 +1000,53 @@ class LocalRouter(val virtual_host:Virtu
   // destination/domain management methods.
   //
   /////////////////////////////////////////////////////////////////////////////
+  final val local_queue_domain = new QueueDomain
+  final val local_topic_domain = new TopicDomain
+  final val local_dsub_domain = new DsubDomain
+
+  def queue_domain: Domain[_ <: DomainDestination, QueueDestinationDTO] = local_queue_domain
+  def topic_domain:Domain[_ <: DomainDestination, TopicDestinationDTO] = local_topic_domain
+  def dsub_domain:Domain[_ <: DomainDestination, DurableSubscriptionDestinationDTO] = local_dsub_domain
 
-  def domain(destination: DestinationDTO):Domain[_ <: DomainDestination] = destination match {
-    case x:TopicDestinationDTO => topic_domain
-    case x:QueueDestinationDTO => queue_domain
-    case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
-  }
-
-  def bind(destination: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext) = {
+  def bind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext) = {
     consumer.retain
     dispatch_queue ! {
       var rc:Option[String] = None
-      val paths = try {
-        destination.map(x => (destination_parser.decode_path(x.path), x))
-      } catch {
-        case x:PathException =>
-          rc = Some(x.getMessage)
-          null
-      }
       if(rc.isEmpty && !virtual_host.service_state.is_started) {
         rc = Some("virtual host stopped.")
       } else if (rc.isEmpty) {
-        val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2, consumer, security) )
-        rc = if( !failures.isEmpty ) {
-          Some(failures.mkString("; "))
-        } else {
-          paths.foreach { x=>
-            domain(x._2).bind(x._1, x._2, consumer, security)
+        try {
+          val actions = destinations.map { destination =>
+            destination match {
+              case destination:TopicDestinationDTO =>
+                val path = destination_parser.decode_path(destination.path)
+                val allowed = topic_domain.can_bind_all(path, destination, consumer, security)
+                def perform() = topic_domain.bind(path, destination, consumer, security)
+                (allowed, perform _)
+              case destination:QueueDestinationDTO =>
+                val path = destination_parser.decode_path(destination.path)
+                val allowed = queue_domain.can_bind_all(path, destination, consumer, security)
+                def perform() = queue_domain.bind(path, destination, consumer, security)
+                (allowed, perform _)
+              case destination:DurableSubscriptionDestinationDTO =>
+                val path = Path(destination.subscription_id())
+                val allowed = dsub_domain.can_bind_all(path, destination, consumer, security)
+                def perform() = dsub_domain.bind(path, destination, consumer, security)
+                (allowed, perform _)
+              case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+            }
           }
-          None
+
+          val failures = actions.flatMap(_._1)
+          rc = if( !failures.isEmpty ) {
+            Some(failures.mkString("; "))
+          } else {
+            actions.foreach(_._2())
+            None
+          }
+        } catch {
+          case x:PathException =>
+            rc = Some(x.getMessage)
         }
       }
       consumer.release
@@ -1057,7 +1058,15 @@ class LocalRouter(val virtual_host:Virtu
     consumer.retain
     dispatch_queue {
       destinations.foreach { destination=>
-        domain(destination).unbind(destination, consumer, persistent, security)
+        destination match {
+          case destination:TopicDestinationDTO =>
+            topic_domain.unbind(destination, consumer, persistent, security)
+          case destination:QueueDestinationDTO =>
+            queue_domain.unbind(destination, consumer, persistent, security)
+          case destination:DurableSubscriptionDestinationDTO =>
+            dsub_domain.unbind(destination, consumer, persistent, security)
+          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+        }
       }
       consumer.release
     }
@@ -1067,31 +1076,42 @@ class LocalRouter(val virtual_host:Virtu
     producer.retain
     dispatch_queue ! {
       var rc:Option[String] = None
-      val paths = try {
-        destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-      } catch {
-        case x:PathException =>
-          rc = Some(x.getMessage)
-          null
-      }
       if(rc.isEmpty && !virtual_host.service_state.is_started) {
         rc = Some("virtual host stopped.")
       } else if(rc.isEmpty) {
-        val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-        val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1, x._2, producer, security) )
+
+        val actions = destinations.map { destination =>
+          destination match {
+            case destination:TopicDestinationDTO =>
+              val path = destination_parser.decode_path(destination.path)
+              val allowed = topic_domain.can_connect_all(path, destination, producer, security)
+              def perform() = topic_domain.connect(path, destination, producer, security)
+              (allowed, perform _)
+            case destination:QueueDestinationDTO =>
+              val path = destination_parser.decode_path(destination.path)
+              val allowed = queue_domain.can_connect_all(path, destination, producer, security)
+              def perform() = queue_domain.connect(path, destination, producer, security)
+              (allowed, perform _)
+            case destination:DurableSubscriptionDestinationDTO =>
+              val path = Path(destination.subscription_id())
+              val allowed = dsub_domain.can_connect_all(path, destination, producer, security)
+              def perform() = dsub_domain.connect(path, destination, producer, security)
+              (allowed, perform _)
+            case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+          }
+        }
+
+        val failures = actions.flatMap(_._1)
         rc = if( !failures.isEmpty ) {
           Some(failures.mkString("; "))
         } else {
-          paths.foreach { x=>
-            domain(x._2).connect(x._1, x._2, producer, security)
-          }
+          actions.foreach(_._2())
           producer.connected()
+          producer.retain()
           None
         }
       }
-      if(rc.isDefined) {
-        producer.release
-      }
+      producer.release
       rc
     }
   }
@@ -1099,7 +1119,15 @@ class LocalRouter(val virtual_host:Virtu
   def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer) = {
     dispatch_queue {
       destinations.foreach { destination=>
-        domain(destination).disconnect(destination, producer)
+        destination match {
+          case destination:TopicDestinationDTO =>
+            topic_domain.disconnect(destination, producer)
+          case destination:QueueDestinationDTO =>
+            queue_domain.disconnect(destination, producer)
+          case destination:DurableSubscriptionDestinationDTO =>
+            dsub_domain.disconnect(destination, producer)
+          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+        }
       }
       producer.disconnected()
       producer.release()
@@ -1110,14 +1138,33 @@ class LocalRouter(val virtual_host:Virtu
     if(!virtual_host.service_state.is_started) {
       Some("virtual host stopped.")
     } else {
-      val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-      val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1, x._2, security) )
+
+      val actions = destinations.map { destination =>
+        destination match {
+          case destination:TopicDestinationDTO =>
+            val path = destination_parser.decode_path(destination.path)
+            val allowed = topic_domain.can_create_destination(path, destination, security)
+            def perform() = topic_domain.create_destination(path, destination, security)
+            (allowed, perform _)
+          case destination:QueueDestinationDTO =>
+            val path = destination_parser.decode_path(destination.path)
+            val allowed = queue_domain.can_create_destination(path, destination, security)
+            def perform() = queue_domain.create_destination(path, destination, security)
+            (allowed, perform _)
+          case destination:DurableSubscriptionDestinationDTO =>
+            val path = Path(destination.subscription_id())
+            val allowed = dsub_domain.can_create_destination(path, destination, security)
+            def perform() = dsub_domain.create_destination(path, destination, security)
+            (allowed, perform _)
+          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+        }
+      }
+
+      val failures = actions.flatMap(_._1)
       if( !failures.isEmpty ) {
         Some(failures.mkString("; "))
       } else {
-        paths.foreach { x=>
-          domain(x._2).create_destination(x._1, x._2, security)
-        }
+        actions.foreach(_._2())
         None
       }
     }
@@ -1127,16 +1174,36 @@ class LocalRouter(val virtual_host:Virtu
     if(!virtual_host.service_state.is_started) {
       Some("virtual host stopped.")
     } else {
-      val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-      val failures = paths.flatMap(x=> domain(x._2).can_destroy_destination(x._1, x._2, security) )
+
+      val actions = destinations.map { destination =>
+        destination match {
+          case destination:TopicDestinationDTO =>
+            val path = destination_parser.decode_path(destination.path)
+            val allowed = topic_domain.can_destroy_destination(path, destination, security)
+            def perform() = topic_domain.destroy_destination(path, destination, security)
+            (allowed, perform _)
+          case destination:QueueDestinationDTO =>
+            val path = destination_parser.decode_path(destination.path)
+            val allowed = queue_domain.can_destroy_destination(path, destination, security)
+            def perform() = queue_domain.destroy_destination(path, destination, security)
+            (allowed, perform _)
+          case destination:DurableSubscriptionDestinationDTO =>
+            val path = Path(destination.subscription_id())
+            val allowed = dsub_domain.can_destroy_destination(path, destination, security)
+            def perform() = dsub_domain.destroy_destination(path, destination, security)
+            (allowed, perform _)
+          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+        }
+      }
+
+      val failures = actions.flatMap(_._1)
       if( !failures.isEmpty ) {
         Some(failures.mkString("; "))
       } else {
-        paths.foreach { x=>
-          domain(x._2).destroy_destination(x._1, x._2, security)
-        }
+        actions.foreach(_._2())
         None
       }
+
     }
   }
 
@@ -1148,9 +1215,17 @@ class LocalRouter(val virtual_host:Virtu
   /**
    * Returns the previously created queue if it already existed.
    */
-  def _get_or_create_destination(dto: DestinationDTO, security:SecurityContext): Result[DomainDestination, String] = {
-    val path = destination_parser.decode_path(dto.path)
-    domain(dto).get_or_create_destination(path, dto, security)
+  def _get_or_create_destination(destination: DestinationDTO, security:SecurityContext): Result[DomainDestination, String] = {
+    val path = destination_parser.decode_path(destination.path)
+    destination match {
+      case destination:TopicDestinationDTO =>
+        topic_domain.get_or_create_destination(path, destination, security)
+      case destination:QueueDestinationDTO =>
+        queue_domain.get_or_create_destination(path, destination, security)
+      case destination:DurableSubscriptionDestinationDTO =>
+        dsub_domain.get_or_create_destination(path, destination, security)
+      case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+    }
   }
 
 
@@ -1255,8 +1330,9 @@ class LocalRouter(val virtual_host:Virtu
 
   def apply_update(on_completed:Runnable) = {
     val tracker = new LoggingTracker("domain update", virtual_host.broker.console_log)
-    topic_domain.apply_update(tracker)
-    queue_domain.apply_update(tracker)
+    local_topic_domain.apply_update(tracker)
+    local_queue_domain.apply_update(tracker)
+    local_dsub_domain.apply_update(tracker)
     // we may need to create some more destinations.
     create_configure_destinations
     tracker.callback(on_completed)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jan 19 05:16:15 2012
@@ -29,9 +29,9 @@ import OptionSupport._
 import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
 import org.fusesource.hawtbuf.Buffer
 import java.lang.UnsupportedOperationException
-import org.apache.activemq.apollo.dto._
 import security.SecuredResource._
 import security.{SecuredResource, SecurityContext}
+import org.apache.activemq.apollo.dto._
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -952,7 +952,8 @@ class Queue(val router: LocalRouter, val
     import OptionSupport._
     if( config.unified.getOrElse(false) ) {
       // this is a unified queue.. actually have the produce bind to the topic, instead of the
-      val topic = router.topic_domain.get_or_create_destination(binding.destination, binding.binding_dto, null).success
+      val topic_dto = new TopicDestinationDTO(binding.binding_dto.path)
+      val topic = router.local_topic_domain.get_or_create_destination(binding.destination, topic_dto, null).success
       topic.connect(destination, producer)
     } else {
       dispatch_queue {
@@ -967,7 +968,8 @@ class Queue(val router: LocalRouter, val
   def disconnect (producer:BindableDeliveryProducer) = {
     import OptionSupport._
     if( config.unified.getOrElse(false) ) {
-      val topic = router.topic_domain.get_or_create_destination(binding.destination, binding.binding_dto, null).success
+      val topic_dto = new TopicDestinationDTO(binding.binding_dto.path)
+      val topic = router.local_topic_domain.get_or_create_destination(binding.destination, topic_dto, null).success
       topic.disconnect(producer)
     } else {
       dispatch_queue {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Thu Jan 19 05:16:15 2012
@@ -302,7 +302,7 @@ class Topic(val router:LocalRouter, val 
         if( auto_delete_after!=0 ) {
           dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
             if( previously_idle_at == idled_at ) {
-              router.topic_domain.remove_destination(path, this)
+              router.local_topic_domain.remove_destination(path, this)
               DestinationMetricsSupport.add_destination_metrics(router.virtual_host.dead_topic_metrics, topic_metrics)
             }
           }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Thu Jan 19 05:16:15 2012
@@ -272,7 +272,7 @@ class VirtualHost(val broker: Broker, va
   }
 
   def get_topic_metrics:FutureResult[AggregateDestMetricsDTO] = {
-    val topics:Iterable[Topic] = local_router.topic_domain.destinations
+    val topics:Iterable[Topic] = local_router.local_topic_domain.destinations
     val metrics: Future[Iterable[Result[DestMetricsDTO, Throwable]]] = Future.all {
       topics.map(_.status.map(_.map_success(_.metrics)))
     }
@@ -284,7 +284,7 @@ class VirtualHost(val broker: Broker, va
   }
   
   def get_queue_metrics:FutureResult[AggregateDestMetricsDTO] = {
-    val queues:Iterable[Queue] = local_router.queue_domain.destinations
+    val queues:Iterable[Queue] = local_router.local_queue_domain.destinations
     val metrics = sync_all (queues) { queue =>
       queue.get_queue_metrics
     }
@@ -296,7 +296,7 @@ class VirtualHost(val broker: Broker, va
   }
   
   def get_dsub_metrics:FutureResult[AggregateDestMetricsDTO] = sync(this) {
-    val dsubs:Iterable[Queue] = local_router.topic_domain.durable_subscriptions_by_id.values
+    val dsubs:Iterable[Queue] = local_router.local_dsub_domain.destination_by_id.values
     val metrics = sync_all (dsubs) { dsub =>
       dsub.get_queue_metrics
     }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Thu Jan 19 05:16:15 2012
@@ -43,6 +43,12 @@ public class DurableSubscriptionDTO exte
     public String topic;
 
     /**
+     * Additional topic names.
+     */
+    @XmlElement(name="topic")
+    public ArrayList<String> topics = new ArrayList<String>();
+
+    /**
      * An optional selector that the durable subscription will be created
      * with when it's first eagerly created.
      */
@@ -60,6 +66,7 @@ public class DurableSubscriptionDTO exte
         if (id_regex != null ? !id_regex.equals(that.id_regex) : that.id_regex != null) return false;
         if (selector != null ? !selector.equals(that.selector) : that.selector != null) return false;
         if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
+        if (topics != null ? !topics.equals(that.topics) : that.topics != null) return false;
 
         return true;
     }
@@ -69,6 +76,7 @@ public class DurableSubscriptionDTO exte
         int result = super.hashCode();
         result = 31 * result + (id_regex != null ? id_regex.hashCode() : 0);
         result = 31 * result + (topic != null ? topic.hashCode() : 0);
+        result = 31 * result + (topics != null ? topics.hashCode() : 0);
         result = 31 * result + (selector != null ? selector.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Thu Jan 19 05:16:15 2012
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
  * <p>
@@ -29,33 +29,54 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name = "dsub_destination")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DurableSubscriptionDestinationDTO extends TopicDestinationDTO {
+public class DurableSubscriptionDestinationDTO extends DestinationDTO {
 
     @XmlAttribute
     public String selector;
 
-    @XmlAttribute(name="subscription_id")
-    public String subscription_id;
+    /**
+     * Topics that the durable subscription is attached to
+     */
+    @XmlElement(name="topic")
+    public ArrayList<TopicDestinationDTO> topics = new ArrayList<TopicDestinationDTO>();
 
     public DurableSubscriptionDestinationDTO() {
     }
 
     public DurableSubscriptionDestinationDTO(String subscription_id) {
-        super();
-        this.subscription_id = subscription_id;
+        super(new String[]{subscription_id});
+    }
+
+    @JsonIgnore
+    public String subscription_id() {
+        return path.get(0);
+    }
+
+    /**
+     * Marks the destination as addressing a durable subscription directly.
+     * This will not create or modify an existing subscription.
+     */
+    @JsonIgnore
+    public DurableSubscriptionDestinationDTO direct() {
+        topics = null;
+        return this;
+    }
+
+    @JsonIgnore
+    public boolean is_direct() {
+        return topics == null;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (!(o instanceof DurableSubscriptionDestinationDTO)) return false;
         if (!super.equals(o)) return false;
 
         DurableSubscriptionDestinationDTO that = (DurableSubscriptionDestinationDTO) o;
 
         if (selector != null ? !selector.equals(that.selector) : that.selector != null) return false;
-        if (subscription_id != null ? !subscription_id.equals(that.subscription_id) : that.subscription_id != null)
-            return false;
+        if (topics != null ? !topics.equals(that.topics) : that.topics != null) return false;
 
         return true;
     }
@@ -64,16 +85,16 @@ public class DurableSubscriptionDestinat
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (selector != null ? selector.hashCode() : 0);
-        result = 31 * result + (subscription_id != null ? subscription_id.hashCode() : 0);
+        result = 31 * result + (topics != null ? topics.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
         return "DurableSubscriptionDestinationDTO{" +
-                "path=" + path +
+                "id='" + subscription_id() + '\'' +
                 ", selector='" + selector + '\'' +
-                ", subscription_id='" + subscription_id + '\'' +
+                ", topics=" + topics +
                 '}';
     }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jan 19 05:16:15 2012
@@ -830,22 +830,23 @@ class OpenwireProtocolHandler extends Pr
       }
 
       if( is_durable_sub ) {
-        destination = destination.map { _ match {
+
+        var subscription_id = ""
+        if( parent.parent.info.getClientId != null ) {
+          subscription_id += parent.parent.info.getClientId + ":"
+        }
+        subscription_id += info.getSubscriptionName
+
+        val rc = new DurableSubscriptionDestinationDTO(subscription_id)
+        rc.selector = info.getSelector
+
+        destination.foreach { _ match {
           case x:TopicDestinationDTO=>
-            val rc = new DurableSubscriptionDestinationDTO()
-            rc.path = x.path
-            if( is_durable_sub ) {
-              rc.subscription_id = ""
-              if( parent.parent.info.getClientId != null ) {
-                rc.subscription_id += parent.parent.info.getClientId + ":"
-              }
-              rc.subscription_id += info.getSubscriptionName
-            }
-            rc.selector = info.getSelector
-            rc
+            rc.topics.add(new TopicDestinationDTO(x.path))
           case _ => die("A durable subscription can only be used on a topic destination")
           }
         }
+        destination = Array(rc)
       }
 
       reset {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jan 19 05:16:15 2012
@@ -1190,18 +1190,22 @@ class StompProtocolHandler extends Proto
     }
 
     if( persistent ) {
-      destination = destination.map { _ match {
-        case x:DurableSubscriptionDestinationDTO=>
-          x
-        case x:TopicDestinationDTO=>
-          val rc = new DurableSubscriptionDestinationDTO()
-          rc.path = x.path
-          rc.subscription_id = decode_header(id)
-          rc.selector = if (selector == null) null else selector._1
-          rc
+      
+      val dsubs = ListBuffer[DurableSubscriptionDestinationDTO]()
+      val topics = ListBuffer[TopicDestinationDTO]()
+      destination.foreach { _ match {
+        case x:DurableSubscriptionDestinationDTO=> dsubs += x
+        case x:TopicDestinationDTO=> topics += x
         case _ => die("A persistent subscription can only be used on a topic destination")
-        }
+      } }
+
+      if( !topics.isEmpty ) {
+        val dsub = new DurableSubscriptionDestinationDTO(decode_header(id))
+        dsub.selector = if (selector == null) null else selector._1
+        topics.foreach( dsub.topics.add(_) )
+        dsubs += dsub
       }
+      destination = dsubs.toArray
     }
 
     val from_seq = from_seq_opt.map(_.toString.toLong).getOrElse(0L)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Thu Jan 19 05:16:15 2012
@@ -98,7 +98,7 @@ class StompTestSupport extends FunSuiteS
     val host = broker.virtual_hosts.get(ascii("default")).get
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
-      router.queue_domain.destination_by_id.get(name).isDefined
+      router.local_queue_domain.destination_by_id.get(name).isDefined
     }.await()
   }
 
@@ -106,7 +106,7 @@ class StompTestSupport extends FunSuiteS
     val host = broker.virtual_hosts.get(ascii("default")).get
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
-      router.topic_domain.destination_by_id.get(name).isDefined
+      router.local_topic_domain.destination_by_id.get(name).isDefined
     }.await()
   }
 
@@ -1101,7 +1101,7 @@ class DurableSubscriptionOnLevelDBTest e
 
     val frame = client.receive()
     frame should startWith("ERROR\n")
-    frame should include("message:Durable subscription does not exist")
+    frame should include("message:The destination does not exist")
   }
 
   test("Direct subscribe to a non-existant a durable sub fails") {

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1233181&r1=1233180&r2=1233181&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Thu Jan 19 05:16:15 2012
@@ -363,13 +363,13 @@ case class BrokerResource() extends Reso
 
     val router: LocalRouter = host
 
-    router.queue_domain.destinations.foreach { node =>
+    router.local_queue_domain.destinations.foreach { node =>
       result.queues.add(node.id)
     }
-    router.topic_domain.destinations.foreach { node =>
+    router.local_topic_domain.destinations.foreach { node =>
       result.topics.add(node.id)
     }
-    router.topic_domain.durable_subscriptions_by_id.keys.foreach { id =>
+    router.local_dsub_domain.destination_by_id.keys.foreach { id =>
       result.dsubs.add(id)
     }
 
@@ -514,7 +514,7 @@ case class BrokerResource() extends Reso
     with_virtual_host(id) { host =>
       val router: LocalRouter = host
       val records = Future.all {
-        router.topic_domain.destination_by_id.values.map { value  =>
+        router.local_topic_domain.destination_by_id.values.map { value  =>
           monitoring(value) {
             value.status
           }
@@ -529,7 +529,7 @@ case class BrokerResource() extends Reso
   def topic(@PathParam("id") id : String, @PathParam("name") name : String):TopicStatusDTO = {
     with_virtual_host(id) { host =>
       val router:LocalRouter = host
-      val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       monitoring(node) {
         node.status
       }
@@ -540,7 +540,7 @@ case class BrokerResource() extends Reso
   def topic(@PathParam("id") id : String,@PathParam("name") name : String,  @PathParam("qid") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
     with_virtual_host(id) { host =>
       val router:LocalRouter = host
-      val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
       monitoring(node) {
         sync(queue) {
@@ -556,7 +556,7 @@ case class BrokerResource() extends Reso
             @QueryParam("q") q:String, @QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
     with_virtual_host(id) { host =>
       val router: LocalRouter = host
-      val values: Iterable[Queue] = router.queue_domain.destination_by_id.values
+      val values: Iterable[Queue] = router.local_queue_domain.destination_by_id.values
 
       val records = sync_all(values) { value =>
         status(value, false)
@@ -571,7 +571,7 @@ case class BrokerResource() extends Reso
   def queue(@PathParam("id") id : String, @PathParam("name") name : String, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
     with_virtual_host(id) { host =>
       val router: LocalRouter = host
-      val node = router.queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       sync(node) {
         status(node, entries)
       }
@@ -583,7 +583,7 @@ case class BrokerResource() extends Reso
   def queue_delete(@PathParam("id") id : String, @PathParam("name") name : String):Unit = unwrap_future_result {
     with_virtual_host(id) { host =>
       val router: LocalRouter = host
-      val node = router.queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       admining(node) {
         router._destroy_queue(node)
       }
@@ -603,7 +603,7 @@ case class BrokerResource() extends Reso
             @QueryParam("q") q:String, @QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
     with_virtual_host(id) { host =>
       val router: LocalRouter = host
-      val values: Iterable[Queue] = router.topic_domain.durable_subscriptions_by_id.values
+      val values: Iterable[Queue] = router.local_dsub_domain.destination_by_id.values
 
       val records = sync_all(values) { value =>
         status(value, false)
@@ -617,7 +617,7 @@ case class BrokerResource() extends Reso
   def durable_subscription(@PathParam("id") id : String, @PathParam("name") name : String, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
     with_virtual_host(id) { host =>
       val router:LocalRouter = host
-      val node = router.topic_domain.durable_subscriptions_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       sync(node) {
         status(node, entries)
       }
@@ -630,7 +630,7 @@ case class BrokerResource() extends Reso
   def dsub_delete(@PathParam("id") id : String, @PathParam("name") name : String):Unit = unwrap_future_result {
     with_virtual_host(id) { host =>
       val router: LocalRouter = host
-      val node = router.topic_domain.durable_subscriptions_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
       admining(node) {
         router._destroy_queue(node)
       }



Mime
View raw message