activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1208274 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apo...
Date Wed, 30 Nov 2011 04:31:00 GMT
Author: chirino
Date: Wed Nov 30 04:30:49 2011
New Revision: 1208274

URL: http://svn.apache.org/viewvc?rev=1208274&view=rev
Log:
Fixes APLO-92 : Support temporary destinations which can only be consumed by the connection
which creates them

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Nov 30 04:30:49 2011
@@ -289,7 +289,8 @@ class Broker() extends BaseService with 
     check_file_limit
 
     BrokerRegistry.add(this)
-    schedule_periodic_maintenance
+    schedule_now_update
+    schedule_virtualhost_maintenance
 
     val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
     apply_update(tracker)
@@ -333,13 +334,28 @@ class Broker() extends BaseService with 
 
   }
 
-  def schedule_periodic_maintenance:Unit = dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
+  def schedule_now_update:Unit = dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
     if( service_state.is_starting_or_started ) {
       now = System.currentTimeMillis
-      schedule_periodic_maintenance
+      schedule_now_update
     }
   }
 
+  def schedule_virtualhost_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
+    if( service_state.is_started ) {
+      val active_connections = connections.keySet
+
+      virtual_hosts.values.foreach { host=>
+        host.dispatch_queue {
+          if(host.service_state.is_started) {
+            host.router.remove_temp_destinations(active_connections)
+          }
+        }
+      }
+
+      schedule_virtualhost_maintenance
+    }
+  }
   protected def init_logs = {
     import OptionSupport._
     // Configure the logging categories...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Wed Nov 30 04:30:49 2011
@@ -16,52 +16,34 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.org.fusesource.hawtbuf._
-import BufferConversions._
-import Buffer._
-import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import org.apache.activemq.apollo.util.path.PathParser
 import scala.collection.mutable.ListBuffer
 import collection.JavaConversions._
 import java.lang.StringBuilder
 import java.util.regex.Pattern
-import org.apache.activemq.apollo.dto.{DurableSubscriptionDestinationDTO, TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
-
-object DestinationParser {
-
-  val OPENWIRE_PARSER = new DestinationParser();
-  OPENWIRE_PARSER.path_separator = "."
-  OPENWIRE_PARSER.any_child_wildcard = "*"
-  OPENWIRE_PARSER.any_descendant_wildcard = ">"
-
-  def create_destination(domain:String, parts:Array[String]):DestinationDTO = domain match {
-    case LocalRouter.QUEUE_DOMAIN => new QueueDestinationDTO(parts)
-    case LocalRouter.TOPIC_DOMAIN => new TopicDestinationDTO(parts)
-    case _ => throw new Exception("Uknown destination domain: "+domain);
-  }
-
-}
+import org.apache.activemq.apollo.dto._
+import scala.Array
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class DestinationParser extends PathParser {
-  import DestinationParser._
 
   var queue_prefix = "queue:"
   var topic_prefix = "topic:"
   var dsub_prefix = "dsub:"
+  var temp_queue_prefix = "temp-queue:"
+  var temp_topic_prefix = "temp-topic:"
   var destination_separator = ","
-//  var temp_queue_prefix = "temp-queue:"
-//  var temp_topic_prefix = "temp-topic:"
 
   def copy(other:DestinationParser) = {
     super.copy(other)
     queue_prefix = other.queue_prefix
     topic_prefix = other.topic_prefix
     dsub_prefix = other.dsub_prefix
+    temp_queue_prefix = other.temp_queue_prefix
+    temp_topic_prefix = other.temp_topic_prefix
     destination_separator = other.destination_separator
-//    temp_queue_prefix = other.temp_queue_prefix
-//    temp_topic_prefix = other.temp_topic_prefix
     this
   }
 
@@ -85,10 +67,6 @@ class DestinationParser extends PathPars
           case d:TopicDestinationDTO =>
             rc.append(topic_prefix)
             rc.append(encode_path(dest.path.toIterable))
-//          case Router.TEMP_QUEUE_DOMAIN =>
-//            baos.write(temp_queue_prefix)
-//          case Router.TEMP_TOPIC_DOMAIN =>
-//            baos.write(temp_topic_prefix)
           case _ =>
             throw new Exception("Uknown destination type: "+dest.getClass);
         }
@@ -132,16 +110,17 @@ class DestinationParser extends PathPars
       } else if (dsub_prefix != null && value.startsWith(dsub_prefix)) {
         var name = value.substring(dsub_prefix.length)
         return Array( new DurableSubscriptionDestinationDTO(name) )
-//      } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
-//        var name = value.slice(temp_queue_prefix.length, value.length).ascii()
-//        return new DestinationDTO(LocalRouter.TEMP_QUEUE_DOMAIN, name.toString)
-//      } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
-//        var name = value.slice(temp_topic_prefix.length, value.length).ascii()
-//        return new DestinationDTO(LocalRouter.TEMP_TOPIC_DOMAIN, name.toString)
+      } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
+        var name = value.substring(temp_topic_prefix.length)
+        return Array( new TopicDestinationDTO(parts(name)).temp(true) )
+      } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
+        var name = value.substring(temp_queue_prefix.length)
+        return Array( new QueueDestinationDTO(parts(name)).temp(true) )
       } else {
         return null;
       }
     }
   }
+
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Nov 30 04:30:49 2011
@@ -140,13 +140,6 @@ object LocalRouter extends Log {
 
   val destination_parser = new DestinationParser
 
-  val TOPIC_DOMAIN = "topic"
-  val QUEUE_DOMAIN = "queue"
-  val DSUB_DOMAIN = "dsub"
-
-  val QUEUE_KIND = "queue"
-  val DEFAULT_QUEUE_PATH = "default"
-
   def is_wildcard_config(dto:StringIdDTO) = {
     if( dto.id == null ) {
       true
@@ -209,6 +202,23 @@ class LocalRouter(val virtual_host:Virtu
   private val ALL = new Path(List(AnyDescendantPart))
 
   def authorizer = virtual_host.authorizer
+  
+  
+  def is_temp(destination:DestinationDTO) = {
+    destination.getClass!=classOf[DurableSubscriptionDestinationDTO] && destination.path.size() >= 1 && destination.path.get(0) == "temp"
+  }
+  
+  def temp_owner(destination:DestinationDTO) = {
+    if( destination.path.size() < 3 ) {
+      None
+    } else {
+      try {
+        Some((destination.path.get(1), destination.path.get(2).toLong))
+      } catch {
+        case _ => None
+      }
+    }
+  }
 
   trait Domain[D <: DomainDestination] {
 
@@ -272,10 +282,10 @@ class LocalRouter(val virtual_host:Virtu
       }
 
       for(dest <- get_destination_matches(path)) {
-
-        if( destination.temp_owner != null ) {
+        if( is_temp(destination) ) {
+          val owner = temp_owner(destination).get
           for( connection <- security.connection_id) {
-            if( connection != destination.temp_owner.longValue() ) {
+            if( (virtual_host.broker.id, connection) != owner ) {
               return Some("Not authorized to destroy the temp %s '%s'. Principals=%s".format(dest.resource_kind.id, dest.id, security.principal_dump))
             }
           }
@@ -297,11 +307,16 @@ class LocalRouter(val virtual_host:Virtu
       }
 
       // Only allow the owner to bind.
-      if( destination.temp_owner != null ) {
-        for( connection <- security.connection_id) {
-          if( connection != destination.temp_owner.longValue() ) {
-            return Some("Not authorized to receive from the temporary destination. Principals=%s".format(security.principal_dump))
-          }
+      if( is_temp(destination) ) {
+        temp_owner(destination) match {
+          case Some(owner) =>
+            for( connection <- security.connection_id) {
+              if( (virtual_host.broker.id, connection) != owner ) {
+                return Some("Not authorized to receive from the temporary destination. Principals=%s".format(security.principal_dump))
+              }
+            }
+          case None =>
+            return Some("Invalid temp destination name. Owner id missing")
         }
       }
 
@@ -509,6 +524,7 @@ class LocalRouter(val virtual_host:Virtu
             case queue:Queue =>
               // Delete any attached queue consumers..
               _destroy_queue(queue)
+            case _ =>
           }
         }
 
@@ -876,7 +892,7 @@ class LocalRouter(val virtual_host:Virtu
                     virtual_host.store.remove_queue(queue_key){x=> task.run}
                   } else {
                     var binding = BindingFactory.create(record.binding_kind, record.binding_data)
-                    if( binding.binding_dto.temp_owner != null ) {
+                    if( is_temp(binding.binding_dto) ) {
                       // These are the temp queues clients create.
                       virtual_host.store.remove_queue(queue_key){x=> task.run}
                     } else {
@@ -908,6 +924,25 @@ class LocalRouter(val virtual_host:Virtu
       create_configure_destinations
       on_completed.run()
     }
+
+
+  }
+  
+  def remove_temp_destinations(active_connections:scala.collection.Set[Long]) = {
+    virtual_host.dispatch_queue.assertExecuting()
+    // Auto delete temp destinations..
+    queue_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { queue=>
+      val owner = temp_owner(queue.destination_dto).get
+      if( owner._1==virtual_host.broker.id && !active_connections.contains(owner._2) ) {
+        _destroy_queue(queue)
+      }
+    }
+    topic_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { topic =>
+      val owner = temp_owner(topic.destination_dto).get
+      if( owner._1==virtual_host.broker.id && !active_connections.contains(owner._2) ) {
+        topic_domain.destroy_destination(topic.path, topic.destination_dto, null)
+      }
+    }
   }
 
   protected def _stop(on_completed: Runnable) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Nov 30 04:30:49 2011
@@ -47,6 +47,8 @@ trait Router extends Service {
   def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
 
   def apply_update(on_completed:Runnable):Unit
+
+  def remove_temp_destinations(active_connections:scala.collection.Set[Long]):Unit
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Nov 30 04:30:49 2011
@@ -33,7 +33,7 @@ import security.SecuredResource
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, path:Path) extends DomainDestination with SecuredResource {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, val path:Path) extends DomainDestination with SecuredResource {
 
   val topic_metrics = new DestMetricsDTO
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Nov 30 04:30:49 2011
@@ -27,7 +27,7 @@ import org.fusesource.hawtbuf.AsciiBuffe
 import java.net.URL
 import org.apache.activemq.apollo.util._
 import collection.mutable.{ArrayBuffer, ListBuffer}
-import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, DestinationDTO, BrokerDTO}
+import org.apache.activemq.apollo.dto._
 
 /**
  *
@@ -214,16 +214,11 @@ abstract class BrokerPerfSupport extends
     var dests = new Array[DestinationDTO](destCount)
 
     for (i <- 0 until destCount) {
-      val domain = if (PTP) {LocalRouter.QUEUE_DOMAIN} else {LocalRouter.TOPIC_DOMAIN}
       val name ="dest" + (i + 1)
-      var bean = DestinationParser.create_destination(domain, Array(name))
-      dests(i) = bean
-      //        if (PTP) {
-      //          sendBroker.defaultVirtualHost.createQueue(dests(i))
-      //          if (MULTI_BROKER) {
-      //            rcvBroker.defaultVirtualHost.createQueue(dests(i))
-      //          }
-      //        }
+      dests(i) = if (PTP)
+        new QueueDestinationDTO(Array(name))
+      else
+        new TopicDestinationDTO(Array(name))
     }
     dests
   }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Wed Nov 30 04:30:49 2011
@@ -31,21 +31,14 @@ import java.util.List;
  */
 @XmlType(name = "destination")
 @XmlSeeAlso({QueueDestinationDTO.class, DurableSubscriptionDestinationDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
 @XmlAccessorType(XmlAccessType.FIELD)
 abstract public class DestinationDTO {
 
     @XmlElement(name = "path")
     public List<String> path = new ArrayList<String>();
 
-    /**
-     * If the destination is a temporary destination, then it
-     * will have temp_owner set to the owner of the connection
-     * id which owns the destination.  Only the owner will be allowed
-     * to consume from the destination.
-     */
-    @XmlAttribute(name="temp_owner")
-    public Long temp_owner;
+    public boolean temp;
 
     public DestinationDTO() {
     }
@@ -59,9 +52,9 @@ abstract public class DestinationDTO {
     }
 
     public String name(String separator) {
-        StringBuilder sb  = new StringBuilder();
-        for( String p : path) {
-            if( sb.length() != 0 ) {
+        StringBuilder sb = new StringBuilder();
+        for (String p : path) {
+            if (sb.length() != 0) {
                 sb.append(separator);
             }
             sb.append(p);
@@ -69,33 +62,40 @@ abstract public class DestinationDTO {
         return sb.toString();
     }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof DestinationDTO)) return false;
-
-    DestinationDTO that = (DestinationDTO) o;
-
-    if (path != null ? !path.equals(that.path) : that.path != null)
-      return false;
-    if (temp_owner != null ? !temp_owner.equals(that.temp_owner) : that.temp_owner != null)
-      return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = path != null ? path.hashCode() : 0;
-    result = 31 * result + (temp_owner != null ? temp_owner.hashCode() : 0);
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return "DestinationDTO{" +
-            "path=" + path +
-            ", temp_owner=" + temp_owner +
-            '}';
-  }
+    public boolean temp() {
+        return temp;
+    }
+
+    public DestinationDTO temp(boolean temp) {
+        this.temp = temp;
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof DestinationDTO)) return false;
+
+        DestinationDTO that = (DestinationDTO) o;
+
+        if (temp != that.temp) return false;
+        if (!path.equals(that.path)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = path.hashCode();
+        result = 31 * result + (temp ? 1 : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "DestinationDTO{" +
+                "path=" + path +
+                '}';
+    }
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala Wed Nov 30 04:30:49 2011
@@ -36,8 +36,8 @@ object DestinationConverter {
   OPENWIRE_PARSER.any_descendant_wildcard = ">"
 
   def to_destination_dto(domain: String, parts:Array[String]): DestinationDTO = domain match {
-    case LocalRouter.QUEUE_DOMAIN => new QueueDestinationDTO(parts)
-    case LocalRouter.TOPIC_DOMAIN => new TopicDestinationDTO(parts)
+    case "queue" => new QueueDestinationDTO(parts)
+    case "topic" => new TopicDestinationDTO(parts)
     case _ => throw new Exception("Uknown destination domain: " + domain);
   }
 
@@ -53,13 +53,13 @@ object DestinationConverter {
       var path = OPENWIRE_PARSER.decode_path(physicalName)
       dest.getDestinationType match {
         case QUEUE_TYPE =>
-          to_destination_dto(LocalRouter.QUEUE_DOMAIN, path)
+          to_destination_dto("queue", path)
         case TOPIC_TYPE =>
-          to_destination_dto(LocalRouter.TOPIC_DOMAIN, path)
+          to_destination_dto("topic", path)
         case TEMP_QUEUE_TYPE =>
-          to_destination_dto(LocalRouter.QUEUE_DOMAIN, Path("ActiveMQ", "Temp") + path)
+          to_destination_dto("queue", Path("ActiveMQ", "Temp") + path)
         case TEMP_TOPIC_TYPE =>
-          to_destination_dto(LocalRouter.TOPIC_DOMAIN, Path("ActiveMQ", "Temp") + path)
+          to_destination_dto("topic", Path("ActiveMQ", "Temp") + path)
       }
     } else {
       dest.getCompositeDestinations.map { c =>
@@ -72,7 +72,7 @@ object DestinationConverter {
     import collection.JavaConversions._
 
     val rc = dest.map { dest =>
-      var temp = dest.temp_owner != null
+      var temp = false // dest.temp_owner != null
       val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList match {
         case "ActiveMQ" :: "Temp" :: rest =>
           temp = true

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Nov 30 04:30:49 2011
@@ -488,9 +488,9 @@ class OpenwireProtocolHandler extends Pr
 
   def on_destination_info(info:DestinationInfo) = {
     val destinations = to_destination_dto(info.getDestination)
-    if( info.getDestination.isTemporary ) {
-      destinations.foreach(_.temp_owner = connection.id)
-    }
+//    if( info.getDestination.isTemporary ) {
+//      destinations.foreach(_.temp_owner = connection.id)
+//    }
     reset{
       val rc = info.getOperationType match {
         case DestinationInfo.ADD_OPERATION_TYPE=>
@@ -806,9 +806,9 @@ class OpenwireProtocolHandler extends Pr
 
       // if they are temp dests.. attach our owner id so that we don't
       // get rejected.
-      if( info.getDestination.isTemporary ) {
-        destination.foreach(_.temp_owner = connection.get.id)
-      }
+//      if( info.getDestination.isTemporary ) {
+//        destination.foreach(_.temp_owner = connection.get.id)
+//      }
 
       parent.consumers.put(info.getConsumerId, this)
       all_consumers.put(info.getConsumerId, this)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Nov 30 04:30:49 2011
@@ -300,6 +300,8 @@ object Stomp {
   destination_parser.queue_prefix = "/queue/"
   destination_parser.topic_prefix = "/topic/"
   destination_parser.dsub_prefix = "/dsub/"
+  destination_parser.temp_queue_prefix = "/temp-queue/"
+  destination_parser.temp_topic_prefix = "/temp-topic/"
   destination_parser.destination_separator = ","
   destination_parser.path_separator = "."
   destination_parser.any_child_wildcard = "*"

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Nov 30 04:30:49 2011
@@ -34,11 +34,11 @@ import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
 import path.PathParser
 import scala.util.continuations._
-import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.transport.tcp.SslTransport
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
+import org.apache.activemq.apollo.dto._
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -138,8 +138,7 @@ class StompProtocolHandler extends Proto
     override val browser:Boolean,
     override val exclusive:Boolean,
     val auto_delete:Boolean,
-    val initial_credit_window:(Int,Int, Boolean),
-    val temp:Boolean
+    val initial_credit_window:(Int,Int, Boolean)
   ) extends BaseRetained with DeliveryConsumer {
 
 ////  The following comes in handy if we need to debug the
@@ -165,10 +164,6 @@ class StompProtocolHandler extends Proto
 //      r.release
 //    }
 
-    if( temp ) {
-      destination.foreach(_.temp_owner = connection.get.id)
-    }
-
     val ack_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
       def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
         if( previous == null ) {
@@ -427,7 +422,7 @@ class StompProtocolHandler extends Proto
 
       def dispose = {
         session_manager.close(downstream)
-        if( auto_delete || temp) {
+        if( auto_delete ) {
           reset {
             val rc = host.router.delete(destination, security_context)
             rc match {
@@ -489,6 +484,7 @@ class StompProtocolHandler extends Proto
   var protocol_filters = List[ProtocolFilter]()
 
   var destination_parser = Stomp.destination_parser
+  var temp_destination_map = HashMap[DestinationDTO, DestinationDTO]()
 
   var codec:StompCodec = _
 
@@ -497,9 +493,25 @@ class StompProtocolHandler extends Proto
     if( rc==null ) {
       throw new ProtocolException("Invalid stomp destiantion name: "+value);
     }
-    rc
+    rc.map { dest =>
+      if( dest.temp() ) {
+        temp_destination_map.getOrElseUpdate(dest, {
+          import scala.collection.JavaConversions._
+          val real_path= ("temp" :: broker.id :: connection.id.toString :: dest.path.toList).toArray
+          dest match {
+            case dest:QueueDestinationDTO => new QueueDestinationDTO( real_path ).temp(true)
+            case dest:TopicDestinationDTO => new TopicDestinationDTO( real_path ).temp(true)
+            case _ => throw new ProtocolException("Invalid stomp destination");
+          }
+        })
+      } else {
+        dest
+      }
+    }
   }
 
+
+
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
     import collection.JavaConversions._
@@ -528,6 +540,8 @@ class StompProtocolHandler extends Proto
       destination_parser = new DestinationParser().copy(Stomp.destination_parser)
       if( config.queue_prefix!=null ) { destination_parser.queue_prefix = config.queue_prefix }
       if( config.topic_prefix!=null ) { destination_parser.topic_prefix = config.topic_prefix }
+      if( config.temp_queue_prefix!=null ) { destination_parser.temp_queue_prefix = config.temp_queue_prefix }
+      if( config.temp_topic_prefix!=null ) { destination_parser.temp_topic_prefix = config.temp_topic_prefix }
       if( config.destination_separator!=null ) { destination_parser.destination_separator = config.destination_separator }
       if( config.path_separator!=null ) { destination_parser.path_separator = config.path_separator }
       if( config.any_child_wildcard!=null ) { destination_parser.any_child_wildcard = config.any_child_wildcard }
@@ -888,8 +902,8 @@ class StompProtocolHandler extends Proto
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
 
-    val destiantion: Array[DestinationDTO] = get(frame.headers, DESTINATION).get
-    val key = destiantion.toList
+    val destination: Array[DestinationDTO] = get(frame.headers, DESTINATION).get
+    val key = destination.toList
     producerRoutes.get(key) match {
       case null =>
         // create the producer route...
@@ -907,7 +921,7 @@ class StompProtocolHandler extends Proto
         // don't process frames until producer is connected...
         connection.transport.suspendRead
         reset {
-          val rc = host.router.connect(destiantion, route, security_context)
+          val rc = host.router.connect(destination, route, security_context)
           rc match {
             case Some(failure) =>
               async_die(failure)
@@ -915,23 +929,34 @@ class StompProtocolHandler extends Proto
               if (!connection.stopped) {
                 resumeRead
                 producerRoutes.put(key, route)
-                send_via_route(route, frame, uow)
+                send_via_route(destination, route, frame, uow)
               }
           }
         }
 
       case route =>
         // we can re-use the existing producer route
-        send_via_route(route, frame, uow)
+        send_via_route(destination, route, frame, uow)
 
     }
   }
 
   var message_id_counter = 0;
 
-  def updated_headers(headers:HeaderMap) = {
+  def updated_headers(destination: Array[DestinationDTO], headers:HeaderMap) = {
     var rc:HeaderMap=Nil
 
+    // Do we need to re-write the destination names?
+    if( destination.find(_.temp()).isDefined ) {
+      rc ::= (DESTINATION -> encode_header(destination_parser.encode_destination(destination)))
+    }
+    get(headers, REPLY_TO).foreach { value=>
+      val dests:Array[DestinationDTO] = value
+      if( dests.find(_.temp()).isDefined ) {
+        rc ::= (REPLY_TO -> encode_header(destination_parser.encode_destination(dests)))
+      }
+    }
+
     // Do we need to add the message id?
     if( get( headers, MESSAGE_ID) == None ) {
       message_id_counter += 1
@@ -969,7 +994,7 @@ class StompProtocolHandler extends Proto
     rc
   }
 
-  def send_via_route(route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+  def send_via_route(destination: Array[DestinationDTO], route:DeliveryProducerRoute, frame:StompFrame,
uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
     // User might be asking for ack that we have processed the message..
     val receipt = frame.header(RECEIPT_REQUESTED)
@@ -977,7 +1002,7 @@ class StompProtocolHandler extends Proto
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
-      var message = updated_headers(frame.headers) match {
+      var message = updated_headers(destination, frame.headers) match {
         case Nil=>
           StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
         case updated_headers =>
@@ -1036,7 +1061,6 @@ class StompProtocolHandler extends Proto
     var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
     var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
     var auto_delete = get(headers, AUTO_DELETE).map( _ == TRUE ).getOrElse(false)
-    var temp = get(headers, TEMP).map( _ == TRUE ).getOrElse(false)
 
     val ack_mode = get(headers, ACK_MODE).getOrElse(ACK_MODE_AUTO)
     val credit_window = get(headers, CREDIT) match {
@@ -1095,7 +1119,7 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser,
exclusive, auto_delete, credit_window, temp);
+    val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser,
exclusive, auto_delete, credit_window);
     consumers += (id -> consumer)
 
     reset {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
Wed Nov 30 04:30:49 2011
@@ -77,6 +77,12 @@ public class StompDTO extends ProtocolDT
     @XmlAttribute(name="topic_prefix")
     public String topic_prefix;
 
+    @XmlAttribute(name="temp_queue_prefix")
+    public String temp_queue_prefix;
+
+    @XmlAttribute(name="temp_topic_prefix")
+    public String temp_topic_prefix;
+
     @XmlAttribute(name="destination_separator")
     public String destination_separator;
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Wed Nov 30 04:30:49 2011
@@ -1640,4 +1640,109 @@ class StompAutoDeleteTest extends StompT
     expect(false)(queue_exists)
 
   }
+}
+
+
+class StompTempDestinationTest extends StompTestSupport {
+
+  def path_separator = "."
+
+  test("Temp Queue Send Receive") {
+    connect("1.1")
+
+    def put(msg:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/temp-queue/test\n" +
+        "reply-to:/temp-queue/test\n" +
+        "\n" +
+        "message:"+msg+"\n")
+    }
+    put("1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/temp-queue/test\n" +
+      "id:1\n" +
+      "\n")
+
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+
+      // extract headers as a map of values.
+      Map((frame.split("\n").reverse.flatMap { line =>
+        if( line.contains(":") ) {
+          val parts = line.split(":", 2)
+          Some((parts(0), parts(1)))
+        } else {
+          None
+        }
+      }):_*)
+    }
+
+    // The destination and reply-to headers should get updated with actual
+    // Queue names
+    val message = get("1")
+    message.get("destination").get should startWith("/queue/temp.default.")
+    message.get("reply-to") should be === ( message.get("destination") )
+  }
+
+  test("Temp Topic Send Receive") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/temp-topic/test\n" +
+      "id:1\n" +
+      "\n")
+
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+
+      // extract headers as a map of values.
+      Map((frame.split("\n").reverse.flatMap { line =>
+        if( line.contains(":") ) {
+          val parts = line.split(":", 2)
+          Some((parts(0), parts(1)))
+        } else {
+          None
+        }
+      }):_*)
+    }
+
+    def put(msg:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/temp-topic/test\n" +
+        "reply-to:/temp-topic/test\n" +
+        "\n" +
+        "message:"+msg+"\n")
+    }
+    put("1")
+
+    // The destination and reply-to headers should get updated with actual
+    // Topic names
+    val message = get("1")
+    message.get("destination").get should startWith("/topic/temp.default.")
+    message.get("reply-to") should be === ( message.get("destination") )
+  }
+
+  test("Receive not allowed on another connection's temp queue") {
+
+    connect("1.1")
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/temp.default.1212112.test\n" +
+      "id:1\n" +
+      "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex("""message:Not authorized to receive from the temporary destination""")
+
+  }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1208274&r1=1208273&r2=1208274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Nov
30 04:30:49 2011
@@ -1418,6 +1418,56 @@ an error message.
 The auto-delete feature does not work with composite or wildcard destinations.
 it also does not work with topics or durable subscriptions.
 
+
+### Temporary Destinations
+
+Temporary destinations are typically used to receive response messages in
+a request/response messaging exchange.  A temporary destination can only
+be consumed by a subscription created on the connection which is associated
+with the temporary destination.  Once the connection is closed, all associated
+temporary destinations are removed. Temporary destinations are prefixed with:
+
+* `/temp-queue/` - For temporary queues.  Has the same delivery semantics as queues.
+* `/temp-topic/` - For temporary topics.  Has the same delivery semantics as topics.
+
+In a request/response scenario, you would first subscribe to the temporary destination:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/temp-queue/example
+    
+    ^@
+
+Then you would send a request with the reply-to header set to the temporary destination.

+Example:
+
+    SEND
+    destination:/queue/PO.REQUEST
+    reply-to:/temp-queue/example
+
+    PO145
+    ^@
+
+The consumer receiving the request will receive a message similar to:
+
+    MESSAGE
+    subscription:foo
+    reply-to:/queue/temp.default.23.example
+    destination:/queue/PO.REQUEST
+    reply-to:/temp-queue/example
+    
+    PO145
+
+Notice that the `reply-to` header value is updated from a temporary destination
+name to a normal destination name.  The subscription servicing the requests should respond
+to the updated destination value (`/queue/temp.default.23.example` in the example above).
+
+Temporary destination names actually map to normal queues and topics. They just have
+a `temp.<broker_id>.<connection_id>.` prefix.  Any destination which starts with
+`temp.` has a security policy which only allows the connection which created it
+to subscribe to it.  These destinations are also auto deleted once the connection
+is closed.
+
 ### Destination Wildcards
 
 We support destination wildcards to easily subscribe to multiple destinations
@@ -1495,5 +1545,3 @@ Destination names are restricted to usin
 `_`, `-` `%`, `~`, or `.` in addition to composite separator `,` and the wild
 card `*`.
 
-
-



Mime
View raw message