activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1083007 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
Date Fri, 18 Mar 2011 18:38:39 GMT
Author: chirino
Date: Fri Mar 18 18:38:38 2011
New Revision: 1083007

URL: http://svn.apache.org/viewvc?rev=1083007&view=rev
Log:
Consolidating the can_connect / can_bind logic in the LocalRouter.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Fri Mar 18 18:38:38 2011
@@ -34,11 +34,9 @@ trait DomainDestination {
   def id:Long
   def name:String
 
-  def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
   def bind (destination:DestinationDTO, consumer:DeliveryConsumer)
   def unbind (consumer:DeliveryConsumer, persistent:Boolean)
 
-  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
   def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
   def disconnect (producer:BindableDeliveryProducer)
 
@@ -142,12 +140,12 @@ class LocalRouter(val host:VirtualHost) 
       // binds any matching wild card subs and producers...
       import JavaConversions._
       consumers_by_path.get( path ).foreach { x=>
-        if( dest.can_bind(x.destination, x.consumer, x.security) ) {
+        if( can_bind_one(path, x.destination, x.consumer, x.security) ) {
           dest.bind(x.destination, x.consumer)
         }
       }
       producers_by_path.get( path ).foreach { x=>
-        if( dest.can_connect(x.destination, x.producer, x.security) ) {
+        if( can_connect_one(path, x.destination, x.producer, x.security) ) {
           dest.connect(x.destination, x.producer)
         }
       }
@@ -157,8 +155,8 @@ class LocalRouter(val host:VirtualHost) 
       destination_by_path.remove(path, dest)
       destination_by_id.remove(dest.id)
     }
-
-    def can_bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch,
String] = {
+    def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
+    def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch,
String] = {
 
       val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
@@ -177,8 +175,8 @@ class LocalRouter(val host:VirtualHost) 
         }
 
         matches.foreach { dest =>
-          if( !dest.can_bind(destination, consumer, security) ) {
-            return Failure("Not authorized to reveive from the destination.")
+          if( !can_bind_one(path, destination, consumer, security) ) {
+            return Failure("Not authorized to receive from the destination.")
           }
         }
       }
@@ -188,7 +186,7 @@ class LocalRouter(val host:VirtualHost) 
     def bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Unit
= {
       var matches = get_destination_matches(path)
       matches.foreach { dest=>
-        if( dest.can_bind(destination, consumer, security) ) {
+        if( can_bind_one(path, destination, consumer, security) ) {
           dest.bind(destination, consumer)
         }
       }
@@ -213,13 +211,22 @@ class LocalRouter(val host:VirtualHost) 
 
     }
 
-    def can_connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Result[Zilch, String] = {
+    def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Boolean
+
+    def can_connect_all(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Result[Zilch, String] = {
 
       val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
 
-      // Should we attempt to auto create the destination?
-      if( !wildcard ) {
+      if( wildcard ) {
+
+        // Wild card sends never fail authorization... since a destination
+        // may get created later which the user is authorized to use.
+        Success(Zilch)
+
+      } else {
+
+        // Should we attempt to auto create the destination?
         if ( matches.isEmpty && auto_create_destinations ) {
           val rc = create_destination(path, security)
           if( rc.failed ) {
@@ -227,23 +234,24 @@ class LocalRouter(val host:VirtualHost) 
           }
           matches = get_destination_matches(path)
         }
+
         if( matches.isEmpty ) {
           return Failure("The destination does not exist.")
         }
 
-        matches.foreach { dest =>
-          if( !dest.can_connect(destination, producer, security) ) {
-            return Failure("Not authorized to send to the destination.")
-          }
+        // since this is not a wild card, we should have only matched one..
+        assert( matches.size == 1 )
+        if( !can_connect_one(path, destination, producer, security) ) {
+          return Failure("Not authorized to send to the destination.")
         }
-      }
-      Success(Zilch)
 
+        Success(Zilch)
+      }
     }
 
     def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Unit = {
       get_destination_matches(path).foreach { dest=>
-        if( dest.can_connect(destination, producer, security) ) {
+        if( can_connect_one(path, destination, producer, security) ) {
           dest.connect(destination, producer)
         }
       }
@@ -262,29 +270,12 @@ class LocalRouter(val host:VirtualHost) 
   val topic_domain = new TopicDomain
   class TopicDomain extends Domain[Topic] {
 
-    val topic_id_counter = new LongCounter
+    val topic_id_counter = new LongCounter()
 
     // Stores durable subscription queues.
     val durable_subscriptions_by_path = new PathMap[Queue]()
     val durable_subscriptions_by_id = HashMap[(String,String), Queue]()
 
-
-    override def can_bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer,
security:SecurityContext):Result[Zilch, String] = {
-      var rc = super.can_bind(path, destination, consumer, security)
-      if( !rc.failed ) {
-        destination match {
-          case destination:DurableSubscriptionDestinationDTO=>
-            // So the user can subscribe to the topic.. but can he create durable sub??
-            val qc = DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
-            if( !can_create_ds(qc, security) ) {
-               return Failure("Not authorized to create the durable subscription.")
-            }
-          case _ =>
-        }
-      }
-      rc
-    }
-
     def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue
= {
       val key = (destination.client_id, destination.subscription_id)
       durable_subscriptions_by_id.get( key ).getOrElse {
@@ -363,6 +354,31 @@ class LocalRouter(val host:VirtualHost) 
       Success(topic)
     }
 
+    def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
= {
+      val config = topic_config(path)
+      val authorizer = host.authorizer
+      if( authorizer!=null && security!=null && !authorizer.can_receive_from(security,
host, config) ) {
+        return false;
+      }
+
+      destination match {
+        case destination:DurableSubscriptionDestinationDTO=>
+          // So the user can subscribe to the topic.. but can he create durable sub??
+          val qc = DurableSubscriptionQueueBinding.create(destination).config(host).asInstanceOf[DurableSubscriptionDTO]
+          if( !can_create_ds(qc, security) ) {
+             return false;
+          }
+        case _ =>
+      }
+      true
+    }
+
+    def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Boolean = {
+      val config = topic_config(path)
+      val authorizer = host.authorizer
+      !(authorizer!=null && security!=null && !authorizer.can_send_to(security,
host, config) )
+    }
+
   }
 
   val queue_domain = new QueueDomain
@@ -414,6 +430,31 @@ class LocalRouter(val host:VirtualHost) 
       }
 
     }
+
+    def can_bind_one(path:Path, dto:DestinationDTO, consumer:DeliveryConsumer, security:
SecurityContext):Boolean = {
+      val binding = QueueDomainQueueBinding.create(dto)
+      val config = binding.config(host)
+      if(  host.authorizer!=null && security!=null ) {
+        if( consumer.browser ) {
+          if( !host.authorizer.can_receive_from(security, host, config) ) {
+            return false;
+          }
+        } else {
+          if( !host.authorizer.can_consume_from(security, host, config) ) {
+            return false
+          }
+        }
+      }
+      return true;
+    }
+
+    def can_connect_one(path:Path, dto:DestinationDTO, producer:BindableDeliveryProducer,
security:SecurityContext):Boolean = {
+      val binding = QueueDomainQueueBinding.create(dto)
+      val config = binding.config(host)
+      val authorizer = host.authorizer
+      !( authorizer!=null && security!=null && !authorizer.can_send_to(security,
host, config) )
+    }
+
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -524,7 +565,7 @@ class LocalRouter(val host:VirtualHost) 
   //
   /////////////////////////////////////////////////////////////////////////////
 
-  def domain(destination: DestinationDTO) = destination match {
+  def domain(destination: DestinationDTO):Domain[_ <: DomainDestination] = destination
match {
     case x:TopicDestinationDTO => topic_domain
     case x:DurableSubscriptionDestinationDTO => topic_domain
     case x:QueueDestinationDTO => queue_domain
@@ -535,7 +576,7 @@ class LocalRouter(val host:VirtualHost) 
     consumer.retain
     val paths = destination.map(x=> (DestinationParser.decode_path(x.name), x) )
     dispatch_queue ! {
-      val failures = paths.map(x=> domain(x._2).can_bind(x._1, x._2, consumer, security)
).flatMap( _.failure_option )
+      val failures = paths.map(x=> domain(x._2).can_bind_all(x._1, x._2, consumer, security)
).flatMap( _.failure_option )
       val rc = if( !failures.isEmpty ) {
         Failure(failures.mkString("; "))
       } else {
@@ -564,7 +605,7 @@ class LocalRouter(val host:VirtualHost) 
     val paths = destinations.map(x=> (DestinationParser.decode_path(x.name), x) )
     dispatch_queue ! {
 
-      val failures = paths.map(x=> domain(x._2).can_connect(x._1, x._2, producer, security)
).flatMap( _.failure_option )
+      val failures = paths.map(x=> domain(x._2).can_connect_all(x._1, x._2, producer,
security) ).flatMap( _.failure_option )
 
       if( !failures.isEmpty ) {
         producer.release

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Mar 18 18:38:38 2011
@@ -539,21 +539,6 @@ class Queue(val router: LocalRouter, val
 
   def disconnected() = throw new RuntimeException("unsupported")
 
-  def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer, security: SecurityContext):Boolean
= {
-    if(  host.authorizer!=null && security!=null ) {
-      if( consumer.browser ) {
-        if( !host.authorizer.can_receive_from(security, host, config) ) {
-          return false;
-        }
-      } else {
-        if( !host.authorizer.can_consume_from(security, host, config) ) {
-          return false
-        }
-      }
-    }
-    return true;
-  }
-
   def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
     bind(consumer::Nil)
   }
@@ -561,15 +546,6 @@ class Queue(val router: LocalRouter, val
     unbind(consumer::Nil)
   }
 
-  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
= {
-    val authorizer = host.authorizer
-    if( authorizer!=null && security!=null && !authorizer.can_send_to(security,
host, config) ) {
-      false
-    } else {
-      true
-    }
-  }
-
   def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
     import OptionSupport._
     if( config.unified.getOrElse(false) ) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Mar 18 18:38:38 2011
@@ -41,15 +41,6 @@ class Topic(val router:LocalRouter, val 
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
-  def can_bind(destination: DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext)
= {
-    val authorizer = router.host.authorizer
-    if( authorizer!=null && security!=null && !authorizer.can_receive_from(security,
router.host, config) ) {
-      false
-    } else {
-      true
-    }
-  }
-
   def is_same_ds(sub1:DurableSubscriptionDestinationDTO, sub2:DurableSubscriptionDestinationDTO)
= {
     (sub1.client_id, sub1.subscription_id) == (sub2.client_id, sub2.subscription_id)
   }
@@ -173,15 +164,6 @@ class Topic(val router:LocalRouter, val 
     }
   }
 
-  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
= {
-    val authorizer = router.host.authorizer
-    if( authorizer!=null && security!=null && !authorizer.can_send_to(security,
router.host, config) ) {
-      false
-    } else {
-      true
-    }
-  }
-
   def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
     producers += producer
     producer.bind(consumers.toList ::: durable_subscriptions.toList)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1083007&r1=1083006&r2=1083007&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Mar 18 18:38:38 2011
@@ -1255,7 +1255,7 @@ class StompSecurityTest extends StompTes
 
     val frame = client.receive()
     frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to reveive from the destination.\n")
+    frame should include("message:Not authorized to receive from the destination.\n")
   }
 
 //  test("Consume authorized and JMSXUserID is set on message") {



Mime
View raw message