activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1240510 [1/3] - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/ apollo-dto/src/main/java/org/apache/activemq/apollo/d...
Date Sat, 04 Feb 2012 14:42:53 GMT
Author: chirino
Date: Sat Feb  4 14:42:52 2012
New Revision: 1240510

URL: http://svn.apache.org/viewvc?rev=1240510&view=rev
Log:
Fixes APLO-136 - Broker complains when a dot is used in a dsub name

- Changed the Router interfaces to avoid leaky abstractions coming from the DTO module
- Less encoding/decoding of destinations.
- Full unicode destination names should now be possible.

Added:
    activemq/activemq-apollo/trunk/apollo.log
    activemq/activemq-apollo/trunk/svn-commit.tmp
    activemq/activemq-apollo/trunk/webapp-resources/
    activemq/activemq-apollo/trunk/webapp-resources/META-INF/
    activemq/activemq-apollo/trunk/webapp-resources/META-INF/maven/
    activemq/activemq-apollo/trunk/webapp-resources/META-INF/maven/org.apache.activemq/
    activemq/activemq-apollo/trunk/webapp-resources/META-INF/maven/org.apache.activemq/apollo-web/
    activemq/activemq-apollo/trunk/webapp-resources/META-INF/maven/org.apache.activemq/apollo-web/pom.properties
    activemq/activemq-apollo/trunk/webapp-resources/META-INF/maven/org.apache.activemq/apollo-web/pom.xml
    activemq/activemq-apollo/trunk/webapp-resources/WEB-INF/
    activemq/activemq-apollo/trunk/webapp-resources/WEB-INF/apollo-web.txt
    activemq/activemq-apollo/trunk/webapp-resources/WEB-INF/web.xml
    activemq/activemq-apollo/trunk/webapp-resources/css/
    activemq/activemq-apollo/trunk/webapp-resources/css/style.css
    activemq/activemq-apollo/trunk/webapp-resources/favicon.ico
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Sat Feb  4 14:42:52 2012
@@ -18,16 +18,14 @@ package org.apache.activemq.apollo.broke
 
 import org.apache.activemq.apollo.selector.SelectorParser
 import org.apache.activemq.apollo.filter.{ConstantExpression, BooleanExpression}
-import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.util.ClassFinder
 import org.apache.activemq.apollo.util.path.Path
-import java.lang.String
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import Buffer._
+import collection.JavaConversions._
+import org.apache.activemq.apollo.dto._
 
 trait BindingFactory {
-  def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding
-  def create(binding_dto:DestinationDTO):Binding
+  def apply(binding_kind:AsciiBuffer, binding_data:Buffer):Binding
 }
 
 /**
@@ -42,22 +40,13 @@ object BindingFactory {
 
   def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding = {
     finder.singletons.foreach { provider=>
-      val rc = provider.create(binding_kind, binding_data)
+      val rc = provider(binding_kind, binding_data)
       if( rc!=null ) {
         return rc
       }
     }
     throw new IllegalArgumentException("Invalid binding type: "+binding_kind);
   }
-  def create(binding_dto:DestinationDTO):Binding = {
-    finder.singletons.foreach { provider=>
-      val rc = provider.create(binding_dto)
-      if( rc!=null ) {
-        return rc
-      }
-    }
-    throw new IllegalArgumentException("Invalid binding type: "+binding_dto);
-  }
 
 }
 
@@ -70,11 +59,6 @@ object BindingFactory {
 trait Binding {
 
   /**
-   * The name of the queue (could be the queue name or a subscription id etc)
-   */
-  def id:String
-
-  /**
    * Wires a queue into the a virtual host based on the binding information contained
    * in the buffer.
    */
@@ -82,19 +66,19 @@ trait Binding {
   
   def unbind(node:LocalRouter, queue:Queue)
 
-  def binding_kind:AsciiBuffer
+  def dto_class:Class[_ <:DestinationDTO]
+  def dto:DestinationDTO = JsonCodec.decode(binding_data, dto_class)
 
+  def binding_kind:AsciiBuffer
   def binding_data:Buffer
 
-  def binding_dto:DestinationDTO
+  def address:DestinationAddress
 
   def message_filter:BooleanExpression = ConstantExpression.TRUE
 
-  def destination:Path
-
   def config(host:VirtualHost):QueueDTO
 
-  override def toString: String = id
+  override def toString = address.toString
 }
 
 object QueueDomainQueueBinding extends BindingFactory {
@@ -102,23 +86,29 @@ object QueueDomainQueueBinding extends B
   val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
   val DESTINATION_PATH = new AsciiBuffer("default");
 
-  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+  def apply(binding_kind:AsciiBuffer, binding_data:Buffer):QueueDomainQueueBinding = {
     if( binding_kind == POINT_TO_POINT_KIND ) {
       val dto = JsonCodec.decode(binding_data, classOf[QueueDestinationDTO])
-      new QueueDomainQueueBinding(binding_data, dto)
+
+      // TODO: remove after next release.
+      // schema upgrade, we can get rid of this after the next release.
+      if( !dto.path.isEmpty ) {
+        dto.name = LocalRouter.destination_parser.encode_path_iter(dto.path.toIterable)
+      }
+
+      var path: Path = DestinationAddress.decode_path(dto.name)
+      new QueueDomainQueueBinding(binding_data, SimpleAddress("queue", path))
     } else {
       null
     }
   }
 
-  def create(binding_dto:DestinationDTO) = binding_dto match {
-    case ptp_dto:QueueDestinationDTO =>
-      new QueueDomainQueueBinding(JsonCodec.encode(ptp_dto), ptp_dto)
-    case _ => null
+  def apply(address:DestinationAddress):QueueDomainQueueBinding = {
+    val dto = new QueueDestinationDTO(address.id)
+    new QueueDomainQueueBinding(JsonCodec.encode(dto), address)
   }
 
   def queue_config(virtual_host:VirtualHost, path:Path):QueueDTO = {
-    import collection.JavaConversions._
     import LocalRouter.destination_parser._
 
     def matches(x:QueueDTO):Boolean = {
@@ -136,11 +126,10 @@ object QueueDomainQueueBinding extends B
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class QueueDomainQueueBinding(val binding_data:Buffer, val binding_dto:QueueDestinationDTO) extends Binding {
+class QueueDomainQueueBinding(val binding_data:Buffer, val address:DestinationAddress) extends Binding {
 
   import QueueDomainQueueBinding._
-
-  val destination = LocalRouter.destination_parser.decode_path(binding_dto.path)
+  def dto_class = classOf[QueueDestinationDTO]
   def binding_kind = POINT_TO_POINT_KIND
 
   def unbind(node: LocalRouter, queue: Queue) = {
@@ -151,8 +140,6 @@ class QueueDomainQueueBinding(val bindin
     node.local_queue_domain.bind(queue)
   }
 
-  val id = binding_dto.name(LocalRouter.destination_parser.path_separator)
-
   override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
 
   override def equals(o:Any):Boolean = o match {
@@ -161,9 +148,7 @@ class QueueDomainQueueBinding(val bindin
   }
 
 
-  def config(host:VirtualHost):QueueDTO = queue_config(host, destination)
-
-  override def toString = "queue: "+id
+  def config(host:VirtualHost):QueueDTO = queue_config(host, address.path)
 }
 
 
@@ -171,35 +156,47 @@ object DurableSubscriptionQueueBinding e
 
   val DURABLE_SUB_KIND = new AsciiBuffer("ds")
 
-  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+  def apply(binding_kind:AsciiBuffer, binding_data:Buffer):DurableSubscriptionQueueBinding = {
     if( binding_kind == DURABLE_SUB_KIND ) {
-      new DurableSubscriptionQueueBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionDestinationDTO]))
+      var dto = JsonCodec.decode(binding_data, classOf[DurableSubscriptionDestinationDTO])
+      // TODO: remove after next release.
+      // schema upgrade, we can get rid of this after the next release.
+      if( !dto.path.isEmpty ) {
+        dto.name = dto.path.get(0);
+      }
+      import collection.JavaConversions._
+      val topics = dto.topics.toSeq.toArray.map { t=>
+        new SimpleAddress("topic", DestinationAddress.decode_path(t.name))
+      }
+
+      var path: Path = DestinationAddress.decode_path(dto.name)
+      DurableSubscriptionQueueBinding(binding_data, SubscriptionAddress(path, dto.selector, topics))
     } else {
       null
     }
   }
-  def create(binding_dto:DestinationDTO) = {
-    if( binding_dto.isInstanceOf[DurableSubscriptionDestinationDTO] ) {
-      new DurableSubscriptionQueueBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO])
-    } else {
-      null
+  
+  def apply(destination:SubscriptionAddress):DurableSubscriptionQueueBinding = {
+    val dto = new DurableSubscriptionDestinationDTO(destination.id)
+    dto.selector = destination.selector
+    destination.topics.foreach { t =>
+      dto.topics.add(new TopicDestinationDTO(t.id))
     }
+    DurableSubscriptionQueueBinding(JsonCodec.encode(dto), destination)
   }
 
 
-  def dsub_config(host:VirtualHost, id:String) = {
+  def dsub_config(host:VirtualHost, address:SubscriptionAddress) = {
+    import LocalRouter.destination_parser._
     import collection.JavaConversions._
     def matches(x:DurableSubscriptionDTO):Boolean = {
-      if( x.id != null && x.id!=id ) {
-        return false
+      if( x.id != null ) {
+        return decode_filter(x.id).matches(address.path)
       }
-
       if( x.id_regex != null ) {
         // May need to cache the regex...
         val regex = x.id_regex.r
-        if( !regex.findFirstIn(id).isDefined ) {
-          return false
-        }
+        return regex.findFirstIn(address.id).isDefined
       }
       true
     }
@@ -213,11 +210,10 @@ object DurableSubscriptionQueueBinding e
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DurableSubscriptionQueueBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionDestinationDTO) extends Binding {
+case class DurableSubscriptionQueueBinding(binding_data:Buffer, address:SubscriptionAddress) extends Binding {
   import DurableSubscriptionQueueBinding._
 
-  val destination = Path(binding_dto.subscription_id)
-
+  def dto_class = classOf[DurableSubscriptionDestinationDTO]
   def binding_kind = DURABLE_SUB_KIND
 
 
@@ -229,8 +225,6 @@ class DurableSubscriptionQueueBinding(va
     router.local_dsub_domain.bind(queue)
   }
 
-  def id = binding_dto.subscription_id
-
   override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
 
   override def equals(o:Any):Boolean = o match {
@@ -239,32 +233,20 @@ class DurableSubscriptionQueueBinding(va
   }
 
   override def message_filter = {
-    if ( binding_dto.selector==null ) {
+    if ( address.selector==null ) {
       ConstantExpression.TRUE
     } else {
-      SelectorParser.parse(binding_dto.selector)
+      SelectorParser.parse(address.selector)
     }
   }
 
-  def config(host:VirtualHost):DurableSubscriptionDTO = dsub_config(host, binding_dto.subscription_id)
-
-  override def toString = "dsub: "+binding_dto.subscription_id
+  def config(host:VirtualHost):DurableSubscriptionDTO = dsub_config(host, address)
 }
 
 
 object TempQueueBinding {
   val TEMP_DATA = new AsciiBuffer("")
   val TEMP_KIND = new AsciiBuffer("tmp")
-
-//  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
-//    if( binding_kind == TEMP_KIND ) {
-//      new TempQueueBinding("", "")
-//    } else {
-//      null
-//    }
-//  }
-//
-//  def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
 }
 
 /**
@@ -273,19 +255,21 @@ object TempQueueBinding {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class TempQueueBinding(val key:AnyRef, val destination:Path, val binding_dto:DestinationDTO) extends Binding {
+case class TempQueueBinding(key:AnyRef, address:DestinationAddress) extends Binding {
   import TempQueueBinding._
 
   def binding_kind = TEMP_KIND
   def binding_data = TEMP_DATA
 
+
+  override def dto: DestinationDTO = null
+  def dto_class = null
+
   def unbind(router: LocalRouter, queue: Queue) = {}
   def bind(router: LocalRouter, queue: Queue) = {}
 
   override def hashCode = if(key==null) 0 else key.hashCode
 
-  def id = key.toString
-
   def config(host: VirtualHost) = new QueueDTO
 
   override def equals(o:Any):Boolean = o match {
@@ -293,5 +277,5 @@ class TempQueueBinding(val key:AnyRef, v
     case _ => false
   }
 
-  override def toString = "temp queue: "+key
+  override def toString = super.toString+":"+key
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Sat Feb  4 14:42:52 2012
@@ -168,7 +168,7 @@ class Delivery {
   /**
    * Where the delivery is originating from.
    */
-  var sender:DestinationDTO = _
+  var sender:DestinationAddress = _
 
   /**
    * Total size of the delivery.  Used for resource allocation tracking

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Sat Feb  4 14:42:52 2012
@@ -18,10 +18,8 @@ package org.apache.activemq.apollo.broke
 
 import org.apache.activemq.apollo.util.path.PathParser
 import scala.collection.mutable.ListBuffer
-import collection.JavaConversions._
 import java.lang.StringBuilder
 import java.util.regex.Pattern
-import org.apache.activemq.apollo.dto._
 import scala.Array
 
 /**
@@ -35,7 +33,6 @@ class DestinationParser extends PathPars
   var temp_queue_prefix = "temp-queue:"
   var temp_topic_prefix = "temp-topic:"
   var destination_separator = ","
-  var sanitize_destinations = false
 
   def copy(other:DestinationParser) = {
     super.copy(other)
@@ -48,49 +45,46 @@ class DestinationParser extends PathPars
     this
   }
 
-  def encode_destination(value: Array[DestinationDTO]): String = {
-    if (value == null) {
+  def encode_destination(addresses: Array[_ <: DestinationAddress]): String = {
+    if (addresses == null) {
       null
     } else {
       val rc = new StringBuilder
-      value.foreach { dest =>
+      addresses.foreach { address =>
         if (rc.length() != 0 ) {
           assert( destination_separator!=null )
           rc.append(destination_separator)
         }
-        dest match {
-          case d:QueueDestinationDTO =>
-            if ( d.temp() ) {
-              if( temp_queue_prefix!=null ) {
-                rc.append(temp_queue_prefix)
-              }
-            } else {
-              if( queue_prefix!=null ) {
-                rc.append(queue_prefix)
-              }
-            }
-            rc.append(encode_path_iter(dest.path.toIterable, sanitize_destinations))
-          case d:DurableSubscriptionDestinationDTO =>
-            if( dsub_prefix!=null ) {
-              rc.append(dsub_prefix)
-            }
-            rc.append(unsanitize_destination_part(d.subscription_id))
-          case d:TopicDestinationDTO =>
-            if ( d.temp() ) {
-              if( temp_topic_prefix!=null ) {
-                rc.append(temp_topic_prefix)
-              }
-            } else {
-              if( topic_prefix!=null ) {
-                rc.append(topic_prefix)
-              }
-            }
-            rc.append(encode_path_iter(dest.path.toIterable, sanitize_destinations))
-          case _ =>
-            throw new Exception("Uknown destination type: "+dest.getClass);
-        }
+        rc.append(encode_destination(address))
+      }
+      rc.toString
+    }
+  }
 
+  def encode_destination(address: DestinationAddress): String = {
+    if (address == null) {
+      null
+    } else {
+      val rc = new StringBuilder
+      address.domain match {
+        case "temp-queue" => if( temp_queue_prefix!=null ) {
+          rc.append(temp_queue_prefix)
+        }
+        case "queue" => if( queue_prefix!=null ) {
+          rc.append(queue_prefix)
+        }
+        case "temp-topic" => if( temp_topic_prefix!=null ) {
+          rc.append(temp_topic_prefix)
+        }
+        case "topic" => if( topic_prefix!=null ) {
+          rc.append(topic_prefix)
+        }
+        case "dsub" => if( dsub_prefix!=null ) {
+          rc.append(dsub_prefix)
+        }
+        case _ => throw sys.error("Uknown domain: "+address.domain);
       }
+      rc.append(encode_path(address.path))
       rc.toString
     }
   }
@@ -102,14 +96,14 @@ class DestinationParser extends PathPars
    * @param compositeSeparator
    * @return
    */
-  def decode_multi_destination(value: String, unqualified:(String)=>DestinationDTO=null): Array[DestinationDTO] = {
+  def decode_multi_destination(value: String, unqualified:(String)=>SimpleAddress=null): Array[SimpleAddress] = {
     if (value == null) {
       return null;
     }
 
     if (destination_separator!=null && value.contains(destination_separator)) {
       var rc = value.split(Pattern.quote(destination_separator));
-      var dl = ListBuffer[DestinationDTO]()
+      var dl = ListBuffer[SimpleAddress]()
       for (buffer <- rc) {
         val d = decode_single_destination(buffer, unqualified)
         if (d == null) {
@@ -135,22 +129,22 @@ class DestinationParser extends PathPars
    * @param compositeSeparator
    * @return
    */
-  def decode_single_destination(value: String, unqualified: (String) => DestinationDTO): DestinationDTO = {
+  def decode_single_destination(value: String, unqualified: (String) => SimpleAddress): SimpleAddress = {
     if (queue_prefix != null && value.startsWith(queue_prefix)) {
       var name = value.substring(queue_prefix.length)
-      return new QueueDestinationDTO(parts(name, sanitize_destinations))
+      return new SimpleAddress("queue", decode_path(name))
     } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
       var name = value.substring(topic_prefix.length)
-      return new TopicDestinationDTO(parts(name, sanitize_destinations))
+      return new SimpleAddress("topic", decode_path(name))
     } else if (dsub_prefix != null && value.startsWith(dsub_prefix)) {
-      var name = sanitize_destination_part(value.substring(dsub_prefix.length))
-      return new DurableSubscriptionDestinationDTO(name).direct();
+      var name = value.substring(dsub_prefix.length)
+      return new SimpleAddress("dsub", decode_path(name))
     } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
       var name = value.substring(temp_topic_prefix.length)
-      return new TopicDestinationDTO(parts(name, sanitize_destinations)).temp(true)
+      return new SimpleAddress("temp-topic", decode_path(name))
     } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
       var name = value.substring(temp_queue_prefix.length)
-      return new QueueDestinationDTO(parts(name, sanitize_destinations)).temp(true)
+      return new SimpleAddress("temp-queue", decode_path(name))
     } else if (unqualified != null) {
       return unqualified(value)
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Feb  4 14:42:52 2012
@@ -118,16 +118,16 @@ object RouterListenerFactory {
  */
 trait DomainDestination extends SecuredResource {
 
-  def id:String
+  def address:DestinationAddress
+  def id = address.id
   def virtual_host:VirtualHost
 
-  def destination_dto:DestinationDTO
 
-  def bind (destination:DestinationDTO, consumer:DeliveryConsumer)
-  def unbind (consumer:DeliveryConsumer, persistent:Boolean)
+  def bind (bind_address:BindAddress, consumer:DeliveryConsumer):Unit
+  def unbind (consumer:DeliveryConsumer, persistent:Boolean):Unit
 
-  def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
-  def disconnect (producer:BindableDeliveryProducer)
+  def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer):Unit
+  def disconnect (producer:BindableDeliveryProducer):Unit
 
   def update(on_completed:Runnable):Unit
 
@@ -144,13 +144,12 @@ object LocalRouter extends Log {
     if( dto.id == null ) {
       true
     } else {
-      val parts = destination_parser.parts(dto.id)
-      val path = destination_parser.decode_path(parts)
+      val path = destination_parser.decode_path(dto.id)
       PathParser.containsWildCards(path)
     }
   }
 
-  class ConsumerContext(val destination:DestinationDTO, val consumer:DeliveryConsumer, val security:SecurityContext) {
+  class ConsumerContext(val bind_address:BindAddress, val consumer:DeliveryConsumer, val security:SecurityContext) {
     override def hashCode: Int = consumer.hashCode
 
     override def equals(obj: Any): Boolean = {
@@ -161,7 +160,7 @@ object LocalRouter extends Log {
     }
   }
 
-  class ProducerContext(val destination:DestinationDTO, val producer:BindableDeliveryProducer, val security:SecurityContext) {
+  class ProducerContext(val connect_address:ConnectAddress, val producer:BindableDeliveryProducer, val security:SecurityContext) {
     override def hashCode: Int = producer.hashCode
 
     override def equals(obj: Any): Boolean = {
@@ -203,24 +202,25 @@ class LocalRouter(val virtual_host:Virtu
 
   def authorizer = virtual_host.authorizer
   
-  
-  def is_temp(destination:DestinationDTO) = {
-    destination.getClass!=classOf[DurableSubscriptionDestinationDTO] && destination.path.size() >= 1 && destination.path.get(0) == "temp"
+  def is_temp(address:DestinationAddress) = {
+    address.domain != "dsub" && address.id.startsWith("temp.")
   }
   
-  def temp_owner(destination:DestinationDTO) = {
-    if( destination.path.size() < 3 ) {
+  def temp_owner(address:DestinationAddress) = {
+    if( address.path.parts.length < 3 ) {
       None
     } else {
       try {
-        Some((destination.path.get(1), destination.path.get(2)))
+        val broker = address.path.parts(1).asInstanceOf[LiteralPart]
+        val owner = address.path.parts(2).asInstanceOf[LiteralPart]
+        Some((broker.value, owner.value))
       } catch {
         case _ => None
       }
     }
   }
 
-  trait Domain[D <: DomainDestination, DTO <: DestinationDTO] {
+  trait Domain[D <: DomainDestination] {
 
     // holds all the destinations in the domain by id
     var destination_by_id = LinkedHashMap[String, D]()
@@ -247,13 +247,14 @@ class LocalRouter(val virtual_host:Virtu
     def auto_create_on_connect = auto_create_destinations
     def auto_create_on_bind = auto_create_destinations
 
-    def can_create_destination(path:Path, destination:DTO, security:SecurityContext):Option[String]
-    def create_destination(path:Path, destination:DTO, security:SecurityContext):Result[D,String]
+    def can_create_destination(address:DestinationAddress, security:SecurityContext):Option[String]
+    def create_destination(address:DestinationAddress, security:SecurityContext):Result[D,String]
+
 
-    def get_or_create_destination(path:Path, destination:DTO, security:SecurityContext):Result[D,String] = {
-      Option(destination_by_path.chooseValue(path)).
+    def get_or_create_destination(address:DestinationAddress, security:SecurityContext):Result[D,String] = {
+      Option(destination_by_path.chooseValue(address.path)).
       map(Success(_)).
-      getOrElse( create_destination(path, destination, security))
+      getOrElse(create_destination(address, security))
     }
 
     var add_destination = (path:Path, dest:D) => {
@@ -264,12 +265,12 @@ class LocalRouter(val virtual_host:Virtu
       import JavaConversions._
       consumers_by_path.get( path ).foreach { x=>
         if( authorizer.can(x.security, bind_action(x.consumer), dest) ) {
-          dest.bind(x.destination, x.consumer)
+          dest.bind(x.bind_address, x.consumer)
         }
       }
       producers_by_path.get( path ).foreach { x=>
         if( authorizer.can(x.security, "send", dest) ) {
-          dest.connect(x.destination, x.producer)
+          dest.connect(x.connect_address, x.producer)
         }
       }
     }
@@ -279,14 +280,14 @@ class LocalRouter(val virtual_host:Virtu
       destination_by_id.remove(dest.id)
     }
 
-    def can_destroy_destination(path:Path, destination:DTO, security:SecurityContext):Option[String] = {
+    def can_destroy_destination(address:DestinationAddress, security:SecurityContext):Option[String] = {
       if( security==null ) {
         return None
       }
 
-      for(dest <- get_destination_matches(path)) {
-        if( is_temp(destination) ) {
-          val owner = temp_owner(destination).get
+      for(dest <- get_destination_matches(address.path)) {
+        if( is_temp(address) ) {
+          val owner = temp_owner(address).get
           for( connection <- security.session_id) {
             if( (virtual_host.broker.id, connection) != owner ) {
               return Some("Not authorized to destroy the temp %s '%s'. Principals=%s".format(dest.resource_kind.id, dest.id, security.principal_dump))
@@ -300,18 +301,19 @@ class LocalRouter(val virtual_host:Virtu
       }
       None
     }
-    def destroy_destination(path:Path, destination:DTO, security: SecurityContext):Unit
+    
+    def destroy_destination(address:DestinationAddress, security: SecurityContext):Unit
 
     def bind_action(consumer:DeliveryConsumer):String
 
-    def can_bind_all(path:Path, destination:DTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
+    def can_bind_all(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
       if( security==null ) {
         return None
       }
 
       // Only allow the owner to bind.
-      if( is_temp(destination) ) {
-        temp_owner(destination) match {
+      if( is_temp(bind_address) ) {
+        temp_owner(bind_address) match {
           case Some(owner) =>
             for( connection <- security.session_id) {
               if( (virtual_host.broker.id, connection) != owner ) {
@@ -323,17 +325,17 @@ class LocalRouter(val virtual_host:Virtu
         }
       }
 
-      val wildcard = PathParser.containsWildCards(path)
-      var matches = get_destination_matches(path)
+      val wildcard = PathParser.containsWildCards(bind_address.path)
+      var matches = get_destination_matches(bind_address.path)
 
       // Should we attempt to auto create the destination?
       if( !wildcard ) {
         if ( matches.isEmpty && auto_create_on_bind ) {
-          val rc = create_destination(path, destination, security)
+          val rc = create_destination(bind_address, security)
           if( rc.failed ) {
             return Some(rc.failure)
           }
-          matches = get_destination_matches(path)
+          matches = get_destination_matches(bind_address.path)
         }
         if( matches.isEmpty ) {
           return Some("The destination does not exist.")
@@ -349,24 +351,23 @@ class LocalRouter(val virtual_host:Virtu
       None
     }
 
-    def bind(path:Path, destination:DTO, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
-      var matches = get_destination_matches(path)
+    def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
+      var matches = get_destination_matches(bind_address.path)
       matches.foreach { dest=>
         if( authorizer.can(security, bind_action(consumer), dest) ) {
-          dest.bind(destination, consumer)
+          dest.bind(bind_address, consumer)
           for( l <- router_listeners) {
             l.on_bind(dest, consumer, security)
           }
         }
       }
       consumer.retain
-      consumers_by_path.put(path, new ConsumerContext(destination, consumer, security))
+      consumers_by_path.put(bind_address.path, new ConsumerContext(bind_address, consumer, security))
     }
 
-    def unbind(destination:DTO, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
-      val path = destination_parser.decode_path(destination.path)
-      if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
-        get_destination_matches(path).foreach{ dest=>
+    def unbind(bind_address:BindAddress, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
+      if( consumers_by_path.remove(bind_address.path, new ConsumerContext(bind_address, consumer, null) ) ) {
+        get_destination_matches(bind_address.path).foreach{ dest=>
           dest.unbind(consumer, persistent)
           for( l <- router_listeners) {
             l.on_unbind(dest, consumer, persistent)
@@ -376,10 +377,9 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_connect_all(path:Path, destination:DTO, producer:BindableDeliveryProducer, security:SecurityContext):Option[String] = {
-
-      val wildcard = PathParser.containsWildCards(path)
-      var matches = get_destination_matches(path)
+    def can_connect_all(address:DestinationAddress, producer:BindableDeliveryProducer, security:SecurityContext):Option[String] = {
+      val wildcard = PathParser.containsWildCards(address.path)
+      var matches = get_destination_matches(address.path)
 
       if( wildcard ) {
 
@@ -391,11 +391,11 @@ class LocalRouter(val virtual_host:Virtu
 
         // Should we attempt to auto create the destination?
         if ( matches.isEmpty && auto_create_on_connect ) {
-          val rc = create_destination(path, destination, security)
+          val rc = create_destination(address, security)
           if( rc.failed ) {
             return Some(rc.failure)
           }
-          matches = get_destination_matches(path)
+          matches = get_destination_matches(address.path)
         }
 
         if( matches.isEmpty ) {
@@ -414,22 +414,21 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def connect(path:Path, destination:DTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
-      get_destination_matches(path).foreach { dest=>
+    def connect(connect_address:ConnectAddress, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
+      get_destination_matches(connect_address.path).foreach { dest=>
         if( authorizer.can(security, "send", dest) ) {
-          dest.connect(destination, producer)
+          dest.connect(connect_address, producer)
           for( l <- router_listeners) {
             l.on_connect(dest, producer, security)
           }
         }
       }
-      producers_by_path.put(path, new ProducerContext(destination, producer, security))
+      producers_by_path.put(connect_address.path, new ProducerContext(connect_address, producer, security))
     }
 
-    def disconnect(destination:DTO, producer:BindableDeliveryProducer) = {
-      val path = destination_parser.decode_path(destination.path)
-      producers_by_path.remove(path, new ProducerContext(destination, producer, null))
-      get_destination_matches(path).foreach { dest=>
+    def disconnect(connect_address:ConnectAddress, producer:BindableDeliveryProducer) = {
+      producers_by_path.remove(connect_address.path, new ProducerContext(connect_address, producer, null))
+      get_destination_matches(connect_address.path).foreach { dest=>
         dest.disconnect(producer)
         for( l <- router_listeners) {
           l.on_disconnect(dest, producer)
@@ -440,7 +439,7 @@ class LocalRouter(val virtual_host:Virtu
   }
 
 
-  class TopicDomain extends Domain[Topic, TopicDestinationDTO] {
+  class TopicDomain extends Domain[Topic] {
 
     def topic_config(name:Path):TopicDTO = {
       import collection.JavaConversions._
@@ -450,7 +449,8 @@ class LocalRouter(val virtual_host:Virtu
       }.getOrElse(new TopicDTO)
     }
 
-    def destroy_destination(path:Path, destination: TopicDestinationDTO, security: SecurityContext): Unit = {
+    def destroy_destination(address: DestinationAddress, security: SecurityContext): Unit = {
+      val path:Path = address.path
       val matches = get_destination_matches(path)
       matches.foreach { dest =>
         for( l <- router_listeners) {
@@ -460,18 +460,15 @@ class LocalRouter(val virtual_host:Virtu
         // Disconnect the producers.
         dest.disconnect_producers
 
-        // Delete the durable subs which
-        for( queue <- dest.durable_subscriptions ) {
-          // we delete the durable sub if it's not wildcard'ed
-          if( !PathParser.containsWildCards(queue.binding.destination) ) {
-            _destroy_queue(queue)
-          }
+        // Disconnect the durable subs
+        for( dsub <- dest.durable_subscriptions ) {
+          dest.unbind_durable_subscription(dsub)
         }
 
+        // Delete any consumer temp queues..
         for( consumer <- dest.consumers ) {
           consumer match {
             case queue:Queue =>
-              // Delete any attached queue consumers..
               _destroy_queue(queue)
             case _ =>
           }
@@ -482,18 +479,18 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_create_destination(path:Path, destination:TopicDestinationDTO, security:SecurityContext):Option[String] = {
+    def can_create_destination(address:DestinationAddress, security:SecurityContext):Option[String] = {
       if (security==null) {
         return None;
       }
 
       // We can't create a wild card destination.. only wild card subscriptions.
-      assert( !PathParser.containsWildCards(path) )
+      assert( !PathParser.containsWildCards(address.path) )
       // A new destination is being created...
 
       val resource = new SecuredResource() {
         def resource_kind = TopicKind
-        def id = destination_parser.encode_path(path)
+        def id = destination_parser.encode_path(address.path)
       }
       if( !authorizer.can(security, "create", resource)) {
         return Some("Not authorized to create the topic '%s'. Principals=%s".format(resource.id, security.principal_dump))
@@ -502,23 +499,23 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def create_destination(path:Path, destination:TopicDestinationDTO, security:SecurityContext):Result[Topic,String] = {
+    def create_destination(address:DestinationAddress, security:SecurityContext):Result[Topic,String] = {
       // We can't create a wild card destination.. only wild card subscriptions.
-      assert( !PathParser.containsWildCards(path) )
+      assert( !PathParser.containsWildCards(address.path) )
 
       // A new destination is being created...
-      val dto = topic_config(path)
+      val dto = topic_config(address.path)
 
       val resource = new SecuredResource() {
         def resource_kind = TopicKind
-        def id = destination_parser.encode_path(path)
+        def id = address.id
       }
       if( !authorizer.can(security, "create", resource)) {
         return Failure("Not authorized to create the topic '%s'. Principals=%s".format(resource.id, security.principal_dump))
       }
 
-      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser), path)
-      add_destination(path, topic)
+      val topic = new Topic(LocalRouter.this, address, ()=>topic_config(address.path))
+      add_destination(address.path, topic)
 
       for( l <- router_listeners) {
         l.on_create(topic, security)
@@ -530,51 +527,43 @@ class LocalRouter(val virtual_host:Virtu
 
   }
 
-  class DsubDomain extends Domain[Queue, DurableSubscriptionDestinationDTO] {
+  class DsubDomain extends Domain[Queue] {
 
     override def auto_create_on_connect = false
 
-    def dsub_config(subid:String) = DurableSubscriptionQueueBinding.dsub_config(virtual_host, subid)
-
     def bind(queue:Queue) = {
       assert_executing
-      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
-      val path = queue.binding.destination
-
-      add_destination(path, queue)
+      val address = queue.address.asInstanceOf[SubscriptionAddress]
+      add_destination(address.path, queue)
 
       import collection.JavaConversions._
-      destination.topics.foreach { topic =>
-        val path = destination_parser.decode_path(topic.path)
-        val wildcard = PathParser.containsWildCards(path)
-        var matches = local_topic_domain.get_destination_matches(path)
+      address.topics.foreach { topic =>
+
+        val wildcard = PathParser.containsWildCards(topic.path)
+        var matches = local_topic_domain.get_destination_matches(topic.path)
 
         // We may need to create the topic...
         if( !wildcard && matches.isEmpty ) {
-          local_topic_domain.create_destination(path, topic, null)
-          matches = local_topic_domain.get_destination_matches(path)
+          local_topic_domain.create_destination(topic, null)
+          matches = local_topic_domain.get_destination_matches(topic.path)
         }
-        matches.foreach( _.bind_durable_subscription(destination, queue) )
+        matches.foreach( _.bind_durable_subscription(address, queue) )
       }
     }
 
     def unbind(queue:Queue) = {
       assert_executing
-      val destination = queue.destination_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      val address = queue.address.asInstanceOf[SubscriptionAddress]
 
-      import collection.JavaConversions._
-      destination.topics.foreach { topic =>
-        val path = destination_parser.decode_path(topic.path)
-        var matches = local_topic_domain.get_destination_matches(path)
-        matches.foreach( _.unbind_durable_subscription(destination, queue) )
+      address.topics.foreach { topic =>
+        var matches = local_topic_domain.get_destination_matches(topic.path)
+        matches.foreach( _.unbind_durable_subscription(queue) )
       }
-
-      val path = Path(destination.subscription_id)
-      remove_destination(path, queue)
+      remove_destination(address.path, queue)
     }
 
-    def destroy_destination(path:Path, destination: DurableSubscriptionDestinationDTO, security: SecurityContext): Unit = {
-      destination_by_id.get(destination.subscription_id).foreach { sub=>
+    def destroy_destination(address: DestinationAddress, security: SecurityContext): Unit = {
+      destination_by_id.get(address.id).foreach { sub=>
         for( l <- router_listeners) {
           l.on_destroy(sub, security)
         }
@@ -582,31 +571,32 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def get_dsub_secured_resource(config: DurableSubscriptionDestinationDTO):SecuredResource = {
-      destination_by_id.get(config.subscription_id).getOrElse(new SecuredResource() {
+    def get_dsub_secured_resource(address: DestinationAddress):SecuredResource = {
+      destination_by_id.get(address.id).getOrElse(new SecuredResource() {
         def resource_kind = SecuredResource.DurableSubKind
-        def id = config.subscription_id
+        def id = address.id
       })
     }
 
-    def can_create_destination(path:Path, destination:DurableSubscriptionDestinationDTO, security:SecurityContext):Option[String] = {
-      // It's it's directly addressed durable sub, then it must already exist.
-      if( destination.is_direct && !destination_by_id.contains(destination.subscription_id)) {
-        Some("Durable subscription does not exist")
-      } else {
-        val resource = get_dsub_secured_resource(destination)
-        if( !authorizer.can(security, "create", resource)) {
-          Some("Not authorized to create the dsub '%s'. Principals=%s".format(resource.id, security.principal_dump))
-        } else {
-          None
-        }
+    def can_create_destination(address:DestinationAddress, security:SecurityContext):Option[String] = {
+      address match {
+        case address:SubscriptionAddress=>
+          val resource = get_dsub_secured_resource(address)
+          if( !authorizer.can(security, "create", resource)) {
+            Some("Not authorized to create the dsub '%s'. Principals=%s".format(resource.id, security.principal_dump))
+          } else {
+            None
+          }
+        case _ =>
+          // We can't create it.. not enough info.
+          Some("Durable subscription does not exist")
       }
     }
 
-    def create_destination(path:Path, destination:DurableSubscriptionDestinationDTO, security:SecurityContext):Result[Queue,String] = {
-      can_create_destination(path, destination, security).map(Failure(_)).getOrElse {
-        val dsub = _create_queue(BindingFactory.create(destination))
-        add_destination(path, dsub)
+    def create_destination(address:DestinationAddress, security:SecurityContext):Result[Queue,String] = {
+      can_create_destination(address, security).map(Failure(_)).getOrElse {
+        val dsub = _create_queue(DurableSubscriptionQueueBinding(address.asInstanceOf[SubscriptionAddress]))
+        add_destination(address.path, dsub)
         for( l <- router_listeners) {
           l.on_create(dsub, security)
         }
@@ -620,51 +610,36 @@ class LocalRouter(val virtual_host:Virtu
       "consume"
     }
 
+    override def bind(bind_address: BindAddress, consumer: DeliveryConsumer, security: SecurityContext) {
+      destination_by_id.get(bind_address.id).foreach { queue =>
 
-//    override def connect(path:Path, destination:DurableSubscriptionDestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
-//      // Connects a producer directly to a durable subscription..
-//      durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
-//        dest.connect(destination, producer)
-//        for( l <- router_listeners) {
-//          l.on_connect(dest, producer, security)
-//        }
-//      }
-//    }
-
-//    override def disconnect(destination:DurableSubscriptionDestinationDTO, producer:BindableDeliveryProducer) = {
-//      durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
-//        dest.disconnect(producer)
-//        for( l <- router_listeners) {
-//          l.on_disconnect(dest, producer)
-//        }
-//      }
-//    }
-
-    override def bind(path: Path, destination: DurableSubscriptionDestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) {
-      destination_by_id.get(destination.subscription_id).foreach { queue =>
         // We may need to update the bindings...
-        if( !destination.is_direct() && queue.destination_dto != destination && authorizer.can(security, "consume", queue)) {
-
-          val binding = BindingFactory.create(destination)
-          if( queue.tune_persistent && queue.store_id == -1 ) {
-            val record = QueueRecord(queue.store_id, binding.binding_kind, binding.binding_data)
-            // Update the bindings
-            virtual_host.store.add_queue(record) { rc => Unit }
-          }
+        bind_address match {
+          case bind_address:SubscriptionAddress=>
+            if( queue.address!=bind_address && authorizer.can(security, "consume", queue)) {
+
+              val binding = DurableSubscriptionQueueBinding(bind_address)
+              if( queue.tune_persistent && queue.store_id == -1 ) {
+                val record = QueueRecord(queue.store_id, binding.binding_kind, binding.binding_data)
+                // Update the bindings
+                virtual_host.store.add_queue(record) { rc => Unit }
+              }
 
-          // and then rebind the queue in the router.
-          unbind(queue)
-          queue.binding = binding
-          bind(queue)
-
-          // Make sure the update is visible in the queue's thread context..
-          queue.dispatch_queue {
-            queue.binding = binding
-          }
+              // and then rebind the queue in the router.
+              unbind(queue)
+              queue.binding = binding
+              bind(queue)
+
+              // Make sure the update is visible in the queue's thread context..
+              queue.dispatch_queue {
+                queue.binding = binding
+              }
+            }
+          case _ =>
         }
 
         if( authorizer.can(security, bind_action(consumer), queue) ) {
-          queue.bind(destination, consumer)
+          queue.bind(bind_address, consumer)
           for( l <- router_listeners) {
             l.on_bind(queue, consumer, security)
           }
@@ -672,8 +647,8 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    override def unbind(destination: DurableSubscriptionDestinationDTO, consumer: DeliveryConsumer, persistent: Boolean, security: SecurityContext) = {
-      destination_by_id.get(destination.subscription_id).foreach { queue =>
+    override def unbind(bind_address:BindAddress, consumer: DeliveryConsumer, persistent: Boolean, security: SecurityContext) = {
+      destination_by_id.get(bind_address.id).foreach { queue =>
         queue.unbind(consumer, persistent)
         if( persistent ) {
           _destroy_queue(queue, security)
@@ -683,83 +658,37 @@ class LocalRouter(val virtual_host:Virtu
         }
       }
     }
-
-//    override def can_connect_all(path: Path, destination: DurableSubscriptionDestinationDTO, producer: BindableDeliveryProducer, security: SecurityContext) = {
-//      // User is trying to directly send to a durable subscription.. has to already exist.
-//      if( !durable_subscriptions_by_id.contains(destination.subscription_id) ) {
-//        Some("Durable subscription does not exist")
-//      } else {
-//        can_connect_dsub(destination, security)
-//      }
-//    }
-//
-//    def can_connect_dsub(config:DurableSubscriptionDestinationDTO, security:SecurityContext):Option[String] = {
-//      val resource = get_dsub_secured_resource(config)
-//      if( !authorizer.can(security, "send", resource) ) {
-//        Some("Not authorized to send to durable subscription '%s'. Principals=%s".format(resource.id, security.principal_dump))
-//      } else {
-//        None
-//      }
-//    }
-
-//    override def can_bind_all(path: Path, destination: DurableSubscriptionDestinationDTO, consumer: DeliveryConsumer, security: SecurityContext) = {
-//      super.can_bind_all(path, destination, consumer, security) orElse {
-//        if( !destination_by_id.contains(destination.subscription_id) ) {
-//          val resource = get_dsub_secured_resource(config)
-//          if( !authorizer.can(security, "create", resource) ) {
-//            Some("Not authorized to create the durable subscription '%s'. Principals=%s".format(resource.id, security.principal_dump))
-//          } else {
-//            None
-//          }
-//        } else {
-//          None
-//        } orElse {
-//          can_bind_dsub(destination, consumer, security)
-//        }
-//      }
-//    }
-//
-//    def can_bind_dsub(config:DurableSubscriptionDestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
-//      val resource = get_dsub_secured_resource(config)
-//      val action = if ( consumer.browser ) "receive" else "consume"
-//      if( !authorizer.can(security, action, resource) ) {
-//        Some("Not authorized to %s from durable subscription '%s'. Principals=%s".format(action, resource.id, security.principal_dump))
-//      } else {
-//        None
-//      }
-//    }
   }
 
-  class QueueDomain extends Domain[Queue, QueueDestinationDTO] {
+  class QueueDomain extends Domain[Queue] {
 
     def bind(queue:Queue) = {
-      val path = queue.binding.destination
+      val path = queue.address.path
       assert( !PathParser.containsWildCards(path) )
       add_destination(path, queue)
 
       import OptionSupport._
       if( queue.config.mirrored.getOrElse(false) ) {
         // hook up the queue to be a subscriber of the topic.
-
-        val topic = local_topic_domain.get_or_create_destination(path, new TopicDestinationDTO(queue.binding.binding_dto.path), null).success
-        topic.bind(null, queue)
+        val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
+        topic.bind(SimpleAddress("queue", path), queue)
       }
     }
 
     def unbind(queue:Queue) = {
-      val path = queue.binding.destination
+      val path = queue.address.path
       remove_destination(path, queue)
 
       import OptionSupport._
       if( queue.config.mirrored.getOrElse(false) ) {
         // unhook the queue from the topic
-        val topic = local_topic_domain.get_or_create_destination(path, new TopicDestinationDTO(queue.binding.binding_dto.path), null).success
+        val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
         topic.unbind(queue, false)
       }
     }
 
-    def destroy_destination(path:Path, destination: QueueDestinationDTO, security: SecurityContext): Unit = {
-      val matches = get_destination_matches(path)
+    def destroy_destination(address: DestinationAddress, security: SecurityContext): Unit = {
+      val matches = get_destination_matches(address.path)
       matches.foreach { queue =>
         for( l <- router_listeners) {
           l.on_destroy(queue, security)
@@ -768,10 +697,10 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def can_create_destination(path: Path, destination:QueueDestinationDTO, security: SecurityContext):Option[String] = {
+    def can_create_destination(address:DestinationAddress, security: SecurityContext):Option[String] = {
       val resource = new SecuredResource() {
         def resource_kind = QueueKind
-        def id = destination_parser.encode_path(path)
+        def id = address.id
       }
       if( authorizer.can(security, "create", resource)) {
         None
@@ -780,14 +709,11 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def create_destination(path: Path, destination:QueueDestinationDTO, security: SecurityContext) = {
-      val dto = new QueueDestinationDTO
-      dto.path.addAll(destination.path)
-      val binding = QueueDomainQueueBinding.create(dto)
-
+    def create_destination(address:DestinationAddress, security: SecurityContext) = {
+      val binding = QueueDomainQueueBinding(address)
       val resource = new SecuredResource() {
         def resource_kind = QueueKind
-        def id = destination_parser.encode_path(path)
+        def id = address.id
       }
       if( authorizer.can(security, "create", resource)) {
         var queue = _create_queue(binding)
@@ -819,14 +745,13 @@ class LocalRouter(val virtual_host:Virtu
 
   protected def create_configure_destinations {
     import collection.JavaConversions._
-    def create_configured_dests[DTO <:DestinationDTO](list: ArrayList[_ <: StringIdDTO], d: Domain[_, DTO], f: (Array[String]) => DTO) = {
+    def create_configured_dests(list: ArrayList[_ <: StringIdDTO], d: Domain[_], to_address: (Path) => DestinationAddress) = {
       list.foreach { dto =>
         if (dto.id != null) {
           try {
-            val parts = destination_parser.parts(dto.id)
-            val path = destination_parser.decode_path(parts)
+            val path = destination_parser.decode_path(dto.id)
             if (!PathParser.containsWildCards(path)) {
-              d.get_or_create_destination(path, f(parts), null)
+              d.get_or_create_destination(to_address(path), null)
             }
           } catch {
             case x:PathException => warn(x, "Invalid destination id '%s'", dto.id)
@@ -834,23 +759,25 @@ class LocalRouter(val virtual_host:Virtu
         }
       }
     }
-    create_configured_dests(virtual_host.config.queues, local_queue_domain, (parts) => new QueueDestinationDTO(parts))
-    create_configured_dests(virtual_host.config.topics, local_topic_domain, (parts) => new TopicDestinationDTO(parts))
+    create_configured_dests(virtual_host.config.queues, local_queue_domain, (path) => SimpleAddress("queue", path))
+    create_configured_dests(virtual_host.config.topics, local_topic_domain, (path) => SimpleAddress("topic", path))
 
     virtual_host.config.dsubs.foreach { dto =>
       if (dto.id != null && ( dto.topic!=null || !dto.topics.isEmpty) ) {
+        val path = destination_parser.decode_path(dto.id)
+        val id = DestinationAddress.encode_path(path)
 
         // We will create the durable sub if it does not exist yet..
-        if( !local_dsub_domain.destination_by_id.contains(dto.id) ) {
-          val destination = new DurableSubscriptionDestinationDTO(dto.id)
-          destination.selector = dto.selector
-          if( dto.topic!=null ) {
-            destination.topics.add(new TopicDestinationDTO(destination_parser.parts(dto.topic)))
+        if( !PathParser.containsWildCards(path) &&
+            !local_dsub_domain.destination_by_id.contains(id) ) {
+
+          var topics = dto.topics.toList.map { n =>
+            SimpleAddress("topic", destination_parser.decode_path(n))
           }
-          dto.topics.foreach { n =>
-            destination.topics.add(new TopicDestinationDTO(destination_parser.parts(n)))
+          if( dto.topic!=null ) {
+            topics ::= SimpleAddress("topic", destination_parser.decode_path(dto.topic))
           }
-          _create_queue(BindingFactory.create(destination))
+          _create_queue(DurableSubscriptionQueueBinding(SubscriptionAddress(path, dto.selector, topics.toArray)))
         }
       }
     }
@@ -875,7 +802,7 @@ class LocalRouter(val virtual_host:Virtu
                     virtual_host.store.remove_queue(queue_key){x=> task.run}
                   } else {
                     var binding = BindingFactory.create(record.binding_kind, record.binding_data)
-                    if( is_temp(binding.binding_dto) ) {
+                    if( is_temp(binding.address) ) {
                       // These are the temp queues clients create.
                       virtual_host.store.remove_queue(queue_key){x=> task.run}
                     } else {
@@ -917,8 +844,8 @@ class LocalRouter(val virtual_host:Virtu
     val min_create_time = virtual_host.broker.now - 1000;
 
     // Auto delete temp destinations..
-    local_queue_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { queue=>
-      val owner = temp_owner(queue.destination_dto).get
+    local_queue_domain.destinations.filter(x=> is_temp(x.address)).foreach { queue=>
+      val owner = temp_owner(queue.address).get
       if( owner._1==virtual_host.broker.id // are we the broker that owns the temp destination?
           && !active_connections.contains(owner._2) // Has the connection not around?
           && queue.service_state.since < min_create_time // It's not a recently created destination?
@@ -926,13 +853,13 @@ class LocalRouter(val virtual_host:Virtu
         _destroy_queue(queue)
       }
     }
-    local_topic_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { topic =>
-      val owner = temp_owner(topic.destination_dto).get
+    local_topic_domain.destinations.filter(x=> is_temp(x.address)).foreach { topic =>
+      val owner = temp_owner(topic.address).get
       if( owner._1==virtual_host.broker.id // are we the broker that owns the temp destination?
           && !active_connections.contains(owner._2) // Has the connection not around?
           && topic.created_at < min_create_time // It's not a recently created destination?
       ) {
-        local_topic_domain.destroy_destination(topic.path, topic.destination_dto, null)
+        local_topic_domain.destroy_destination(topic.address, null)
       }
     }
   }
@@ -1005,34 +932,31 @@ class LocalRouter(val virtual_host:Virtu
   final val local_topic_domain = new TopicDomain
   final val local_dsub_domain = new DsubDomain
 
-  def queue_domain: Domain[_ <: DomainDestination, QueueDestinationDTO] = local_queue_domain
-  def topic_domain:Domain[_ <: DomainDestination, TopicDestinationDTO] = local_topic_domain
-  def dsub_domain:Domain[_ <: DomainDestination, DurableSubscriptionDestinationDTO] = local_dsub_domain
+  def queue_domain: Domain[_ <: DomainDestination] = local_queue_domain
+  def topic_domain:Domain[_ <: DomainDestination] = local_topic_domain
+  def dsub_domain:Domain[_ <: DomainDestination] = local_dsub_domain
 
-  def bind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext):Option[String] = {
+  def bind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, security: SecurityContext):Option[String] = {
     dispatch_queue.assertExecuting()
     if(!virtual_host.service_state.is_started) {
       return Some("virtual host stopped.")
     } else {
       try {
-        val actions = destinations.map { destination =>
-          destination match {
-            case destination:TopicDestinationDTO =>
-              val path = destination_parser.decode_path(destination.path)
-              val allowed = topic_domain.can_bind_all(path, destination, consumer, security)
-              def perform() = topic_domain.bind(path, destination, consumer, security)
+        val actions = addresses.map { address =>
+          address.domain match {
+            case "topic" =>
+              val allowed = topic_domain.can_bind_all(address, consumer, security)
+              def perform() = topic_domain.bind(address, consumer, security)
               (allowed, perform _)
-            case destination:QueueDestinationDTO =>
-              val path = destination_parser.decode_path(destination.path)
-              val allowed = queue_domain.can_bind_all(path, destination, consumer, security)
-              def perform() = queue_domain.bind(path, destination, consumer, security)
+            case "queue" =>
+              val allowed = queue_domain.can_bind_all(address, consumer, security)
+              def perform() = queue_domain.bind(address, consumer, security)
               (allowed, perform _)
-            case destination:DurableSubscriptionDestinationDTO =>
-              val path = Path(destination.subscription_id())
-              val allowed = dsub_domain.can_bind_all(path, destination, consumer, security)
-              def perform() = dsub_domain.bind(path, destination, consumer, security)
+            case "dsub" =>
+              val allowed = dsub_domain.can_bind_all(address, consumer, security)
+              def perform() = dsub_domain.bind(address, consumer, security)
               (allowed, perform _)
-            case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+            case _ => sys.error("Unknown domain: "+address.domain)
           }
         }
 
@@ -1050,49 +974,46 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  def unbind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
+  def unbind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
     consumer.retain
     dispatch_queue {
-      destinations.foreach { destination=>
-        destination match {
-          case destination:TopicDestinationDTO =>
-            topic_domain.unbind(destination, consumer, persistent, security)
-          case destination:QueueDestinationDTO =>
-            queue_domain.unbind(destination, consumer, persistent, security)
-          case destination:DurableSubscriptionDestinationDTO =>
-            dsub_domain.unbind(destination, consumer, persistent, security)
-          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+      addresses.foreach { address=>
+        address.domain match {
+          case "topic" =>
+            topic_domain.unbind(address, consumer, persistent, security)
+          case "queue" =>
+            queue_domain.unbind(address, consumer, persistent, security)
+          case "dsub" =>
+            dsub_domain.unbind(address, consumer, persistent, security)
+          case _ => sys.error("Unknown domain: "+address.domain)
         }
       }
       consumer.release
     }
   }
 
-  def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext):Option[String] = {
+  def connect(addresses: Array[_ <: ConnectAddress], producer: BindableDeliveryProducer, security: SecurityContext):Option[String] = {
     dispatch_queue.assertExecuting()
     producer.retain
     try {
       if(!virtual_host.service_state.is_started) {
         return Some("virtual host stopped.")
       } else {
-        val actions = destinations.map { destination =>
-          destination match {
-            case destination:TopicDestinationDTO =>
-              val path = destination_parser.decode_path(destination.path)
-              val allowed = topic_domain.can_connect_all(path, destination, producer, security)
-              def perform() = topic_domain.connect(path, destination, producer, security)
+        val actions = addresses.map { address =>
+          address.domain match {
+            case "topic" =>
+              val allowed = topic_domain.can_connect_all(address, producer, security)
+              def perform() = topic_domain.connect(address, producer, security)
               (allowed, perform _)
-            case destination:QueueDestinationDTO =>
-              val path = destination_parser.decode_path(destination.path)
-              val allowed = queue_domain.can_connect_all(path, destination, producer, security)
-              def perform() = queue_domain.connect(path, destination, producer, security)
+            case "queue" =>
+              val allowed = queue_domain.can_connect_all(address, producer, security)
+              def perform() = queue_domain.connect(address, producer, security)
               (allowed, perform _)
-            case destination:DurableSubscriptionDestinationDTO =>
-              val path = Path(destination.subscription_id())
-              val allowed = dsub_domain.can_connect_all(path, destination, producer, security)
-              def perform() = dsub_domain.connect(path, destination, producer, security)
+            case "dsub" =>
+              val allowed = dsub_domain.can_connect_all(address, producer, security)
+              def perform() = dsub_domain.connect(address, producer, security)
               (allowed, perform _)
-            case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+            case _ => sys.error("Unknown domain: "+address.domain)
           }
         }
   
@@ -1111,17 +1032,17 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer) = {
+  def disconnect(addresses:Array[_ <: ConnectAddress], producer:BindableDeliveryProducer) = {
     dispatch_queue {
-      destinations.foreach { destination=>
-        destination match {
-          case destination:TopicDestinationDTO =>
-            topic_domain.disconnect(destination, producer)
-          case destination:QueueDestinationDTO =>
-            queue_domain.disconnect(destination, producer)
-          case destination:DurableSubscriptionDestinationDTO =>
-            dsub_domain.disconnect(destination, producer)
-          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+      addresses.foreach { address=>
+        address.domain match {
+          case "topic" =>
+            topic_domain.disconnect(address, producer)
+          case "queue" =>
+            queue_domain.disconnect(address, producer)
+          case "dsub" =>
+            dsub_domain.disconnect(address, producer)
+          case _ => sys.error("Unknown domain: "+address.domain)
         }
       }
       producer.disconnected()
@@ -1129,30 +1050,27 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  def create(destinations:Array[DestinationDTO], security: SecurityContext):Option[String] = {
+  def create(addresses:Array[_ <: DestinationAddress], security: SecurityContext):Option[String] = {
     dispatch_queue.assertExecuting()
     if(!virtual_host.service_state.is_started) {
       return Some("virtual host stopped.")
     } else {
 
-      val actions = destinations.map { destination =>
-        destination match {
-          case destination:TopicDestinationDTO =>
-            val path = destination_parser.decode_path(destination.path)
-            val allowed = topic_domain.can_create_destination(path, destination, security)
-            def perform() = topic_domain.create_destination(path, destination, security)
+      val actions = addresses.map { address =>
+        address.domain match {
+          case "topic" =>
+            val allowed = topic_domain.can_create_destination(address, security)
+            def perform() = topic_domain.create_destination(address, security)
             (allowed, perform _)
-          case destination:QueueDestinationDTO =>
-            val path = destination_parser.decode_path(destination.path)
-            val allowed = queue_domain.can_create_destination(path, destination, security)
-            def perform() = queue_domain.create_destination(path, destination, security)
+          case "queue" =>
+            val allowed = queue_domain.can_create_destination(address, security)
+            def perform() = queue_domain.create_destination(address, security)
             (allowed, perform _)
-          case destination:DurableSubscriptionDestinationDTO =>
-            val path = Path(destination.subscription_id())
-            val allowed = dsub_domain.can_create_destination(path, destination, security)
-            def perform() = dsub_domain.create_destination(path, destination, security)
+          case "dsub" =>
+            val allowed = dsub_domain.can_create_destination(address, security)
+            def perform() = dsub_domain.create_destination(address, security)
             (allowed, perform _)
-          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+          case _ => sys.error("Unknown domain: "+address.domain)
         }
       }
 
@@ -1166,30 +1084,27 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  def delete(destinations:Array[DestinationDTO], security: SecurityContext):Option[String] = {
+  def delete(addresses:Array[_ <: DestinationAddress], security: SecurityContext):Option[String] = {
     dispatch_queue.assertExecuting()
     if(!virtual_host.service_state.is_started) {
       return Some("virtual host stopped.")
     } else {
 
-      val actions = destinations.map { destination =>
-        destination match {
-          case destination:TopicDestinationDTO =>
-            val path = destination_parser.decode_path(destination.path)
-            val allowed = topic_domain.can_destroy_destination(path, destination, security)
-            def perform() = topic_domain.destroy_destination(path, destination, security)
+      val actions = addresses.map { address =>
+        address.domain match {
+          case "topic" =>
+            val allowed = topic_domain.can_destroy_destination(address, security)
+            def perform() = topic_domain.destroy_destination(address, security)
             (allowed, perform _)
-          case destination:QueueDestinationDTO =>
-            val path = destination_parser.decode_path(destination.path)
-            val allowed = queue_domain.can_destroy_destination(path, destination, security)
-            def perform() = queue_domain.destroy_destination(path, destination, security)
+          case "queue" =>
+            val allowed = queue_domain.can_destroy_destination(address, security)
+            def perform() = queue_domain.destroy_destination(address, security)
             (allowed, perform _)
-          case destination:DurableSubscriptionDestinationDTO =>
-            val path = Path(destination.subscription_id())
-            val allowed = dsub_domain.can_destroy_destination(path, destination, security)
-            def perform() = dsub_domain.destroy_destination(path, destination, security)
+          case "dsub" =>
+            val allowed = dsub_domain.can_destroy_destination(address, security)
+            def perform() = dsub_domain.destroy_destination(address, security)
             (allowed, perform _)
-          case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+          case _ => sys.error("Unknown domain: "+address.domain)
         }
       }
 
@@ -1204,23 +1119,19 @@ class LocalRouter(val virtual_host:Virtu
   }
 
 
-  def get_or_create_destination(id: DestinationDTO, security: SecurityContext) = dispatch_queue ! {
-    _get_or_create_destination(id, security)
+  def get_or_create_destination(adress: DestinationAddress, security: SecurityContext) = dispatch_queue ! {
+    _get_or_create_destination(adress, security)
   }
 
   /**
    * Returns the previously created queue if it already existed.
    */
-  def _get_or_create_destination(destination: DestinationDTO, security:SecurityContext): Result[DomainDestination, String] = {
-    val path = destination_parser.decode_path(destination.path)
-    destination match {
-      case destination:TopicDestinationDTO =>
-        topic_domain.get_or_create_destination(path, destination, security)
-      case destination:QueueDestinationDTO =>
-        queue_domain.get_or_create_destination(path, destination, security)
-      case destination:DurableSubscriptionDestinationDTO =>
-        dsub_domain.get_or_create_destination(path, destination, security)
-      case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+  def _get_or_create_destination(address: DestinationAddress, security:SecurityContext): Result[DomainDestination, String] = {
+    address.domain match {
+      case "queue" => queue_domain.get_or_create_destination(address, security)
+      case "topic" => topic_domain.get_or_create_destination(address, security)
+      case "dsub"  => dsub_domain.get_or_create_destination(address, security)
+      case _       => sys.error("Unknown domain: "+address.domain)
     }
   }
 
@@ -1232,15 +1143,15 @@ class LocalRouter(val virtual_host:Virtu
   //
   /////////////////////////////////////////////////////////////////////////////
 
-  var queues_by_binding = LinkedHashMap[Binding, Queue]()
+//  var queues_by_binding = LinkedHashMap[Binding, Queue]()
   var queues_by_store_id = LinkedHashMap[Long, Queue]()
 
-  /**
-   * Gets an existing queue.
-   */
-  def get_queue(dto:DestinationDTO) = dispatch_queue ! {
-    queues_by_binding.get(BindingFactory.create(dto))
-  }
+//  /**
+//   * Gets an existing queue.
+//   */
+//  def get_queue(dto:DestinationDTO) = dispatch_queue ! {
+//    queues_by_binding.get(BindingFactory.create(dto))
+//  }
 
   /**
    * Gets an existing queue.
@@ -1267,7 +1178,7 @@ class LocalRouter(val virtual_host:Virtu
     }
 
     queue.start
-    queues_by_binding.put(binding, queue)
+//    queues_by_binding.put(binding, queue)
     queues_by_store_id.put(qid, queue)
 
     // this causes the queue to get registered in the right location in
@@ -1290,19 +1201,19 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  /**
-   * Returns true if the queue no longer exists.
-   */
-  def destroy_queue(dto:DestinationDTO, security:SecurityContext) = dispatch_queue ! { _destroy_queue(dto, security) }
-
-  def _destroy_queue(dto:DestinationDTO, security:SecurityContext):Option[String] = {
-    queues_by_binding.get(BindingFactory.create(dto)) match {
-      case Some(queue) =>
-        _destroy_queue(queue, security)
-      case None =>
-        Some("Does not exist")
-    }
-  }
+//  /**
+//   * Returns true if the queue no longer exists.
+//   */
+//  def destroy_queue(dto:DestinationDTO, security:SecurityContext) = dispatch_queue ! { _destroy_queue(dto, security) }
+//
+//  def _destroy_queue(dto:DestinationDTO, security:SecurityContext):Option[String] = {
+//    queues_by_binding.get(BindingFactory.create(dto)) match {
+//      case Some(queue) =>
+//        _destroy_queue(queue, security)
+//      case None =>
+//        Some("Does not exist")
+//    }
+//  }
 
   def _destroy_queue(queue:Queue, security:SecurityContext):Option[String] = {
     if( !authorizer.can(security, "destroy", queue) ) {
@@ -1317,7 +1228,7 @@ class LocalRouter(val virtual_host:Virtu
     queue.stop(dispatch_queue.runnable{
 
       queue.binding.unbind(this, queue)
-      queues_by_binding.remove(queue.binding)
+//      queues_by_binding.remove(queue.binding)
       queues_by_store_id.remove(queue.store_id)
       if (queue.tune_persistent) {
         queue.dispatch_queue {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Feb  4 14:42:52 2012
@@ -60,8 +60,6 @@ import Queue._
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding, var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with Dispatched with SecuredResource {
-  def id = binding.id
-
   override def toString = binding.toString
 
   def virtual_host = router.virtual_host
@@ -81,7 +79,7 @@ class Queue(val router: LocalRouter, val
 
   override val dispatch_queue: DispatchQueue = createQueue(id);
 
-  def destination_dto: DestinationDTO = binding.binding_dto
+  def address = binding.address
 
   debug("created queue: " + id)
 
@@ -293,7 +291,7 @@ class Queue(val router: LocalRouter, val
     rc.id = this.id
     rc.state = this.service_state.toString
     rc.state_since = this.service_state.since
-    rc.binding = this.binding.binding_dto
+    rc.binding = this.binding.dto
     rc.config = this.config
     rc.metrics = this.get_queue_metrics
     rc.metrics.current_time = now
@@ -489,7 +487,7 @@ class Queue(val router: LocalRouter, val
 
   def on_queue_flushed = {
     if(stop_listener_waiting_for_flush!=null) {
-      destination_dto match {
+      address match {
         case d:DurableSubscriptionDestinationDTO =>
           DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_dsub_metrics, get_queue_metrics)
         case t:TopicDestinationDTO =>
@@ -558,7 +556,7 @@ class Queue(val router: LocalRouter, val
         val entry = tail_entry
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queue_delivery = delivery.copy
-        queue_delivery.sender = destination_dto
+        queue_delivery.sender = address
         queue_delivery.seq = entry.seq
         entry.init(queue_delivery)
         
@@ -1009,20 +1007,20 @@ class Queue(val router: LocalRouter, val
 
   def disconnected() = throw new RuntimeException("unsupported")
 
-  def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
+  def bind(bind_address:BindAddress, consumer: DeliveryConsumer) = {
     bind(consumer::Nil)
   }
   def unbind(consumer: DeliveryConsumer, persistent:Boolean):Unit = {
     unbind(consumer::Nil)
   }
 
-  def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+  def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer) = {
     import OptionSupport._
     if( config.mirrored.getOrElse(false) ) {
       // this is a mirrored queue.. actually have the produce bind to the topic, instead of the
-      val topic_dto = new TopicDestinationDTO(binding.binding_dto.path)
-      val topic = router.local_topic_domain.get_or_create_destination(binding.destination, topic_dto, null).success
-      topic.connect(destination, producer)
+      val topic_address = new SimpleAddress("topic", binding.address.path)
+      val topic = router.local_topic_domain.get_or_create_destination(topic_address, null).success
+      topic.connect(topic_address, producer)
     } else {
       dispatch_queue {
         producers += producer
@@ -1036,8 +1034,8 @@ class Queue(val router: LocalRouter, val
   def disconnect (producer:BindableDeliveryProducer) = {
     import OptionSupport._
     if( config.mirrored.getOrElse(false) ) {
-      val topic_dto = new TopicDestinationDTO(binding.binding_dto.path)
-      val topic = router.local_topic_domain.get_or_create_destination(binding.destination, topic_dto, null).success
+      val topic_address = new SimpleAddress("topic", binding.address.path)
+      val topic = router.local_topic_domain.get_or_create_destination(topic_address, null).success
       topic.disconnect(producer)
     } else {
       dispatch_queue {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sat Feb  4 14:42:52 2012
@@ -18,11 +18,132 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util._
+import path._
 import scala.collection.immutable.List
-import org.apache.activemq.apollo.dto._
 import security.SecurityContext
 import store.StoreUOW
 import java.util.concurrent.atomic.AtomicReference
+import collection.mutable.ListBuffer
+import java.util.regex.Pattern
+import java.lang.String
+
+object DestinationAddress {
+  
+  def encode_path(path:Path) = {
+    val rc = new StringBuilder
+    var first = true
+    for (p <- path.parts) {
+      if ( !first ) {
+        rc.append(".")
+      }
+      first = false
+      p match {
+        case RootPart =>
+        case AnyChildPart => rc.append("*")
+        case AnyDescendantPart => rc.append("**")
+        case p:RegexChildPart => rc.append("*"+escape(p.regex.pattern()))
+        case p:LiteralPart => rc.append(escape(p.value))
+      }
+    }
+    rc.toString
+  }
+
+  val DOT_PATTERN = Pattern.compile("\\.");
+
+  def decode_path(value:String) = {
+    val rc = ListBuffer[Part]()
+    for (p <- DOT_PATTERN.split(value)) {
+      rc += (if( p startsWith "*" ) {
+        if( p.length()==1 ) {
+          AnyChildPart
+        } else if ( p=="**" ) {
+          AnyDescendantPart
+        } else {
+          val regex_text = unescape(p.substring(1))
+          RegexChildPart(Pattern.compile(regex_text))
+        }
+      } else {
+        LiteralPart(unescape(p))
+      })
+    }
+    new Path(rc.toList)
+  }
+
+  
+  def escape(value:String) = {
+    val rc = new StringBuffer(value.length())
+    def unicode_encode(c:Char) = {
+      rc.append("\\u%04x".format(c.toInt))
+    }
+    var i=0;
+    while( i < value.length() ) {
+      val c = value.charAt(i);
+      if ( c== '\\' ) {
+        rc.append("\\\\")
+      }  else if( c == '\n' ) {
+        rc.append("\\\n")
+      }  else if( c == '\r' ) {
+        rc.append("\\\r")
+      }  else if( c == '\t' ) {
+        rc.append("\\\t")
+      }  else if( c == '\b' ) {
+        rc.append("\\\b")
+      }  else if( c == '*' ) {
+        rc.append("\\w")
+      }  else if( c == '.' ) {
+        rc.append("\\d")
+      } else if  ( c < '!' || c > '~' ) {
+        unicode_encode(c)
+      }
+      rc.append(c)
+      i+=1
+    }
+    rc.toString
+  }
+  
+  def unescape(value:String) = {
+    val rc = new StringBuffer(value.length())
+    var i=0
+    while( i < value.length() ) {
+      val c = value.charAt(i);
+      if( c == '\\') {
+        i+=1
+        val c2 = value.charAt(i);
+        rc.append(c2 match {
+          case '\\' => '\\'
+          case 'n' => '\n'
+          case 'r' => '\r'
+          case 't' => '\t'
+          case 'b' => '\b'
+          case 'w' => '*'
+          case 'd' => '.'
+          case 'u' => 
+            i+=1
+            val rc = Integer.parseInt(value.substring(i, i+4), 16).toChar
+            i+=4
+            rc
+        })
+      } else {
+        rc.append(c)
+      }      
+      i+=1
+    }
+    rc.toString
+  }
+
+}
+sealed trait DestinationAddress {
+  def domain:String
+  def path:Path
+  val id = DestinationAddress.encode_path(path)
+  override def toString: String =  domain+":"+id
+}
+sealed trait ConnectAddress extends DestinationAddress
+sealed trait BindAddress extends DestinationAddress
+case class SimpleAddress(val domain:String, val path:Path) extends ConnectAddress with BindAddress
+case class SubscriptionAddress(val path:Path, val selector:String, topics:Array[_ <: BindAddress]) extends BindAddress {
+  def domain = "dsub"
+}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -33,17 +154,17 @@ trait Router extends Service {
 
   def get_queue(dto:Long):Option[Queue]
 
-  def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext): Option[String]
+  def bind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, security:SecurityContext): Option[String]
 
-  def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
+  def unbind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
 
-  def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Option[String]
+  def connect(destinations:Array[_ <: ConnectAddress], producer:BindableDeliveryProducer, security:SecurityContext): Option[String]
 
-  def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
+  def disconnect(destinations:Array[_ <: ConnectAddress], producer:BindableDeliveryProducer)
 
-  def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String]
+  def delete(destinations:Array[_ <: DestinationAddress], security:SecurityContext): Option[String]
 
-  def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String]
+  def create(destinations:Array[_ <: DestinationAddress], security:SecurityContext): Option[String]
 
   def apply_update(on_completed:Runnable):Unit
 



Mime
View raw message