activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1236424 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-broker/src/main/scala/org/apache/activemq/apollo/...
Date Thu, 26 Jan 2012 22:46:13 GMT
Author: chirino
Date: Thu Jan 26 22:46:12 2012
New Revision: 1236424

URL: http://svn.apache.org/viewvc?rev=1236424&view=rev
Log:
- Added ':' as a valid destination part value.
- Temp destinations now check ownership against a connection's session_id instead of the
connection id (the session id is a value assigned by the protocol).
- Updated the openwire protocol handler temp destination implementation to use the broker's
temp destination features so that it can enforce security and the temp dest lifecycle.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
    activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
Thu Jan 26 22:46:12 2012
@@ -348,12 +348,12 @@ class Broker() extends BaseService with 
 
   def schedule_virtualhost_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
     if( service_state.is_started ) {
-      val active_connections = connections.keySet
+      val active_sessions = connections.values.flatMap(_.session_id).toSet
 
       virtual_hosts.values.foreach { host=>
         host.dispatch_queue {
           if(host.service_state.is_started) {
-            host.router.remove_temp_destinations(active_connections)
+            host.router.remove_temp_destinations(active_sessions)
           }
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
Thu Jan 26 22:46:12 2012
@@ -96,6 +96,8 @@ class BrokerConnection(var connector: Co
 
   var protocol_handler: ProtocolHandler = null;
 
+  def session_id = Option(protocol_handler).flatMap(_.session_id)
+  
   override def toString = "id: "+id.toString
 
   protected override  def _start(on_completed:Runnable) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
Thu Jan 26 22:46:12 2012
@@ -28,6 +28,7 @@ import scala.Array
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class DestinationParser extends PathParser {
+  import PathParser._
 
   var queue_prefix = "queue:"
   var topic_prefix = "topic:"

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Thu Jan 26 22:46:12 2012
@@ -213,7 +213,7 @@ class LocalRouter(val virtual_host:Virtu
       None
     } else {
       try {
-        Some((destination.path.get(1), destination.path.get(2).toLong))
+        Some((destination.path.get(1), destination.path.get(2)))
       } catch {
         case _ => None
       }
@@ -287,7 +287,7 @@ class LocalRouter(val virtual_host:Virtu
       for(dest <- get_destination_matches(path)) {
         if( is_temp(destination) ) {
           val owner = temp_owner(destination).get
-          for( connection <- security.connection_id) {
+          for( connection <- security.session_id) {
             if( (virtual_host.broker.id, connection) != owner ) {
               return Some("Not authorized to destroy the temp %s '%s'. Principals=%s".format(dest.resource_kind.id,
dest.id, security.principal_dump))
             }
@@ -313,7 +313,7 @@ class LocalRouter(val virtual_host:Virtu
       if( is_temp(destination) ) {
         temp_owner(destination) match {
           case Some(owner) =>
-            for( connection <- security.connection_id) {
+            for( connection <- security.session_id) {
               if( (virtual_host.broker.id, connection) != owner ) {
                 return Some("Not authorized to receive from the temporary destination. Principals=%s".format(security.principal_dump))
               }
@@ -911,7 +911,7 @@ class LocalRouter(val virtual_host:Virtu
 
   }
   
-  def remove_temp_destinations(active_connections:scala.collection.Set[Long]) = {
+  def remove_temp_destinations(active_connections:scala.collection.Set[String]) = {
     virtual_host.dispatch_queue.assertExecuting()
     val min_create_time = virtual_host.broker.now - 1000;
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Thu Jan 26 22:46:12 2012
@@ -48,7 +48,7 @@ trait Router extends Service {
 
   def apply_update(on_completed:Runnable):Unit
 
-  def remove_temp_destinations(active_connections:scala.collection.Set[Long]):Unit
+  def remove_temp_destinations(active_connections:scala.collection.Set[String]):Unit
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
Thu Jan 26 22:46:12 2012
@@ -154,6 +154,8 @@ class AnyProtocolHandler extends Protoco
 
   var discriminated = false
 
+  def session_id = None
+
   override def on_transport_command(command: AnyRef) = {
 
     if (!command.isInstanceOf[ProtocolDetected]) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
Thu Jan 26 22:46:12 2012
@@ -67,6 +67,8 @@ trait ProtocolHandler {
 
   def protocol:String
 
+  def session_id:Option[String]
+
   var connection:BrokerConnection = null;
 
   def set_connection(brokerConnection:BrokerConnection) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
Thu Jan 26 22:46:12 2012
@@ -39,7 +39,7 @@ class SecurityContext {
   var local_address:SocketAddress = _
   var remote_address:SocketAddress = _
   var login_context:LoginContext = _
-  var connection_id:Option[Long] = None
+  var session_id:Option[String] = None
 
   def credential_dump = {
     var rc = List[String]()

Modified: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Thu Jan 26 22:46:12 2012
@@ -60,33 +60,47 @@
     <!-- so we can test against a persisentce store -->
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>apollo-leveldb</artifactId>
+      <artifactId>apollo-broker</artifactId>
       <version>1.1-SNAPSHOT</version>
+      <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>apollo-bdb</artifactId>
+      <artifactId>apollo-util</artifactId>
       <version>1.1-SNAPSHOT</version>
+      <type>test-jar</type>
       <scope>test</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>apollo-broker</artifactId>
+      <artifactId>apollo-bdb</artifactId>
       <version>1.1-SNAPSHOT</version>
-      <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>apollo-util</artifactId>
+      <artifactId>apollo-leveldb</artifactId>
       <version>1.1-SNAPSHOT</version>
-      <type>test-jar</type>
       <scope>test</scope>
     </dependency>
 
     <dependency>
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all-server</artifactId>
+      <version>${jetty-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-web</artifactId>
+      <version>1.1-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
Thu Jan 26 22:46:12 2012
@@ -175,7 +175,7 @@ class DestinationAdvisoryRouterListener(
 
   def send(delivery:Delivery): Unit = {
     val message = delivery.message.asInstanceOf[OpenwireMessage].message
-    val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination)
+    val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination,null)
     val key = dest.toList
 
     val route = producerRoutes.get(key) match {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
Thu Jan 26 22:46:12 2012
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.apollo.openwire
 
-import command._
-import org.apache.activemq.apollo.util.path.Path
 import java.util.regex.{Matcher, Pattern}
 import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
-import org.apache.activemq.apollo.broker.{DestinationParser, LocalRouter}
+import org.apache.activemq.apollo.broker.DestinationParser
+import org.fusesource.hawtbuf.Buffer.utf8
+import org.apache.activemq.apollo.openwire.command._
+import org.fusesource.hawtbuf._
 
 /**
  * <p>
@@ -35,35 +36,32 @@ object DestinationConverter {
   OPENWIRE_PARSER.any_child_wildcard = "*"
   OPENWIRE_PARSER.any_descendant_wildcard = ">"
 
-  def to_destination_dto(domain: String, parts:Array[String]): DestinationDTO = domain match
{
-    case "queue" => new QueueDestinationDTO(parts)
-    case "topic" => new TopicDestinationDTO(parts)
-    case _ => throw new Exception("Uknown destination domain: " + domain);
-  }
-
-  def to_destination_dto(domain: String, path: Path): Array[DestinationDTO] = {
-      Array(to_destination_dto(domain, OPENWIRE_PARSER.path_parts(path)))
-  }
+  //  = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~]")
+  
+  def to_destination_dto(dest: ActiveMQDestination, handler:OpenwireProtocolHandler): Array[DestinationDTO]
= {
 
-  def to_destination_dto(dest: ActiveMQDestination): Array[DestinationDTO] = {
     if( !dest.isComposite ) {
       import ActiveMQDestination._
-      val physicalName = dest.getPhysicalName.replaceAll(Pattern.quote(":"), Matcher.quoteReplacement("%58"))
-
-      var path = OPENWIRE_PARSER.decode_path(physicalName)
-      dest.getDestinationType match {
+      var name = dest.getPhysicalName
+      Array(dest.getDestinationType match {
         case QUEUE_TYPE =>
-          to_destination_dto("queue", path)
+          var path_parts = OPENWIRE_PARSER.parts(name).map(sanitize_destination_part(_))
+          new QueueDestinationDTO(path_parts)
         case TOPIC_TYPE =>
-          to_destination_dto("topic", path)
+          var path_parts = OPENWIRE_PARSER.parts(name).map(sanitize_destination_part(_))
+          new TopicDestinationDTO(path_parts)
         case TEMP_QUEUE_TYPE =>
-          to_destination_dto("queue", Path("ActiveMQ", "Temp") + path)
+          val (connectionid, rest)= name.splitAt(name.lastIndexOf(':'))
+          val real_path = ("temp" :: handler.broker.id :: sanitize_destination_part(connectionid)
:: sanitize_destination_part(rest.substring(1)) :: Nil).toArray
+          new QueueDestinationDTO( real_path ).temp(true)
         case TEMP_TOPIC_TYPE =>
-          to_destination_dto("topic", Path("ActiveMQ", "Temp") + path)
-      }
+          val (connectionid, rest)= name.splitAt(name.lastIndexOf(':'))
+          val real_path = ("temp" :: handler.broker.id :: sanitize_destination_part(connectionid)
:: sanitize_destination_part(rest.substring(1)) :: Nil).toArray
+          new TopicDestinationDTO( real_path ).temp(true)
+      })
     } else {
       dest.getCompositeDestinations.map { c =>
-        to_destination_dto(c)(0)
+        to_destination_dto(c, handler)(0)
       }
     }
   }
@@ -72,26 +70,21 @@ object DestinationConverter {
     import collection.JavaConversions._
 
     val rc = dest.map { dest =>
-      var temp = false // dest.temp_owner != null
-      val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList match {
-        case "ActiveMQ" :: "Temp" :: rest =>
-          temp = true
-          rest
-        case rest =>
-          rest
-      }).replaceAll(Pattern.quote("%58"), Matcher.quoteReplacement(":"))
 
+      val temp = dest.path.headOption == Some("temp")
       dest match {
         case dest:QueueDestinationDTO =>
           if( temp ) {
-            new ActiveMQTempQueue(name)
+            new ActiveMQTempQueue(dest.path.toList.drop(2).map(unsanitize_destination_part(_)).mkString(":"))
           } else {
+            val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList.map(unsanitize_destination_part(_)))
             new ActiveMQQueue(name)
           }
         case dest:TopicDestinationDTO =>
           if( temp ) {
-            new ActiveMQTempTopic(name)
+            new ActiveMQTempTopic(dest.path.toList.drop(2).map(unsanitize_destination_part(_)).mkString(":"))
           } else {
+            val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList.map(unsanitize_destination_part(_)))
             new ActiveMQTopic(name)
           }
       }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Thu Jan 26 22:46:12 2012
@@ -36,11 +36,10 @@ import codec.OpenWireFormat
 import command._
 import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
 import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO,
DestinationDTO}
-import org.apache.activemq.apollo.openwire.DestinationConverter._
 import org.apache.activemq.apollo.broker._
 import protocol._
 import security.SecurityContext
-
+import DestinationConverter._
 
 object OpenwireProtocolHandler extends Log {
   def unit:Unit = {}
@@ -97,7 +96,6 @@ class OpenwireProtocolHandler extends Pr
 
   private def queue = connection.dispatch_queue
 
-  var session_id: AsciiBuffer = _
   var wire_format: OpenWireFormat = _
   var login: Option[AsciiBuffer] = None
   var passcode: Option[AsciiBuffer] = None
@@ -111,6 +109,9 @@ class OpenwireProtocolHandler extends Pr
   var current_command: Object = _
 
   var codec:OpenwireCodec = _
+  var temp_destination_map = HashMap[ActiveMQDestination, DestinationDTO]()
+
+  def session_id = security_context.session_id
 
   override def create_connection_status = {
     var rc = new OpenwireConnectionStatusDTO
@@ -179,7 +180,6 @@ class OpenwireProtocolHandler extends Pr
   }
 
   override def on_transport_connected():Unit = {
-    security_context.connection_id = Some(connection.id)
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
 
@@ -421,11 +421,12 @@ class OpenwireProtocolHandler extends Pr
 
   def on_connection_info(info: ConnectionInfo) = {
     val id = info.getConnectionId()
-    if (!all_connections.contains(id)) {
+    if (connection_context==null) {
       new ConnectionContext(info).attach
 
       security_context.user = info.getUserName
       security_context.password = info.getPassword
+      security_context.session_id = Some(sanitize_destination_part(info.getConnectionId.toString))
 
       reset {
         if( host.authenticator!=null &&  host.authorizer!=null ) {
@@ -458,7 +459,7 @@ class OpenwireProtocolHandler extends Pr
   def on_session_info(info: SessionInfo) = {
     val id = info.getSessionId();
     if (!all_sessions.contains(id)) {
-      val parent = all_connections.get(id.getParentId()).getOrElse(die("Cannot add a session
to a connection that had not been registered."))
+      val parent = get_context(id.getParentId())
       new SessionContext(parent, info).attach
     }
     ack(info);
@@ -484,7 +485,7 @@ class OpenwireProtocolHandler extends Pr
   }
 
   def on_destination_info(info:DestinationInfo) = {
-    val destinations = to_destination_dto(info.getDestination)
+    val destinations = to_destination_dto(info.getDestination, this)
 //    if( info.getDestination.isTemporary ) {
 //      destinations.foreach(_.temp_owner = connection.id)
 //    }
@@ -506,7 +507,7 @@ class OpenwireProtocolHandler extends Pr
 
   def on_remove_info(info: RemoveInfo) = {
     info.getObjectId match {
-      case id: ConnectionId => all_connections.get(id).foreach(_.dettach)
+      case id: ConnectionId => Option(connection_context).foreach(_.dettach)
       case id: SessionId => all_sessions.get(id).foreach(_.dettach)
       case id: ProducerId => all_producers.get(id).foreach(_.dettach)
       case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
@@ -516,8 +517,15 @@ class OpenwireProtocolHandler extends Pr
     ack(info)
   }
 
+  def get_context(id:ConnectionId) = {
+    if(connection_context!=null && connection_context.info.getConnectionId == id)
+      connection_context
+    else
+      die("Cannot add a session to a connection that had not been registered.")
+  }
+  
   def on_transaction_info(info:TransactionInfo) = {
-    val parent = all_connections.get(info.getConnectionId()).getOrElse(die("Cannot add a
session to a connection that had not been registered."))
+    val parent = get_context(info.getConnectionId())
     val id = info.getTransactionId
     info.getType match {
       case TransactionInfo.BEGIN =>
@@ -591,7 +599,7 @@ class OpenwireProtocolHandler extends Pr
 
   def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
 
-    val destiantion = to_destination_dto(msg.getDestination)
+    val destiantion = to_destination_dto(msg.getDestination, this)
     val key = destiantion.toList
     producerRoutes.get(key) match {
       case null =>
@@ -685,8 +693,8 @@ class OpenwireProtocolHandler extends Pr
   //      host.createQueue(destination);
   //      return ack(info);
   //  }
-
-  val all_connections = new HashMap[ConnectionId, ConnectionContext]();
+  var connection_context:ConnectionContext= null
+  
   val all_sessions = new HashMap[SessionId, SessionContext]();
   val all_producers = new HashMap[ProducerId, ProducerContext]();
   val all_consumers = new HashMap[ConsumerId, ConsumerContext]();
@@ -701,15 +709,18 @@ class OpenwireProtocolHandler extends Pr
     def default_session_id = new SessionId(info.getConnectionId(), -1)
 
     def attach = {
+      if( connection_context!=null ) {
+        die("Only one logic connection is supported.")
+      }
       // create the default session.
       new SessionContext(this, new SessionInfo(default_session_id)).attach
-      all_connections.put(info.getConnectionId, this)
+      connection_context = this
     }
 
     def dettach = {
       sessions.values.toArray.foreach(_.dettach)
       transactions.values.toArray.foreach(_.dettach)
-      all_connections.remove(info.getConnectionId)
+      connection_context = null
     }
   }
 
@@ -811,7 +822,7 @@ class OpenwireProtocolHandler extends Pr
     def attach = {
 
       if( info.getDestination == null ) fail("destination was not set")
-      destination = to_destination_dto(info.getDestination)
+      destination = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
 
       // if they are temp dests.. attach our owner id so that we don't
       // get rejected.

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Jan 26 22:46:12 2012
@@ -55,6 +55,7 @@ object BufferSupport {
 }
 
 import BufferSupport._
+import PathParser._
 
 object StompProtocolHandler extends Log {
 
@@ -566,7 +567,6 @@ class StompProtocolHandler extends Proto
   val security_context = new SecurityContext
   var waiting_on: ()=>String = WAITING_ON_CLIENT_REQUEST
   var config:StompDTO = _
-  var session_id:String = _
 
   var protocol_filters = List[ProtocolFilter]()
 
@@ -575,6 +575,8 @@ class StompProtocolHandler extends Proto
 
   var codec:StompCodec = _
 
+  def session_id = security_context.session_id
+
   implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
     val rc = destination_parser.decode_multi_destination(value.toString)
     if( rc==null ) {
@@ -584,7 +586,7 @@ class StompProtocolHandler extends Proto
       if( dest.temp() ) {
         temp_destination_map.getOrElseUpdate(dest, {
           import scala.collection.JavaConversions._
-          val real_path= ("temp" :: broker.id :: connection.id.toString :: dest.path.toList).toArray
+          val real_path= ("temp" :: broker.id :: session_id.get :: dest.path.toList).toArray
           dest match {
             case dest:QueueDestinationDTO => new QueueDestinationDTO( real_path ).temp(true)
             case dest:TopicDestinationDTO => new TopicDestinationDTO( real_path ).temp(true)
@@ -833,7 +835,6 @@ class StompProtocolHandler extends Proto
       case _ => None
     }
 
-    security_context.connection_id = Some(connection.id)
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
     security_context.user = get(headers, LOGIN).map(decode_header _).getOrElse(null)
@@ -892,9 +893,7 @@ class StompProtocolHandler extends Proto
       var connected_headers = ListBuffer((VERSION, protocol_version))
 
       connected_headers += SERVER->encode_header("apache-apollo/"+Broker.version)
-      val v = encode_header("%s-%x-".format(this.host.config.id, this.host.session_counter.incrementAndGet))
-      session_id = v.toString 
-      connected_headers += SESSION->v
+      connected_headers += SESSION->encode_header(session_id.get)
 
       val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
       connected_headers += HEART_BEAT->outbound_heart_beat_header
@@ -930,6 +929,7 @@ class StompProtocolHandler extends Proto
         noop
       } else {
         this.host=host
+        security_context.session_id = Some("%s-%x-".format(sanitize_destination_part(this.host.config.id),
this.host.session_counter.incrementAndGet))
         connection_log = host.connection_log
         if( host.authenticator!=null &&  host.authorizer!=null ) {
           suspend_read("authenticating and authorizing connect")
@@ -1060,7 +1060,7 @@ class StompProtocolHandler extends Proto
     // Do we need to add the message id?
     if( get( headers, MESSAGE_ID) == None ) {
       message_id_counter += 1
-      rc ::= (MESSAGE_ID -> ascii(session_id+message_id_counter))
+      rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
     }
 
     if( config.add_timestamp_header!=null ) {

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
Thu Jan 26 22:46:12 2012
@@ -20,8 +20,8 @@ import java.util.LinkedList
 import java.util.regex._
 import collection.JavaConversions._
 import org.apache.activemq.apollo.util.path.PathParser.PartFilter
-import util.matching.Regex
 import collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.{DataByteArrayOutputStream, AsciiBuffer}
 
 /**
   * Holds the delimiters used to parse paths.
@@ -108,6 +108,44 @@ object PathParser {
     }
   }
 
+
+  def sanitize_destination_part(value:String) = {
+    val buffer = new AsciiBuffer(value)
+    val rc = new StringBuffer(value.length())
+    var i = 0
+    val l = buffer.length
+    while( i < l ) {
+      val c = buffer.data(i)
+      if(
+        ('a' <= c && c <= 'z') ||
+        ('A' <= c && c <= 'Z') ||
+        ('0' <= c && c <= '9') ||
+        c=='_' || c=='-' || c=='~' || c==':'
+      ) {
+        rc.append(c.toChar)
+      } else {
+        rc.append("%%%02x".format(c))
+      }
+      i += 1
+    }
+    rc.toString
+  }
+
+  def unsanitize_destination_part(value:String):String = {
+    val rc = new DataByteArrayOutputStream
+    var pos = value
+    while( pos.length() > 0 ) {
+      if( pos.startsWith("%") && pos.length()> 3 ) {
+        val dec = pos.substring(1,3)
+        rc.writeByte(Integer.parseInt(dec, 16))
+        pos = pos.substring(3);
+      } else {
+        rc.writeByte(pos.charAt(0))
+        pos = pos.substring(1)
+      }
+    }
+    rc.toBuffer.utf8().toString
+  }
 }
 
 class PathParser {
@@ -117,7 +155,7 @@ class PathParser {
   var regex_wildcard_start = "{"
   var regex_wildcard_end = "}"
   var path_separator = "."
-  var part_pattern = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~]+")
+  var part_pattern = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~\\:]+")
 
   def copy(other:PathParser) = {
     any_descendant_wildcard = other.any_descendant_wildcard

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Thu Jan
26 22:46:12 2012
@@ -1569,6 +1569,7 @@ header in the STOMP `SUBSCRIBE` frame to
 ### Destination Name Restrictions
 
 Destination names are restricted to using the characters `a-z`, `A-Z`, `0-9`,
-`_`, `-` `%`, `~`, ' ', or `.` in addition to composite separator `,` and the wild
-card `*`.
+`_`, `-` `%`, `~`, `:`, ' ', or `.` in addition to composite separator `,` and the wild
+card `*`.  Any other characters must be UTF-8 and then URL encoded if you wish to 
+preserve their significance.
 



Mime
View raw message