activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1158415 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/...
Date Tue, 16 Aug 2011 18:39:07 GMT
Author: chirino
Date: Tue Aug 16 18:39:07 2011
New Revision: 1158415

URL: http://svn.apache.org/viewvc?rev=1158415&view=rev
Log:
Adding support for temp queue/topics which only allow the connection which created it to consume
from it.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1158415&r1=1158414&r2=1158415&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Tue Aug 16 18:39:07 2011
@@ -29,6 +29,42 @@ import java.util.{Arrays, ArrayList}
 import collection.mutable.{LinkedHashMap, HashMap}
 import collection.{Iterable, JavaConversions}
 
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait RouterListener {
+  def on_create(path:Path, destination:DestinationDTO, security:SecurityContext)
+  def on_destroy(path:Path, destination:DestinationDTO, security:SecurityContext)
+  def close
+}
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object RouterListenerFactory {
+
+  trait Provider {
+    def create(router:Router):RouterListener
+  }
+
+  val providers = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/router-listener-factory.index",classOf[Provider])
+
+  def create(router:Router):List[RouterListener] = {
+    providers.singletons.map(_.create(router))
+  }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait DomainDestination {
 
   def id:String
@@ -112,6 +148,8 @@ object LocalRouter extends Log {
 class LocalRouter(val virtual_host:VirtualHost) extends BaseService with Router with Dispatched
{
   import LocalRouter._
 
+  val router_listeners = RouterListenerFactory.create(this)
+
   def dispatch_queue:DispatchQueue = virtual_host.dispatch_queue
 
   def auto_create_destinations = {
@@ -146,7 +184,7 @@ class LocalRouter(val virtual_host:Virtu
     }
 
     def can_destroy_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
-    def destroy_destination(path:Path, destination:DestinationDTO):Unit
+    def destroy_destination(path:Path, destination:DestinationDTO, security: SecurityContext):Unit
 
     def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
     def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String]
@@ -183,6 +221,15 @@ class LocalRouter(val virtual_host:Virtu
     def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
     def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String]
= {
 
+      // Only allow the owner to bind.
+      if( destination.temp_owner != null ) {
+        for( connection <- consumer.connection) {
+          if( connection.id != destination.temp_owner.longValue() ) {
+            return Some("Not authorized to receive from the temporary destination.")
+          }
+        }
+      }
+
       val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
 
@@ -369,11 +416,14 @@ class LocalRouter(val virtual_host:Virtu
       Some("Topic destroy not yet implemented.")
     }
 
-    def destroy_destination(path:Path, destination: DestinationDTO): Unit = {
+    def destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext):
Unit = {
       val matches = get_destination_matches(path)
-//        matches.foreach { dest =>
-//          remove_destination(dest.path, dest)
-//        }
+      matches.foreach { dest =>
+        for( l <- router_listeners) {
+          l.on_destroy(path, destination, security)
+        }
+//        remove_destination(dest.path, dest)
+      }
     }
 
     def can_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Option[String]
= {
@@ -402,6 +452,10 @@ class LocalRouter(val virtual_host:Virtu
 
       val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO],
()=>topic_config(path), path.toString(destination_parser), path)
       add_destination(path, topic)
+
+      for( l <- router_listeners) {
+        l.on_create(path, destination, security)
+      }
       Success(topic)
     }
 
@@ -656,10 +710,13 @@ class LocalRouter(val virtual_host:Virtu
       }
     }
 
-    def destroy_destination(path:Path, destination: DestinationDTO): Unit = {
+    def destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext):
Unit = {
       val matches = get_destination_matches(path)
-      matches.foreach { dest =>
-        _destroy_queue(dest)
+      matches.foreach { queue =>
+        for( l <- router_listeners) {
+          l.on_destroy(queue.binding.destination, queue.binding.binding_dto, security)
+        }
+        _destroy_queue(queue)
       }
     }
 
@@ -771,12 +828,19 @@ class LocalRouter(val virtual_host:Virtu
               x match {
                 case Some(record)=>
                   if( record.binding_kind == TempQueueBinding.TEMP_KIND ) {
-                    // Drop temp queues on restart..
+                    // These are temp queues create to topic subscriptions which
+                    // avoid blocking producers.
                     virtual_host.store.remove_queue(queue_key){x=> task.run}
                   } else {
-                    dispatch_queue {
-                      _create_queue(QueueBinding.create(record.binding_kind, record.binding_data),
queue_key)
-                      task.run
+                    var binding = QueueBinding.create(record.binding_kind, record.binding_data)
+                    if( binding.binding_dto.temp_owner != null ) {
+                      // These are the temp queues clients create.
+                      virtual_host.store.remove_queue(queue_key){x=> task.run}
+                    } else {
+                      dispatch_queue {
+                        _create_queue(binding, queue_key)
+                        task.run
+                      }
                     }
                   }
                 case _ => task.run
@@ -966,7 +1030,7 @@ class LocalRouter(val virtual_host:Virtu
         Some(failures.mkString("; "))
       } else {
         paths.foreach { x=>
-          domain(x._2).destroy_destination(x._1, x._2)
+          domain(x._2).destroy_destination(x._1, x._2, security)
         }
         None
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1158415&r1=1158414&r2=1158415&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
Tue Aug 16 18:39:07 2011
@@ -39,8 +39,8 @@ class SecurityContext {
   var certificates:Array[X509Certificate] = _
   var local_address:SocketAddress = _
   var remote_address:SocketAddress = _
-
   var login_context:LoginContext = _
+  var connection_id:Option[Long] = None
 
   private var _principles = Set[PrincipalDTO]()
   private var _subject:Subject = _

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1158415&r1=1158414&r2=1158415&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
Tue Aug 16 18:39:07 2011
@@ -38,6 +38,15 @@ abstract public class DestinationDTO {
     @XmlElement(name = "path")
     public List<String> path = new ArrayList<String>();
 
+    /**
+     * If the destination is a temporary destination, then it
+     * will have temp_owner set to the owner of the connection
+     * id which owns the destination.  Only the owner will be allowed
+     * to consume from the destination.
+     */
+    @XmlAttribute(name="temp_owner")
+    public Long temp_owner;
+
     public DestinationDTO() {
     }
 
@@ -60,27 +69,33 @@ abstract public class DestinationDTO {
         return sb.toString();
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof DestinationDTO)) return false;
-
-        DestinationDTO that = (DestinationDTO) o;
-
-        if (path != null ? !path.equals(that.path) : that.path != null) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return path != null ? path.hashCode() : 0;
-    }
-
-    @Override
-    public String toString() {
-        return "DestinationDTO{" +
-                "path=" + path +
-                '}';
-    }
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof DestinationDTO)) return false;
+
+    DestinationDTO that = (DestinationDTO) o;
+
+    if (path != null ? !path.equals(that.path) : that.path != null)
+      return false;
+    if (temp_owner != null ? !temp_owner.equals(that.temp_owner) : that.temp_owner != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = path != null ? path.hashCode() : 0;
+    result = 31 * result + (temp_owner != null ? temp_owner.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "DestinationDTO{" +
+            "path=" + path +
+            ", temp_owner=" + temp_owner +
+            '}';
+  }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1158415&r1=1158414&r2=1158415&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Tue Aug 16 18:39:07 2011
@@ -712,6 +712,7 @@ class StompProtocolHandler extends Proto
       case _ => None
     }
 
+    security_context.connection_id = Some(connection.id)
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
     security_context.user = get(headers, LOGIN).map(decode_header _).getOrElse(null)



Mime
View raw message