activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1157716 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-util/src/main/scala/org/apache/activemq/apollo/util/
Date Mon, 15 Aug 2011 06:09:16 GMT
Author: chirino
Date: Mon Aug 15 06:09:16 2011
New Revision: 1157716

URL: http://svn.apache.org/viewvc?rev=1157716&view=rev
Log:
Better deal with the case where a connection tries to use a virtual host which is stopped.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Mon Aug 15 06:09:16 2011
@@ -875,20 +875,24 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def bind(destination: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext)
= {
-    consumer.retain
-    val paths = destination.map(x=> (destination_parser.decode_path(x.path), x) )
-    dispatch_queue ! {
-      val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2, consumer,
security) )
-      val rc = if( !failures.isEmpty ) {
-        Some(failures.mkString("; "))
-      } else {
-        paths.foreach { x=>
-          domain(x._2).bind(x._1, x._2, consumer, security)
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
+    } else {
+      consumer.retain
+      val paths = destination.map(x=> (destination_parser.decode_path(x.path), x) )
+      dispatch_queue ! {
+        val failures = paths.flatMap(x=> domain(x._2).can_bind_all(x._1, x._2, consumer,
security) )
+        val rc = if( !failures.isEmpty ) {
+          Some(failures.mkString("; "))
+        } else {
+          paths.foreach { x=>
+            domain(x._2).bind(x._1, x._2, consumer, security)
+          }
+          None
         }
-        None
+        consumer.release
+        rc
       }
-      consumer.release
-      rc
     }
   }
 
@@ -903,20 +907,24 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security:
SecurityContext) = {
-    producer.retain
-    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-    dispatch_queue ! {
-
-      val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1, x._2, producer,
security) )
-      if( !failures.isEmpty ) {
-        producer.release
-        Some(failures.mkString("; "))
-      } else {
-        paths.foreach { x=>
-          domain(x._2).connect(x._1, x._2, producer, security)
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
+    } else {
+      producer.retain
+      val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+      dispatch_queue ! {
+
+        val failures = paths.flatMap(x=> domain(x._2).can_connect_all(x._1, x._2, producer,
security) )
+        if( !failures.isEmpty ) {
+          producer.release
+          Some(failures.mkString("; "))
+        } else {
+          paths.foreach { x=>
+            domain(x._2).connect(x._1, x._2, producer, security)
+          }
+          producer.connected()
+          None
         }
-        producer.connected()
-        None
       }
     }
   }
@@ -932,28 +940,36 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def create(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue
! {
-    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-    val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1, x._2, security)
)
-    if( !failures.isEmpty ) {
-      Some(failures.mkString("; "))
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
     } else {
-      paths.foreach { x=>
-        domain(x._2).create_destination(x._1, x._2, security)
+      val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+      val failures = paths.flatMap(x=> domain(x._2).can_create_destination(x._1, x._2,
security) )
+      if( !failures.isEmpty ) {
+        Some(failures.mkString("; "))
+      } else {
+        paths.foreach { x=>
+          domain(x._2).create_destination(x._1, x._2, security)
+        }
+        None
       }
-      None
     }
   }
 
   def delete(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue
! {
-    val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
-    val failures = paths.flatMap(x=> domain(x._2).can_destroy_destination(x._1, x._2,
security) )
-    if( !failures.isEmpty ) {
-      Some(failures.mkString("; "))
+    if(!virtual_host.service_state.is_started) {
+      Some("virtual host stopped.")
     } else {
-      paths.foreach { x=>
-        domain(x._2).destroy_destination(x._1, x._2)
+      val paths = destinations.map(x=> (destination_parser.decode_path(x.path), x) )
+      val failures = paths.flatMap(x=> domain(x._2).can_destroy_destination(x._1, x._2,
security) )
+      if( !failures.isEmpty ) {
+        Some(failures.mkString("; "))
+      } else {
+        paths.foreach { x=>
+          domain(x._2).destroy_destination(x._1, x._2)
+        }
+        None
       }
-      None
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Mon Aug 15 06:09:16 2011
@@ -110,6 +110,10 @@ class VirtualHost(val broker: Broker, va
   var connection_log:Log = _
   var console_log:Log = _
 
+  // This gets set if client should get redirected to another address.
+  @volatile
+  var client_redirect:Option[String] = None
+
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
 
   /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Mon Aug 15 06:09:16 2011
@@ -400,6 +400,7 @@ object Stomp {
   val SESSION = ascii("session")
   val RESPONSE_ID = ascii("response-id")
   val SERVER = ascii("server")
+  val REDIRECT_HEADER = ascii("redirect")
 
   val BROWSER = ascii("browser")
   val EXCLUSIVE = ascii("exclusive")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Mon Aug 15 06:09:16 2011
@@ -548,6 +548,12 @@ class StompProtocolHandler extends Proto
     case x:Break=>
   }
 
+  private def async_die(headers:HeaderMap, body:String) = try {
+    die(headers, body)
+  } catch {
+    case x:Break=>
+  }
+
   private def die[T](msg:String, e:Throwable=null):T = {
     if( e!=null) {
       debug(e, "Shutting connection down due to: "+msg)
@@ -801,6 +807,11 @@ class StompProtocolHandler extends Proto
       if(host==null) {
         async_die("Invalid virtual host: "+host_header.get)
         noop
+      } else if(!host.service_state.is_started) {
+        var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped")) :: Nil
+        host.client_redirect.foreach(x=> headers ::= REDIRECT_HEADER->encode_header(x)
)
+        async_die(headers, "")
+        noop
       } else {
         this.host=host
         if( host.authenticator!=null &&  host.authorizer!=null ) {

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1157716&r1=1157715&r2=1157716&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Mon Aug 15 06:09:16 2011
@@ -144,6 +144,8 @@ trait BaseService extends Service with D
               _service_state = new FAILED
               done
           }
+        case state:CREATED =>
+          done
         case state:STOPPED =>
           done
         case state:STOPPING =>



Mime
View raw message