activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1063582 [2/3] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/acti...
Date Wed, 26 Jan 2011 03:10:37 GMT
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jan 26 03:10:35 2011
@@ -30,7 +30,7 @@ import collection.JavaConversions
 import java.util.concurrent.atomic.AtomicLong
 import org.apache.activemq.apollo.util.OptionSupport._
 import org.apache.activemq.apollo.util.path.{Path, PathParser}
-import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO, VirtualHostDTO}
+import org.apache.activemq.apollo.dto.{TopicDTO, QueueDTO, DestinationDTO, VirtualHostDTO}
 import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
 import org.apache.activemq.apollo.broker.store.{ZeroCopyBufferAllocator, Store, StoreFactory}
 
@@ -76,7 +76,7 @@ class VirtualHost(val broker: Broker, va
   override val dispatch_queue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
 
   var config:VirtualHostDTO = _
-  val router = new Router(this)
+  val router:Router = new LocalRouter(this)
 
   var names:List[String] = Nil;
 
@@ -129,66 +129,44 @@ class VirtualHost(val broker: Broker, va
 
     if( store!=null ) {
       store.configure(config.store, LoggingReporter(VirtualHost))
-      val store_startup_done = tracker.task("store startup")
+      val task = tracker.task("store startup")
       store.start {
-
-        val get_key_done = tracker.task("store get last queue key")
-        store.get_last_queue_key{ key=>
-          key match {
-            case Some(x)=>
-              queue_id_counter.set(key.get)
-            case None =>
-              warn("Could not get last queue key")
+        {
+          val task = tracker.task("store get last queue key")
+          store.get_last_queue_key{ key=>
+            key match {
+              case Some(x)=>
+                queue_id_counter.set(key.get)
+              case None =>
+                warn("Could not get last queue key")
+            }
+            task.run
           }
-          get_key_done.run
-        }
 
-        if( config.purge_on_startup.getOrElse(false) ) {
-          store_startup_done.name = "store purge"
-          store.purge {
-            store_startup_done.run
-          }
-        } else {
-          store_startup_done.name = "store recover queues"
-          store.list_queues { queue_keys =>
-            for( queue_key <- queue_keys) {
-              val task = tracker.task("store load queue key: "+queue_key)
-              // Use a global queue to so we concurrently restore
-              // the queues.
-              globalQueue {
-                store.get_queue(queue_key) { x =>
-                  x match {
-                    case Some(record)=>
-                    dispatch_queue ^{
-                      router.create_queue(record, null)
-                      task.run
-                    }
-                    case _ =>
-                      task.run
-                  }
-                }
-              }
+          if( config.purge_on_startup.getOrElse(false) ) {
+            val task = tracker.task("store purge")
+            store.purge {
+              task.run
             }
-            store_startup_done.run
           }
         }
+        task.run
       }
     }
 
-    tracker.callback(on_completed)
-
-    if(config.regroup_connections.getOrElse(false)) {
-      schedual_connection_regroup
+    tracker.callback {
+      val tracker = new LoggingTracker("virtual host startup", dispatch_queue)
+      tracker.start(router)
+      tracker.callback(on_completed)
     }
+
   }
 
 
   override protected def _stop(on_completed:Runnable):Unit = {
 
     val tracker = new LoggingTracker("virtual host shutdown", dispatch_queue)
-    router.queues.valuesIterator.foreach { queue=>
-      tracker.stop(queue)
-    }
+    tracker.stop(router);
     if( store!=null ) {
       tracker.stop(store);
     }
@@ -196,66 +174,4 @@ class VirtualHost(val broker: Broker, va
   }
 
 
-  // Try to periodically re-balance connections so that consumers/producers
-  // are grouped onto the same thread.
-  def schedual_connection_regroup:Unit = {
-    def connectionRegroup = {
-
-      // this should really be much more fancy.  It should look at the messaging
-      // rates between producers and consumers, look for natural data flow partitions
-      // and then try to equally divide the load over the available processing
-      // threads/cores.
-      router.routing_nodes.foreach { node =>
-
-        // For the topics, just collocate the producers onto the first consumer's
-        // thread.
-        node.broadcast_consumers.headOption.foreach{ consumer =>
-          node.broadcast_producers.foreach { r=>
-            r.producer.collocate(consumer.dispatch_queue)
-          }
-        }
-
-        node.queues.foreach { queue=>
-
-          queue.dispatch_queue {
-
-            // Collocate the queue's with the first consumer
-            // TODO: change this so it collocates with the fastest consumer.
-
-            queue.all_subscriptions.headOption.map( _._1 ).foreach { consumer=>
-              queue.collocate( consumer.dispatch_queue )
-            }
-
-            // Collocate all the producers with the queue..
-
-            queue.inbound_sessions.foreach { session =>
-              session.producer.collocate( queue.dispatch_queue )
-            }
-          }
-
-        }
-      }
-      schedual_connection_regroup
-    }
-    dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(service_state.is_started) { connectionRegroup } } )
-  }
-
-  def destination_config(name:Path):Option[DestinationDTO] = {
-    import collection.JavaConversions._
-    import DestinationParser.default._
-    import AsciiBuffer._
-    config.destinations.find( x=> parseFilter(ascii(x.name)).matches(name) )
-  }
-
-  def queue_config(binding:Binding):Option[QueueDTO] = {
-    import collection.JavaConversions._
-    config.queues.find{ config=>
-      binding.matches(config)
-    }
-  }
-
-  def queue_config(dto:BindingDTO):Option[QueueDTO] = {
-    queue_config(BindingFactory.create(dto))
-  }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala Wed Jan 26 03:10:35 2011
@@ -18,8 +18,8 @@ package org.apache.activemq.apollo.broke
 
 import scala.util.continuations._
 import org.apache.activemq.apollo.util.path.Path
+import org.apache.activemq.apollo.broker.{Connector, VirtualHost, Broker}
 import org.apache.activemq.apollo.dto._
-import org.apache.activemq.apollo.broker.{Connector, Destination, VirtualHost, Broker}
 
 /**
  * <p>
@@ -54,7 +54,7 @@ class AclAuthorizer(val default_kinds:Li
     true
   }
 
-  private def can_dest(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO)(func: DestinationAclDTO=>java.util.List[PrincipalDTO]) = {
+  private def can_dest(ctx: SecurityContext, host: VirtualHost, dest: TopicDTO)(func: TopicAclDTO=>java.util.List[PrincipalDTO]) = {
     if( dest.acl!=null ) {
       is_in(ctx, func(dest.acl))
     } else {
@@ -62,16 +62,16 @@ class AclAuthorizer(val default_kinds:Li
     }
   }
 
-  def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+  def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: TopicDTO) = {
     can_dest(ctx, host, dest)(_.sends)
   }
-  def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+  def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: TopicDTO) = {
     can_dest(ctx, host, dest)(_.receives)
   }
-  def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+  def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: TopicDTO) = {
     can_dest(ctx, host, dest)(_.destroys)
   }
-  def can_create(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+  def can_create(ctx: SecurityContext, host: VirtualHost, dest: TopicDTO) = {
     can_dest(ctx, host, dest)(_.creates)
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala Wed Jan 26 03:10:35 2011
@@ -18,7 +18,7 @@ package org.apache.activemq.apollo.broke
 import scala.util.continuations._
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.util.path.Path
-import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO}
+import org.apache.activemq.apollo.dto.{TopicDTO, QueueDTO, DestinationDTO}
 
 /**
  * <p>
@@ -41,22 +41,22 @@ trait Authorizer {
   /**
    * @returns true if the user is allowed to send to the destination
    */
-  def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
+  def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:TopicDTO):Boolean
 
   /**
    * @returns true if the user is allowed to receive from the destination
    */
-  def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
+  def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:TopicDTO):Boolean
 
   /**
    * @returns true if the user is allowed to create the destination
    */
-  def can_create(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
+  def can_create(ctx:SecurityContext, host:VirtualHost, dest:TopicDTO):Boolean
 
   /**
    * @returns true if the user is allowed to destroy the destination
    */
-  def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
+  def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:TopicDTO):Boolean
 
 
   /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml Wed Jan 26 03:10:35 2011
@@ -20,11 +20,11 @@
   <virtual_host id="default">
     <host_name>test</host_name>
 
-    <destination name="unified.*" unified="true"/>
-    <destination name="notunified.*" unified="false"/>
+    <topic name="unified.*"/>
+    <topic name="notunified.*"/>
 
-    <queue name="unified.a" kind="ptp" queue_buffer="333"/>
-    <queue name="unified.*" kind="ds" queue_buffer="444"/>
+    <queue name="unified.a" kind="ptp" queue_buffer="333" unified="true"/>
+    <queue name="unified.*" kind="ds" queue_buffer="444" unified="true"/>
     <queue queue_buffer="111"/>
 
   </virtual_host>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Wed Jan 26 03:10:35 2011
@@ -43,49 +43,35 @@ class DestinationConfigurationTest exten
 
     // Let make sure we are reading in the expected config..
     expect(2) {
-      host.destinations.size
+      host.topics.size
     }
     expect(3) {
       host.queues.size
     }
 
-    val router = broker.default_virtual_host.router
+    val router = broker.default_virtual_host.router.asInstanceOf[LocalRouter]
 
-    def check_tune_queue_buffer(expected:Int)(dto:BindingDTO) = {
+    def check_tune_queue_buffer(expected:Int)(dto:DestinationDTO) = {
       var actual=0
       reset {
-        var q = router.get_or_create_queue(dto, null).success
+        var q = router.get_or_create_destination(dto, null).success.asInstanceOf[Queue]
         actual = q.tune_queue_buffer
       }
       expect(expected) {actual}
     }
 
     check_tune_queue_buffer(333) {
-      var p = new QueueBindingDTO()
+      var p = new QueueDestinationDTO()
       p.name = "unified.a"
       p
     }
-    check_tune_queue_buffer(444) {
-      val p = new SubscriptionBindingDTO()
-      p.name = "unified.b"
-      p.client_id = "a"
-      p.subscription_id = "b"
-      p
-    }
 
     check_tune_queue_buffer(111) {
-      var p = new QueueBindingDTO()
+      var p = new QueueDestinationDTO()
       p.name = "notunified.other"
       p
     }
 
-    def dest(v:String) = DestinationParser.decode_path(v)
-    expect(true) {
-      router.destinations.chooseValue(dest("unified.a")).unified
-    }
-    expect(false) {
-      router.destinations.chooseValue(dest("notunified.other")).unified
-    }
     ServiceControl.stop(broker, "broker")
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Jan 26 03:10:35 2011
@@ -25,9 +25,9 @@ import java.io.File
 import org.apache.activemq.apollo.util.metric.{Period, MetricAggregator}
 import org.fusesource.hawtbuf.AsciiBuffer
 import java.net.URL
-import org.apache.activemq.apollo.dto.BrokerDTO
 import org.apache.activemq.apollo.util._
 import collection.mutable.{ArrayBuffer, ListBuffer}
+import org.apache.activemq.apollo.dto.{DestinationDTO, BrokerDTO}
 
 /**
  *
@@ -210,13 +210,13 @@ abstract class BrokerPerfSupport extends
 
   val parser = new DestinationParser
 
-  def createDestinations(destCount: Int): Array[Destination] = {
-    var dests = new Array[Destination](destCount)
+  def createDestinations(destCount: Int): Array[DestinationDTO] = {
+    var dests = new Array[DestinationDTO](destCount)
 
     for (i <- 0 until destCount) {
-      val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
+      val domain = if (PTP) {LocalRouter.QUEUE_DOMAIN} else {LocalRouter.TOPIC_DOMAIN}
       val name = new AsciiBuffer("dest" + (i + 1))
-      var bean = new SingleDestination(domain, parser.parsePath(name))
+      var bean = DestinationParser.create_destination(domain, name.toString)(0)
       dests(i) = bean
       //        if (PTP) {
       //          sendBroker.defaultVirtualHost.createQueue(dests(i))
@@ -248,7 +248,7 @@ abstract class BrokerPerfSupport extends
     initBrokers
     startBrokers
 
-    val dests: Array[Destination] = createDestinations(destCount)
+    val dests: Array[DestinationDTO] = createDestinations(destCount)
 
     for (i <- 0 until producerCount) {
       var destination = dests(i % destCount)
@@ -273,7 +273,7 @@ abstract class BrokerPerfSupport extends
     // }
   }
 
-  def _createConsumer(i: Int, destination: Destination): RemoteConsumer = {
+  def _createConsumer(i: Int, destination: DestinationDTO): RemoteConsumer = {
 
     var consumer = createConsumer()
     consumer.stopping = stopping
@@ -291,7 +291,7 @@ abstract class BrokerPerfSupport extends
     broker.config.connectors.get(0).advertise
   }
 
-  def _createProducer(id: Int, messageSize: Int, destination: Destination): RemoteProducer = {
+  def _createProducer(id: Int, messageSize: Int, destination: DestinationDTO): RemoteProducer = {
     var producer = createProducer()
     producer.stopping = stopping
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala Wed Jan 26 03:10:35 2011
@@ -17,11 +17,11 @@
 
 package org.apache.activemq.apollo.broker.perf
 
-import org.apache.activemq.apollo.broker.Destination
 import org.apache.activemq.apollo.util.metric.MetricAggregator
 import org.apache.activemq.apollo.util.{ServiceControl, FileSupport}
 import FileSupport._
 import java.io.File
+import org.apache.activemq.apollo.dto.DestinationDTO
 
 trait LargeInitialDB extends PersistentScenario {
   PURGE_STORE = false
@@ -62,7 +62,7 @@ trait LargeInitialDB extends PersistentS
     ServiceControl.start(sendBroker, "initial db broker startup")
 
     PTP = true
-    val dests: Array[Destination] = createDestinations(1)
+    val dests: Array[DestinationDTO] = createDestinations(1)
 
     totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala Wed Jan 26 03:10:35 2011
@@ -18,12 +18,13 @@
 package org.apache.activemq.apollo.broker.perf
 
 import org.apache.activemq.apollo.util.metric._
-import org.apache.activemq.apollo.broker.{Destination, Delivery, Connection}
+import org.apache.activemq.apollo.broker.{Delivery, Connection}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch._
 import java.io.IOException
 import org.apache.activemq.apollo.transport.TransportFactory
+import org.apache.activemq.apollo.dto.DestinationDTO
 
 abstract class RemoteConnection extends Connection {
 
@@ -36,7 +37,7 @@ abstract class RemoteConnection extends 
   var rateAggregator: MetricAggregator = null
 
   var stopping: AtomicBoolean = null
-  var destination: Destination = null
+  var destination: DestinationDTO = null
 
   def init = {
     if (rate.getName == null) {

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Wed Jan 26 03:10:35 2011
@@ -16,37 +16,28 @@
  */
 package org.apache.activemq.apollo.dto;
 
-
+import org.codehaus.jackson.annotate.JsonTypeInfo;
 
 import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name = "destination")
+@XmlType(name = "queue_binding")
+@XmlSeeAlso({QueueDestinationDTO.class, DurableSubscriptionDestinationDTO.class})
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationDTO extends StringIdDTO {
-
-    /**
-     * The name of the destination.  You can use wild cards.
-     */
-	@XmlAttribute
-	public String name;
+abstract public class DestinationDTO {
 
-    /**
-     * If set to true, then routing then there is no difference between
-     * sending to a queue or topic of the same name.  The first time
-     * a queue subscriptions is created, it will act like if a durable
-     * subscription was created on the topic. 
-     */
     @XmlAttribute
-    public Boolean unified;
-
-    @XmlElement(name="slow_consumer_policy")
-    public String slow_consumer_policy;
-
-    @XmlElement(name="acl")
-    public DestinationAclDTO acl;
+    public String name;
 
-}
+    public DestinationDTO() {
+    }
+    public DestinationDTO(String name) {
+        this.name = name;
+    }
+}
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Wed Jan 26 03:10:35 2011
@@ -16,28 +16,27 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+
+
+import javax.xml.bind.annotation.*;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name = "subscription_binding")
+@XmlRootElement(name = "durable_subscription")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class SubscriptionBindingDTO extends BindingDTO {
-
-    public String name;
-
-    public String filter;
+public class DurableSubscriptionDTO extends QueueDTO {
 
+    /**
+     * To narrow down matches to a client id
+     */
     @XmlAttribute(name="client_id")
     public String client_id;
 
+    /**
+     * To narrow down matches to a subscription id
+     */
     @XmlAttribute(name="subscription_id")
     public String subscription_id;
-}
\ No newline at end of file
+
+}

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Wed Jan 26 03:10:35 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name = "durable_subscription_destination")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class DurableSubscriptionDestinationDTO extends DestinationDTO {
+
+    @XmlAttribute
+    public String filter;
+
+    @XmlAttribute(name="client_id")
+    public String client_id;
+
+    @XmlAttribute(name="subscription_id")
+    public String subscription_id;
+
+    public DurableSubscriptionDestinationDTO() {
+    }
+
+    public DurableSubscriptionDestinationDTO(String name) {
+        super(name);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DurableSubscriptionDestinationDTO that = (DurableSubscriptionDestinationDTO) o;
+
+        if (client_id != null ? !client_id.equals(that.client_id) : that.client_id != null) return false;
+        if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
+        if (name != null ? !name.equals(that.name) : that.name != null) return false;
+        if (subscription_id != null ? !subscription_id.equals(that.subscription_id) : that.subscription_id != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (filter != null ? filter.hashCode() : 0);
+        result = 31 * result + (client_id != null ? client_id.hashCode() : 0);
+        result = 31 * result + (subscription_id != null ? subscription_id.hashCode() : 0);
+        return result;
+    }
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueAclDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueAclDTO.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueAclDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueAclDTO.java Wed Jan 26 03:10:35 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.dto;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
 import java.util.*;
 
 /**
@@ -29,7 +28,7 @@ import java.util.*;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 @XmlAccessorType(XmlAccessType.FIELD)
-public class QueueAclDTO extends DestinationAclDTO {
+public class QueueAclDTO extends TopicAclDTO {
 
     @XmlElement(name="consume")
     public List<PrincipalDTO> consumes = new ArrayList<PrincipalDTO>();

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Wed Jan 26 03:10:35 2011
@@ -33,27 +33,14 @@ public class QueueDTO {
     @XmlAttribute
     public String name;
 
-    /*
-     * The kind of queue.  I
-     * If not set, then this configuration applies to all queue types.
-     */
-    @XmlAttribute
-    public String kind;
-
-    /**
-     * If the kind is "ds" then you can specify which client
-     * id this configuration should match.
-     */
-    @XmlAttribute(name="client_id")
-    public String client_id;
-
     /**
-     * If the kind is "ds" then you can specify which subscription
-     * id this configuration should match.
+     * If set to true, then for routing purposes there is no difference
+     * between sending to a queue or topic of the same name.  The first
+     * time a queue is created, it will act as if a durable
+     * subscription was created on the topic.
      */
-    @XmlAttribute(name="subscription_id")
-    public String subscription_id;
-
+    @XmlAttribute
+    public Boolean unified;
 
     /**
      *  The amount of memory buffer space for receiving messages.

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java (from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java Wed Jan 26 03:10:35 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
 import javax.xml.bind.annotation.*;
 
 /**
@@ -26,9 +24,31 @@ import javax.xml.bind.annotation.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlType(name = "binding")
-@XmlSeeAlso({QueueBindingDTO.class, SubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "p2p_queue_binding")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class QueueDestinationDTO extends DestinationDTO {
+
+    public QueueDestinationDTO() {
+    }
+
+    public QueueDestinationDTO(String name) {
+        super(name);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        QueueDestinationDTO that = (QueueDestinationDTO) o;
+
+        if (name != null ? !name.equals(that.name) : that.name != null) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return name != null ? name.hashCode() : 0;
+    }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Jan 26 03:10:35 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonProperty;
-
 import javax.xml.bind.annotation.*;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,7 +34,7 @@ public class QueueStatusDTO extends Long
     public QueueDTO config;
 
     @XmlElement
-    public BindingDTO binding;
+    public DestinationDTO destination;
 
     @XmlElement
     public QueueMetricsDTO metrics = new QueueMetricsDTO();

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicAclDTO.java (from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicAclDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicAclDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicAclDTO.java Wed Jan 26 03:10:35 2011
@@ -28,7 +28,7 @@ import java.util.*;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationAclDTO {
+public class TopicAclDTO {
 
     @XmlElement(name="create")
     public List<PrincipalDTO> creates = new ArrayList<PrincipalDTO>();

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java (from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java Wed Jan 26 03:10:35 2011
@@ -16,24 +16,27 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
+
 
 import javax.xml.bind.annotation.*;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name = "queue_binding")
+@XmlRootElement(name = "topic")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class QueueBindingDTO extends BindingDTO {
+public class TopicDTO {
 
     /**
-     * A label that describes the binding
+     * The name of the destination.  You can use wild cards.
      */
-    @XmlAttribute
-    public String name;
+	@XmlAttribute
+	public String name;
+
+    @XmlElement(name="slow_consumer_policy")
+    public String slow_consumer_policy;
+
+    @XmlElement(name="acl")
+    public TopicAclDTO acl;
 
-}
\ No newline at end of file
+}

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java (from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java Wed Jan 26 03:10:35 2011
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.apollo.dto;
 
-
-
 import javax.xml.bind.annotation.*;
-import java.util.*;
 
 /**
  * <p>
@@ -27,19 +24,33 @@ import java.util.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
+@XmlRootElement(name = "queue_destination")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationAclDTO {
+public class TopicDestinationDTO extends DestinationDTO {
+
+    public TopicDestinationDTO() {
+    }
+
+    public TopicDestinationDTO(String name) {
+        super(name);
+    }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
 
-    @XmlElement(name="create")
-    public List<PrincipalDTO> creates = new ArrayList<PrincipalDTO>();
+        TopicDestinationDTO that = (TopicDestinationDTO) o;
 
-    @XmlElement(name="destroy")
-    public List<PrincipalDTO> destroys = new ArrayList<PrincipalDTO>();
+        if (name != null ? !name.equals(that.name) : that.name != null) return false;
 
-    @XmlElement(name="send")
-    public List<PrincipalDTO> sends = new ArrayList<PrincipalDTO>();
+        return true;
+    }
 
-    @XmlElement(name="receive")
-    public List<PrincipalDTO> receives = new ArrayList<PrincipalDTO>();
+    @Override
+    public int hashCode() {
+        return name != null ? name.hashCode() : 0;
+    }
 
-}
+}
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java (from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java Wed Jan 26 03:10:35 2011
@@ -26,9 +26,9 @@ import java.util.List;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="destination_status")
+@XmlRootElement(name="topic_status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationStatusDTO extends LongIdDTO {
+public class TopicStatusDTO extends LongIdDTO {
 
     /**
      * The destination name
@@ -37,7 +37,7 @@ public class DestinationStatusDTO extend
     public String name;
 
     @XmlElement
-    public DestinationDTO config;
+    public TopicDTO config;
 
     /**
      * Ids of all connections that are producing to the destination
@@ -54,6 +54,6 @@ public class DestinationStatusDTO extend
     /**
      * Ids of all queues that are associated with the destination
      */
-    @XmlElement(name="queue")
-    public List<LongIdLabeledDTO> queues = new ArrayList<LongIdLabeledDTO>();
+    @XmlElement(name="durable_subscription")
+    public List<LongIdLabeledDTO> durable_subscriptions = new ArrayList<LongIdLabeledDTO>();
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jan 26 03:10:35 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonProperty;
-
 import java.util.ArrayList;
 
 import javax.xml.bind.annotation.*;
@@ -36,11 +34,11 @@ public class VirtualHostDTO extends Serv
     public StoreDTO store;
 
     /**
-     * Should queues be auto created when they are first accessed
+     * Should destinations be auto created when they are first accessed
      * by clients?
      */
-    @XmlAttribute(name="auto_create_queues")
-    public Boolean auto_create_queues;
+    @XmlAttribute(name="auto_create_destinations")
+    public Boolean auto_create_destinations;
 
     /**
      * Should queues be purged on startup?
@@ -51,8 +49,8 @@ public class VirtualHostDTO extends Serv
     /**
      * Holds the configuration for the destinations.
      */
-    @XmlElement(name="destination")
-    public ArrayList<DestinationDTO> destinations = new ArrayList<DestinationDTO>();
+    @XmlElement(name="topic")
+    public ArrayList<TopicDTO> topics = new ArrayList<TopicDTO>();
 
     /**
      * Holds the configuration for the queues.
@@ -61,6 +59,12 @@ public class VirtualHostDTO extends Serv
     public ArrayList<QueueDTO> queues = new ArrayList<QueueDTO>();
 
     /**
+     * Holds the configuration for the durable subscriptions.
+     */
+    @XmlElement(name="durable_subscription")
+    public ArrayList<DurableSubscriptionDTO> durable_subscriptions = new ArrayList<DurableSubscriptionDTO>();
+
+    /**
      * Should connections get regroups so they get serviced by the same thread?
      */
     @XmlAttribute(name="regroup_connections")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java Wed Jan 26 03:10:35 2011
@@ -39,10 +39,16 @@ public class VirtualHostStatusDTO extend
     public StoreStatusDTO store;
 
     /**
-     * Ids of all the destinations running on the broker
+     * Ids of all the topics running on the broker
      */
-    @XmlElement(name="destination")
-    public List<LongIdLabeledDTO> destinations = new ArrayList<LongIdLabeledDTO>();
+    @XmlElement(name="topics")
+    public List<LongIdLabeledDTO> topics = new ArrayList<LongIdLabeledDTO>();
+
+    /**
+     * Ids of all the queues running on the broker
+     */
+    @XmlElement(name="queues")
+    public List<LongIdLabeledDTO> queues = new ArrayList<LongIdLabeledDTO>();
 
 
     /**

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Wed Jan 26 03:10:35 2011
@@ -20,7 +20,7 @@ BrokerSummaryDTO
 ConnectionStatusDTO
 ConnectorDTO
 ConnectorStatusDTO
-DestinationStatusDTO
+TopicStatusDTO
 EntryStatusDTO
 IntMetricDTO
 LongIdDTO
@@ -42,7 +42,7 @@ KeyStorageDTO
 SimpleStoreStatusDTO
 NullStoreDTO
 QueueDTO
-DestinationDTO
+TopicDTO
 LinkDTO
 QueueConsumerStatusDTO
 StompDTO

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java Wed Jan 26 03:10:35 2011
@@ -51,6 +51,10 @@ public class XmlCodecTest {
         assertEquals("localhost", host.host_names.get(0));
         assertEquals("example.com", host.host_names.get(1));
 
+        assertEquals("queue1", host.queues.get(0).name);
+        assertEquals("topic1", host.topics.get(0).name);
+        assertEquals("durable_subscription1", host.durable_subscriptions.get(0).name);
+
         assertNotNull(dto.acl);
         assertTrue(dto.acl.admins.contains(new PrincipalDTO("hiram")));
         assertTrue(dto.acl.admins.contains(new PrincipalDTO("james")));

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml Wed Jan 26 03:10:35 2011
@@ -27,6 +27,11 @@
     <acl/>
     <host_name>localhost</host_name>
     <host_name>example.com</host_name>
+
+    <queue name="queue1"/>
+    <topic name="topic1"/>
+    <durable_subscription name="durable_subscription1"/>
+
   </virtual_host>
   <connector bind="tcp://0.0.0.0:61616" enabled="true" id="port-61616">
     <stomp add_user_header="JMSXUserID"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jan 26 03:10:35 2011
@@ -22,6 +22,7 @@ import java.lang.{String, Class}
 import org.apache.activemq.apollo.broker._
 import java.io.OutputStream
 import org.apache.activemq.apollo.broker.store.ZeroCopyBuffer
+import org.apache.activemq.apollo.dto.DestinationDTO
 
 /**
  *
@@ -67,7 +68,7 @@ case class StompFrameMessage(frame:Stomp
   /**
    * where the message was sent to.
    */
-  var destination: Destination = null
+  var destination: Array[DestinationDTO] = null
 
   for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
     header match {
@@ -308,9 +309,9 @@ object Stomp {
   destination_parser.any_child_wildcard = ascii("*")
   destination_parser.any_descendant_wildcard = ascii("**")
 
-  destination_parser.default_domain = Router.QUEUE_DOMAIN
+  destination_parser.default_domain = LocalRouter.QUEUE_DOMAIN
 
-  implicit def toDestination(value:AsciiBuffer):Destination = {
+  implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
     val d = destination_parser.parse(value)
     if( d==null ) {
       throw new ProtocolException("Invalid stomp destiantion name: "+value);

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jan 26 03:10:35 2011
@@ -200,14 +200,36 @@ class StompProtocolHandler extends Proto
   class StompConsumer(
 
     val subscription_id:Option[AsciiBuffer],
-    val destination:Destination,
+    val destination:Array[DestinationDTO],
     val ack_handler:AckHandler,
     val selector:(String, BooleanExpression),
-    val binding:BindingDTO,
     override val browser:Boolean,
     override val exclusive:Boolean
   ) extends BaseRetained with DeliveryConsumer {
 
+////  The following comes in handy if we need to debug the
+////  reference counts of the consumers.
+//
+//    val r = new BaseRetained
+//
+//    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
+//    def retained: Int =r.retained
+//
+//    def printST(name:String) = {
+//      val e = new Exception
+//      println(name+": ")
+//      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
+//    }
+//
+//    def retain: Unit = {
+//      printST("retain")
+//      r.retain
+//    }
+//    def release: Unit = {
+//      printST("release")
+//      r.release
+//    }
+
     val dispatch_queue = StompProtocolHandler.this.dispatchQueue
 
     override def connection = Some(StompProtocolHandler.this.connection)
@@ -302,9 +324,9 @@ class StompProtocolHandler extends Proto
   var closed = false
   var consumers = Map[AsciiBuffer, StompConsumer]()
 
-  var producerRoutes = new LRUCache[Destination, DeliveryProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[Destination, DeliveryProducerRoute]) = {
-      host.router.disconnect(eldest.getValue)
+  var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute]) = {
+      host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
     }
   }
 
@@ -396,19 +418,12 @@ class StompProtocolHandler extends Proto
 
       import collection.JavaConversions._
       producerRoutes.foreach{
-        case(_,route)=> host.router.disconnect(route)
+        case(dests,route)=> host.router.disconnect(dests.toArray, route)
       }
       producerRoutes.clear
       consumers.foreach {
         case (_,consumer)=>
-          if( consumer.binding==null ) {
-            host.router.unbind(consumer.destination, consumer)
-          } else {
-            reset {
-              val queue = host.router.get_queue(consumer.binding)
-              queue.foreach( _.unbind(consumer::Nil) )
-            }
-          }
+          host.router.unbind(consumer.destination, consumer)
       }
       consumers = Map()
       trace("stomp protocol resources released")
@@ -653,32 +668,35 @@ class StompProtocolHandler extends Proto
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
 
-    val destiantion: Destination = get(frame.headers, DESTINATION).get
-    producerRoutes.get(destiantion) match {
+    val destiantion: Array[DestinationDTO] = get(frame.headers, DESTINATION).get
+    val key = destiantion.toList
+    producerRoutes.get(key) match {
       case null =>
         // create the producer route...
 
-        val producer = new DeliveryProducer() {
+        val route = new DeliveryProducerRoute(host.router) {
           override def connection = Some(StompProtocolHandler.this.connection)
-
           override def dispatch_queue = queue
+
+          refiller = ^{
+            resumeRead
+          }
         }
 
         // don't process frames until producer is connected...
         connection.transport.suspendRead
-        host.router.connect(destiantion, producer, security_context) {
-          case Failure(reason) =>
-            async_die(reason)
-
-          case Success(route) =>
+        reset {
+          val rc = host.router.connect(destiantion, route, security_context)
+          if( rc.failed ) {
+            async_die(rc.failure)
+          } else {
             if (!connection.stopped) {
               resumeRead
-              route.refiller = ^ {
-                resumeRead
-              }
-              producerRoutes.put(destiantion, route)
+              producerRoutes.put(key, route)
               send_via_route(route, frame, uow)
             }
+          }
+
         }
 
       case route =>
@@ -745,7 +763,7 @@ class StompProtocolHandler extends Proto
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more stomp messages
         // until it's not full anymore.
-        suspendRead("blocked destination: "+route.destination)
+        suspendRead("blocked sending to: "+route.overflowSessions.mkString(", "))
       }
 
     } else {
@@ -759,7 +777,7 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_subscribe(headers:HeaderMap):Unit = {
     val dest = get(headers, DESTINATION).getOrElse(die("destination not set."))
-    val destination:Destination = dest
+    var destination:Array[DestinationDTO] = dest
 
     val subscription_id = get(headers, ID)
     var id:AsciiBuffer = subscription_id.getOrElse {
@@ -773,7 +791,7 @@ class StompProtocolHandler extends Proto
 
     }
 
-    val topic = destination.domain == Router.TOPIC_DOMAIN
+//    val topic = destination.isInstanceOf[TopicDestinationDTO]
     var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
     var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
     var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
@@ -807,66 +825,52 @@ class StompProtocolHandler extends Proto
       die("A subscription with identified with '"+id+"' allready exists")
     }
 
-    val binding: BindingDTO = if( topic && !persistent ) {
-      null
-    } else {
-      // Controls how the created queue gets bound
-      // to the destination name space (this is used to
-      // recover the queue on restart and rebind it the
-      // way again)
-      if (topic) {
-        val rc = new SubscriptionBindingDTO
-        rc.name = DestinationParser.encode_path(destination.name)
-        // TODO:
-        // rc.client_id =
-        rc.subscription_id = if( persistent ) decode_header(id) else null
-        rc.filter = if (selector == null) null else selector._1
-        rc
-      } else {
-        val rc = new QueueBindingDTO
-        rc.name = DestinationParser.encode_path(destination.name)
-        rc
+    if( persistent ) {
+      destination = destination.map { _ match {
+        case x:TopicDestinationDTO=>
+          val rc = new DurableSubscriptionDestinationDTO(x.name)
+          rc.subscription_id = decode_header(id)
+          rc.filter = if (selector == null) null else selector._1
+          rc
+        case _ => die("A persistent subscription can only be used on a topic destination")
+        }
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding, browser, exclusive);
+    val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive);
     consumers += (id -> consumer)
 
-    if( binding==null ) {
-
-      // consumer is bind bound as a topic
-      reset {
-        val rc = host.router.bind(destination, consumer, security_context)
-        consumer.release
-        rc match {
-          case Failure(reason)=>
-            async_die(reason)
-          case _=>
-            send_receipt(headers)
-        }
-      }
-
-    } else {
-      reset {
-        // create a queue and bind the consumer to it.
-        val x= host.router.get_or_create_queue(binding, security_context)
-        x match {
-          case Success(queue) =>
-            val rc = queue.bind(consumer, security_context)
-            consumer.release
-            rc match {
-              case Failure(reason)=>
-                consumers -= id
-                async_die(reason)
-              case _ =>
-                send_receipt(headers)
-            }
-          case Failure(reason) =>
-            consumers -= id
-            async_die(reason)
-        }
+    reset {
+      val rc = host.router.bind(destination, consumer, security_context)
+      consumer.release
+      rc match {
+        case Failure(reason)=>
+          consumers -= id
+          async_die(reason)
+        case _=>
+          send_receipt(headers)
       }
     }
+
+//      reset {
+//        // create a queue and bind the consumer to it.
+//        val x= host.router.get_or_create_queue(binding, security_context)
+//        x match {
+//          case Success(queue) =>
+//            val rc = queue.bind(consumer, security_context)
+//            consumer.release
+//            rc match {
+//              case Failure(reason)=>
+//                consumers -= id
+//                async_die(reason)
+//              case _ =>
+//                send_receipt(headers)
+//            }
+//          case Failure(reason) =>
+//            consumers -= id
+//            async_die(reason)
+//        }
+//      }
   }
 
   def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
@@ -894,25 +898,8 @@ class StompProtocolHandler extends Proto
 
         // consumer gets disposed after all producer stop sending to it...
         consumer.setDisposer(^{ send_receipt(headers) })
-
         consumers -= id
-        if( consumer.binding==null ) {
-          host.router.unbind(consumer.destination, consumer)
-        } else {
-
-          reset {
-            val queue = host.router.get_queue(consumer.binding)
-            queue.foreach( _.unbind(consumer::Nil) )
-          }
-
-          if( persistent && consumer.binding!=null ) {
-            reset {
-              host.router.destroy_queue(consumer.binding, security_context).failure_option.foreach{ reason=>
-                async_die(reason)
-              }
-            }
-          }
-        }
+        host.router.unbind(consumer.destination, consumer, persistent)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Wed Jan 26 03:10:35 2011
@@ -16,12 +16,15 @@
     limitations under the License.
 -->
 <broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
-    <notes>This broker configuration is what the unit tests in this module load up.</notes>
+  <notes>This broker configuration is what the unit tests in this module load up.</notes>
 
-    <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
-        <host_name>localhost</host_name>
-    </virtual_host>
+  <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
+    <host_name>localhost</host_name>
 
-    <connector id="tcp" protocol="stomp" bind="tcp://0.0.0.0:0"/>
+    <queue name="unified.**" unified="true"/>
+
+  </virtual_host>
+
+  <connector id="tcp" protocol="stomp" bind="tcp://0.0.0.0:0"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed Jan 26 03:10:35 2011
@@ -633,6 +633,104 @@ class StompDestinationTest extends Stomp
 
 }
 
+class StompUnifiedQueueTest extends StompTestSupport {
+
+  test("Topic gets copy of message sent to queue") {
+    connect("1.1")
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/unified.a\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/unified.a\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    put(1)
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+  }
+
+  test("Queue gets copy of message sent to topic") {
+    connect("1.1")
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/unified.b\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/unified.b\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    put(1)
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+
+  }
+
+  test("Queue does not get copies from topic until it's first created") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/unified.c\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    put(1)
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/unified.c\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    put(2)
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(2)
+  }
+
+
+}
+
 class StompSslDestinationTest extends StompDestinationTest {
   override val broker_config_uri: String = "xml:classpath:apollo-stomp-ssl.xml"
 
@@ -982,7 +1080,7 @@ class StompSecurityTest extends StompTes
 
     val frame = client.receive()
     frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to send to the queue\n")
+    frame should include("message:Not authorized to create the queue\n")
   }
 
   test("Send authorized but not create") {
@@ -1003,24 +1101,22 @@ class StompSecurityTest extends StompTes
 
   }
 
-//
-//  test("Consume authorized but not create") {
-//    connect("1.1", client,
-//      "login:can_consume_queue\n" +
-//      "passcode:can_consume_queue\n")
-//
-//    client.write(
-//      "SUBSCRIBE\n" +
-//      "destination:/queue/secure\n" +
-//      "id:0\n" +
-//      "receipt:0\n" +
-//      "\n")
-//    wait_for_receipt("0")
-//
-//    val frame = client.receive()
-//    frame should startWith("ERROR\n")
-//    frame should include("message:Not authorized to create the queue\n")
-//  }
+  test("Consume authorized but not create") {
+    connect("1.1", client,
+      "login:can_consume_queue\n" +
+      "passcode:can_consume_queue\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/secure\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue\n")
+  }
 
   test("Send and create authorized") {
     connect("1.1", client,
@@ -1070,7 +1166,7 @@ class StompSecurityTest extends StompTes
 
     val frame = client.receive()
     frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to consume from the queue\n")
+    frame should include("message:Not authorized to reveive from the destination.\n")
   }
 
 //  test("Consume authorized and JMSXUserID is set on message") {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Wed Jan 26 03:10:35 2011
@@ -29,7 +29,7 @@ import AsciiBuffer._
 import Stomp._
 import _root_.org.apache.activemq.apollo.stomp.StompFrame
 import org.fusesource.hawtdispatch._
-
+import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO}
 
 class StompRemoteConsumer extends RemoteConsumer {
   var outboundSink: OverflowSink[StompFrame] = null
@@ -38,10 +38,9 @@ class StompRemoteConsumer extends Remote
     outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
     outboundSink.refiller = ^ {}
 
-    val stompDestination = if (destination.domain == Router.QUEUE_DOMAIN) {
-      ascii("/queue/" + destination.name.toString());
-    } else {
-      ascii("/topic/" + destination.name.toString());
+    val stompDestination = destination match {
+      case x:QueueDestinationDTO => ascii("/queue/" + x.name);
+      case x:TopicDestinationDTO => ascii("/topic/" + x.name);
     }
 
     var frame = StompFrame(CONNECT);
@@ -149,11 +148,11 @@ class StompRemoteProducer extends Remote
     outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
     outboundSink.refiller = ^ {drain}
 
-    if (destination.domain == Router.QUEUE_DOMAIN) {
-      stompDestination = ascii("/queue/" + destination.name.toString());
-    } else {
-      stompDestination = ascii("/topic/" + destination.name.toString());
+    stompDestination = destination match {
+      case x:QueueDestinationDTO => ascii("/queue/" + x.name);
+      case x:TopicDestinationDTO => ascii("/topic/" + x.name);
     }
+
     outboundSink.offer(StompFrame(CONNECT));
     send_next
   }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala Wed Jan 26 03:10:35 2011
@@ -58,7 +58,7 @@ trait BaseService extends Service {
   protected class STOPPING extends State with CallbackSupport { override def is_stopping = true  }
   protected class STOPPED extends State { override def is_stopped = true  }
 
-  protected val dispatch_queue:DispatchQueue
+  protected def dispatch_queue:DispatchQueue
 
   final def start() = start(null)
   final def stop() = stop(null)

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java Wed Jan 26 03:10:35 2011
@@ -61,9 +61,8 @@ public class PathMap<Value> {
     /**
      * Removes the value from the associated path
      */
-    public void remove(Path path, Value value) {
-        root.remove(path, 0, value);
-
+    public boolean remove(Path path, Value value) {
+        return root.remove(path, 0, value);
     }
 
     public PathMapNode<Value> getRootNode() {

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java Wed Jan 26 03:10:35 2011
@@ -134,10 +134,11 @@ public class PathMapNode<Value> implemen
         }
     }
 
-    public void remove(Path path, int idx, Value value) {
+    public boolean remove(Path path, int idx, Value value) {
         if (idx >= path.parts.size()) {
-            values.remove(value);
+            boolean rc = values.remove(value);
             pruneIfEmpty();
+            return rc;
         } else {
             // if (idx == paths.size() - 1) {
             // getAnyChildNode().getValues().remove(value);
@@ -145,7 +146,7 @@ public class PathMapNode<Value> implemen
             // else {
             // getAnyChildNode().remove(paths, idx + 1, value);
             // }
-            getChildOrCreate(path.parts.get(idx)).remove(path, ++idx, value);
+            return getChildOrCreate(path.parts.get(idx)).remove(path, ++idx, value);
         }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Jan 26 03:10:35 2011
@@ -177,7 +177,7 @@ case class RuntimeResource(parent:Broker
 
   def get_queue_metrics(virtualHost:VirtualHost):Future[AggregateQueueMetricsDTO] = {
     val metrics = Future.all{
-      virtualHost.router.queues.values.map { queue=>
+      virtualHost.router.asInstanceOf[LocalRouter].queues_by_id.values.map { queue=>
         queue.dispatch_queue.future { get_queue_metrics(queue) }
       }
     }
@@ -194,8 +194,12 @@ case class RuntimeResource(parent:Broker
       result.state_since = virtualHost.service_state.since
       result.config = virtualHost.config
 
-      virtualHost.router.routing_nodes.foreach { node=>
-        result.destinations.add(new LongIdLabeledDTO(node.id, node.name.toString))
+      virtualHost.router.asInstanceOf[LocalRouter].topic_domain.destinations.foreach { node=>
+        result.topics.add(new LongIdLabeledDTO(node.id, node.name))
+      }
+
+      virtualHost.router.asInstanceOf[LocalRouter].queue_domain.destinations.foreach { node=>
+        result.queues.add(new LongIdLabeledDTO(node.id, node.binding.label))
       }
 
       get_queue_metrics(virtualHost).onComplete { metrics=>
@@ -240,18 +244,19 @@ case class RuntimeResource(parent:Broker
     link
   }
 
-  @GET @Path("virtual-hosts/{id}/destinations/{dest}")
-  def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
+  @GET @Path("virtual-hosts/{id}/topics/{dest}")
+  def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):TopicStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
-      cb(virtualHost.router.routing_nodes.find { _.id == dest } map { node=>
-        val result = new DestinationStatusDTO
+      cb(virtualHost.router.asInstanceOf[LocalRouter].topic_domain.destination_by_id.get(dest) map { node=>
+        val result = new TopicStatusDTO
         result.id = node.id
-        result.name = node.name.toString
+        result.name = node.name
         result.config = node.config
-        node.queues.foreach { q=>
-          result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
+
+        node.durable_subscriptions.foreach { q=>
+          result.durable_subscriptions.add(new LongIdLabeledDTO(q.id, q.binding.label))
         }
-        node.broadcast_consumers.foreach { consumer=>
+        node.consumers.foreach { consumer=>
           consumer match {
             case queue:Queue =>
               result.consumers.add(link(queue))
@@ -261,7 +266,7 @@ case class RuntimeResource(parent:Broker
               }
           }
         }
-        node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
+        node.producers.flatMap( _.connection ).foreach { connection=>
           result.producers.add(link(connection))
         }
 
@@ -270,7 +275,7 @@ case class RuntimeResource(parent:Broker
     }
   }
 
-  @GET @Path("virtual-hosts/{id}/queues/{queue}")
+  @GET @Path("virtual-hosts/{id}/all-queues/{queue}")
   def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
       reset {
@@ -280,11 +285,11 @@ case class RuntimeResource(parent:Broker
     }
   }
 
-  @GET @Path("virtual-hosts/{id}/destinations/{dest}/queues/{queue}")
-  def destination_queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
+  @GET @Path("virtual-hosts/{id}/queues/{queue}")
+  def destination_queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
       import JavaConversions._
-      val queue = virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find  { _.id == qid } }
+      val queue = virtualHost.router.asInstanceOf[LocalRouter].queue_domain.destination_by_id.get(qid)
       status(queue, entries, cb)
     }
   }
@@ -330,7 +335,7 @@ case class RuntimeResource(parent:Broker
     q.dispatch_queue {
       val rc = new QueueStatusDTO
       rc.id = q.id
-      rc.binding = q.binding.binding_dto
+      rc.destination = q.binding.binding_dto
       rc.config = q.config
       rc.metrics = get_queue_metrics(q)
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Jan 26 03:10:35 2011
@@ -25,21 +25,22 @@
 .breadcumbs
   a(href={strip_resolve("..")}) Back
 
-- binding match
-  - case x:QueueBindingDTO =>
-    h1 Queue #{x.name}
-
-  - case x:SubscriptionBindingDTO =>
-    h1 Durable Subscription on #{x.name}
-    p client id: ${x.client_id}
-    p subscription id: ${x.subscription_id}
-    p filter: ${x.filter}
+-#
+  - binding match
+    - case x:QueueBindingDTO =>
+      h1 Queue #{x.name}
+
+    - case x:SubscriptionBindingDTO =>
+      h1 Durable Subscription on #{x.name}
+      p client id: ${x.client_id}
+      p subscription id: ${x.subscription_id}
+      p filter: ${x.filter}
 
-  - case x:TempBindingDTO =>
-    h1 Temporary Queue
+    - case x:TempBindingDTO =>
+      h1 Temporary Queue
 
-  - case x =>
-    h1 Unknown Queue Type: #{x.getClass.getName}
+    - case x =>  x
+      h1 Unknown Queue Type: #{x.getClass.getName}
 
 h2 Current Size
 

Copied: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade (from r1062213, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade?p2=activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade&p1=activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade Wed Jan 26 03:10:35 2011
@@ -22,12 +22,6 @@
 
 h1 Destination: #{name}
 
-h3 Queue Domain
-ul
-  - for( x <- queues )
-    li
-      a(href={ path("../../queues/"+x.id) }) #{x.label}
-
 h3 Topic Domain
 
 h4 Publishers
@@ -51,4 +45,8 @@ ul
           a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
       - case _ =>
 
-
+h4 Durable Subscribers
+ul
+  - for( x <- durable_subscriptions )
+    li
+      a(href={ path("../../all-queues/"+x.id) }) #{x.label}

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.jade?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.jade Wed Jan 26 03:10:35 2011
@@ -28,9 +28,14 @@ p state: #{state} #{ uptime(state_since)
   p
     a(href={ path("store") }) store
 
-h2 Destinations
+h2 Queues
 ul
-  - for( x <- destinations )
+  - for( x <- queues )
     li
-      a(href={ path("destinations/"+x.id) }) #{x.label}
+      a(href={ path("queues/"+x.id) }) #{x.label}
 
+h2 Topics
+ul
+  - for( x <- topics )
+    li
+      a(href={ path("topics/"+x.id) }) #{x.label}



Mime
View raw message