activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1128403 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Date Fri, 27 May 2011 17:19:59 GMT
Author: chirino
Date: Fri May 27 17:19:58 2011
New Revision: 1128403

URL: http://svn.apache.org/viewvc?rev=1128403&view=rev
Log:
Implemented durable subscription updating.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1128403&r1=1128402&r2=1128403&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Fri May 27 17:19:58 2011
@@ -363,7 +363,7 @@ class LocalRouter(val virtual_host:Virtu
     }
 
     def bind_dsub(queue:Queue) = {
-
+      assert_executing
       val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
       val path = queue.binding.destination
       val wildcard = PathParser.containsWildCards(path)
@@ -382,23 +382,15 @@ class LocalRouter(val virtual_host:Virtu
     }
 
     def unbind_dsub(queue:Queue) = {
-
-      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      assert_executing
+      val destination = queue.destination_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
       val path = queue.binding.destination
-      val wildcard = PathParser.containsWildCards(path)
       var matches = get_destination_matches(path)
 
-      // We may need to create the topic...
-      if( !wildcard && matches.isEmpty ) {
-        create_destination(path, destination, null)
-        matches = get_destination_matches(path)
-      }
-
       durable_subscriptions_by_path.remove(path, queue)
       durable_subscriptions_by_id.remove(destination.subscription_id)
 
       matches.foreach( _.unbind_durable_subscription(destination, queue) )
-
     }
 
     override def bind(path: Path, destination: DestinationDTO, consumer: DeliveryConsumer,
security: SecurityContext) {
@@ -406,10 +398,39 @@ class LocalRouter(val virtual_host:Virtu
         case destination:DurableSubscriptionDestinationDTO =>
 
           val key = destination.subscription_id
-          val queue = durable_subscriptions_by_id.get( key ).getOrElse {
-            _create_queue(QueueBinding.create(destination))
+          val queue = durable_subscriptions_by_id.get( key ) match {
+            case Some(queue) =>
+              // We may need to update the bindings...
+              if( queue.destination_dto != destination) {
+
+                val binding = QueueBinding.create(destination)
+                if( queue.tune_persistent && queue.store_id == -1 ) {
+
+                  val record = new QueueRecord
+                  record.key = queue.store_id
+                  record.binding_data = binding.binding_data
+                  record.binding_kind = binding.binding_kind
+
+                  // Update the bindings
+                  virtual_host.store.add_queue(record) { rc => Unit }
+                }
+
+                // and then rebind the queue in the router.
+                unbind_dsub(queue)
+                queue.binding = binding
+                bind_dsub(queue)
+
+                // Make sure the update is visible in the queue's thread context..
+                queue.dispatch_queue {
+                  queue.binding = binding
+                }
+              }
+              queue
+            case None =>
+              _create_queue(QueueBinding.create(destination))
           }
 
+
           // Typically durable subs are only consumed by one connection at a time. So collocate
the
           // queue onto the consumer's dispatch queue.
           queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1128403&r1=1128402&r2=1128403&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri May 27 17:19:58 2011
@@ -45,31 +45,26 @@ import Queue._
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val router: LocalRouter, val store_id:Long, val binding:QueueBinding, var config:QueueDTO)
extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService
with DomainDestination with Dispatched {
-
-  override def toString: String =  {
-    "Queue(id:%d, binding:%s)".format(id, binding)
-  }
+class Queue(val router: LocalRouter, val store_id:Long, var binding:QueueBinding, var config:QueueDTO)
extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService
with DomainDestination with Dispatched {
 
   def id = binding.id
 
+  override def toString: String =  id
+
   def virtual_host = router.virtual_host
 
   var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
   var exclusive_subscriptions = ListBuffer[Subscription]()
 
-  val filter = binding.message_filter
+  def filter = binding.message_filter
 
-  override val dispatch_queue: DispatchQueue = createQueue(binding.id);
+  override val dispatch_queue: DispatchQueue = createQueue(id);
   virtual_host.broker.init_dispatch_queue(dispatch_queue)
 
   def destination_dto: DestinationDTO = binding.binding_dto
 
-  dispatch_queue {
-    debug("created queue for: " + binding.id)
-  }
-
+  debug("created queue: " + id)
 
   override def dispose: Unit = {
     ack_source.cancel
@@ -534,6 +529,7 @@ class Queue(val router: LocalRouter, val
       for (consumer <- values) {
         val sub = new Subscription(this, consumer)
         sub.open
+        consumer.release()
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1128403&r1=1128402&r2=1128403&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri May 27 17:19:58 2011
@@ -20,8 +20,8 @@ import _root_.org.fusesource.hawtbuf._
 import dto.{StompConnectionStatusDTO, StompDTO}
 import org.fusesource.hawtdispatch._
 
-import Buffer._
 import org.apache.activemq.apollo.broker._
+import Buffer._
 import java.lang.String
 import protocol.{ProtocolFilter, HeartBeatMonitor, ProtocolHandler}
 import security.SecurityContext
@@ -224,30 +224,31 @@ class StompProtocolHandler extends Proto
     val selector:(String, BooleanExpression),
     override val browser:Boolean,
     override val exclusive:Boolean
-  ) extends BaseRetained with DeliveryConsumer {
+  ) extends Retained with DeliveryConsumer {
+//  ) extends BaseRetained with DeliveryConsumer {
 
 ////  The following comes in handy if we need to debug the
 ////  reference counts of the consumers.
 //
-//    val r = new BaseRetained
-//
-//    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
-//    def retained: Int =r.retained
-//
-//    def printST(name:String) = {
-//      val e = new Exception
-//      println(name+": ")
-//      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
-//    }
-//
-//    def retain: Unit = {
-//      printST("retain")
-//      r.retain
-//    }
-//    def release: Unit = {
-//      printST("release")
-//      r.release
-//    }
+    val r = new BaseRetained
+
+    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
+    def retained: Int =r.retained
+
+    def printST(name:String) = {
+      val e = new Exception
+      println(name+": ")
+      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
+    }
+
+    def retain: Unit = {
+      printST("retain")
+      r.retain
+    }
+    def release: Unit = {
+      printST("release")
+      r.release
+    }
 
     val dispatch_queue = StompProtocolHandler.this.dispatchQueue
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1128403&r1=1128402&r2=1128403&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri May 27 17:19:58 2011
@@ -642,6 +642,68 @@ class DurableSubscriptionTest extends St
 
   override val broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
 
+  test("Duplicate SUBSCRIBE updates durable subscription bindings") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/a\n" +
+      "id:sub1\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    def get(expected:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\n"+expected)
+    }
+
+    // Validate that the durable sub is bound to /topic/a
+    client.write(
+      "SEND\n" +
+      "destination:/topic/a\n" +
+      "\n" +
+      "1\n")
+    get("1\n")
+
+    client.write(
+      "UNSUBSCRIBE\n" +
+      "id:sub1\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    // Switch the durable sub to /topic/b
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/b\n" +
+      "id:sub1\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    // all these should get dropped
+    for ( i <- 1 to 500 ) {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/a\n" +
+        "\n" +
+        "DROPPED\n")
+    }
+
+    // Not this one.. it's on the updated topic
+    client.write(
+      "SEND\n" +
+      "destination:/topic/b\n" +
+      "\n" +
+      "2\n")
+    get("2\n")
+
+  }
+
   test("Two durable subs contain the same messages") {
     connect("1.1")
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1128403&r1=1128402&r2=1128403&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Fri May
27 17:19:58 2011
@@ -1074,6 +1074,16 @@ Similarly, you can also subscribe to the
     
     ^@
 
+Unlike typical STOMP subscriptions id's which are local to the STOMP 
+client's connection, the durable subscription id's are global across
+a virtual host.  If two different connections use the same durable
+subscription id, then messages from the subscription will get load
+balanced across the two connections.  If the second connection uses
+a different `destination` or `selector` header, then updates 
+the original subscription, and the original connection will
+subsequently only receive messages matching the updated
+destination or selector.
+
 ### Browsing Subscriptions
 
 A normal subscription on a queue will consume messages so that no other



Mime
View raw message