activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1240840 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
Date Sun, 05 Feb 2012 21:54:14 GMT
Author: chirino
Date: Sun Feb  5 21:54:13 2012
New Revision: 1240840

URL: http://svn.apache.org/viewvc?rev=1240840&view=rev
Log:
All topic consumer (including durable consumers) are now proxied so that the stats are more
consistent.  Durable sub updates now do not loose track of what they had previously subscribed
to.  Added some tests to verify topic stats are consistent.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1240840&r1=1240839&r2=1240840&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Sun Feb  5 21:54:13 2012
@@ -575,31 +575,58 @@ class LocalRouter(val virtual_host:Virtu
       assert_executing
       val address = queue.address.asInstanceOf[SubscriptionAddress]
       add_destination(address.path, queue)
+      bind_topics(queue, address, address.topics)
+    }
 
-      import collection.JavaConversions._
-      address.topics.foreach { topic =>
+    def rebind(queue:Queue, binding:DurableSubscriptionQueueBinding) = {
 
-        val wildcard = PathParser.containsWildCards(topic.path)
-        var matches = local_topic_domain.get_destination_matches(topic.path)
+      // Ok figure out what was added or removed.
+      val prev:Set[BindAddress] = queue.address.asInstanceOf[SubscriptionAddress].topics.toSet
+      val next:Set[BindAddress] = binding.address.topics.toSet
+      val existing = prev.intersect(next)
+      val added = next -- existing
+      val removed = prev -- next
 
-        // We may need to create the topic...
-        if( !wildcard && matches.isEmpty ) {
-          local_topic_domain.create_destination(topic, null)
-          matches = local_topic_domain.get_destination_matches(topic.path)
-        }
-        matches.foreach( _.bind_durable_subscription(address, queue) )
+      if(!added.isEmpty) {
+        bind_topics(queue, binding.address, added)
+      }
+      if(!removed.isEmpty) {
+        unbind_topics(queue, removed)
+      }
+
+      // Make sure the update is visible in the queue's thread context..
+      queue.binding = binding
+      queue.dispatch_queue {
+        queue.binding = binding
       }
     }
 
     def unbind(queue:Queue) = {
       assert_executing
       val address = queue.address.asInstanceOf[SubscriptionAddress]
+      unbind_topics(queue, address.topics)
+      remove_destination(address.path, queue)
+    }
 
-      address.topics.foreach { topic =>
+    def unbind_topics(queue: Queue, topics: Traversable[_ <: BindAddress]) {
+      topics.foreach { topic =>
         var matches = local_topic_domain.get_destination_matches(topic.path)
-        matches.foreach( _.unbind_durable_subscription(queue) )
+        matches.foreach(_.unbind_durable_subscription(queue))
+      }
+    }
+
+    def bind_topics(queue: Queue, address: SubscriptionAddress, topics: Traversable[_ <:
BindAddress]) {
+      topics.foreach { topic =>
+        val wildcard = PathParser.containsWildCards(topic.path)
+        var matches = local_topic_domain.get_destination_matches(topic.path)
+
+        // We may need to create the topic...
+        if (!wildcard && matches.isEmpty) {
+          local_topic_domain.create_destination(topic, null)
+          matches = local_topic_domain.get_destination_matches(topic.path)
+        }
+        matches.foreach(_.bind_durable_subscription(address, queue))
       }
-      remove_destination(address.path, queue)
     }
 
     def destroy_destination(address: DestinationAddress, security: SecurityContext): Unit
= {
@@ -666,14 +693,8 @@ class LocalRouter(val virtual_host:Virtu
               }
 
               // and then rebind the queue in the router.
-              unbind(queue)
-              queue.binding = binding
-              bind(queue)
-
-              // Make sure the update is visible in the queue's thread context..
-              queue.dispatch_queue {
-                queue.binding = binding
-              }
+              rebind(queue, binding)
+
             }
           case _ =>
         }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1240840&r1=1240839&r2=1240840&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Sun Feb  5 21:54:13 2012
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.broker
 
 import org.apache.activemq.apollo.util._
-import path.Path
 import scala.collection.immutable.List
 import org.apache.activemq.apollo.dto._
 import java.util.concurrent.TimeUnit
@@ -300,7 +299,7 @@ class Topic(val router:LocalRouter, val 
   }
 
   def check_idle {
-    if (producers.isEmpty && consumers.isEmpty && durable_subscriptions.isEmpty)
{
+    if (producers.isEmpty && consumers.isEmpty) {
       if (idled_at==0) {
         val previously_idle_at = now
         idled_at = previously_idle_at
@@ -321,8 +320,8 @@ class Topic(val router:LocalRouter, val 
   def bind(address: BindAddress, consumer:DeliveryConsumer) = {
 
     val target = address.domain match {
-      case "queue"=>
-        // this is the mirrored queue case..
+      case "queue" | "dsub"=>
+        // durable sub or mirrored queue case.
         consumer
       case "topic"=>
         slow_consumer_policy match {
@@ -362,6 +361,10 @@ class Topic(val router:LocalRouter, val 
             link.kind = "queue"
             link.id = queue.id
             link.label = queue.id
+          case x:DurableSubscriptionQueueBinding =>
+            link.kind = "dsub"
+            link.id = queue.id
+            link.label = queue.id
         }
       case _ =>
         for(connection <- target.connection) {
@@ -420,31 +423,15 @@ class Topic(val router:LocalRouter, val 
   def bind_durable_subscription(address: SubscriptionAddress, queue:Queue)  = {
     if( !durable_subscriptions.contains(queue) ) {
       durable_subscriptions += queue
-      val list = List(queue)
-      producers.keys.foreach({ r=>
-        r.bind(list)
-      })
-      consumer_queues.foreach{case (consumer, q)=>
-        if( q==queue ) {
-          bind(address, consumer)
-        }
-      }
+      bind(address, queue)
     }
     check_idle
   }
 
   def unbind_durable_subscription(queue:Queue)  = {
     if( durable_subscriptions.contains(queue) ) {
+      unbind(queue, false)
       durable_subscriptions -= queue
-      val list = List(queue)
-      producers.keys.foreach({ r=>
-        r.unbind(list)
-      })
-      consumer_queues.foreach{case (consumer, q)=>
-        if( q==queue ) {
-          unbind(consumer, false)
-        }
-      }
     }
     check_idle
   }
@@ -462,7 +449,7 @@ class Topic(val router:LocalRouter, val 
     }
     producers.put(producer, link)
     topic_metrics.producer_counter += 1
-    producer.bind(producer_tracker::consumers.values.toList ::: durable_subscriptions.toList)
+    producer.bind(producer_tracker::consumers.values.toList )
     check_idle
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1240840&r1=1240839&r2=1240840&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Sun Feb
 5 21:54:13 2012
@@ -22,6 +22,7 @@
     <host_name>localhost</host_name>
 
     <queue name="mirrored.**" mirrored="true"/>
+    <topic name="queued.**" slow_consumer_policy="queue"/>
 
   </virtual_host>
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1240840&r1=1240839&r2=1240840&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Sun Feb  5 21:54:13 2012
@@ -19,14 +19,14 @@ package org.apache.activemq.apollo.stomp
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.BeforeAndAfterEach
 import java.lang.String
-import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl}
-import FileSupport._
-import org.apache.activemq.apollo.dto.KeyStorageDTO
 import java.net.InetSocketAddress
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker.{LocalRouter, KeyStorage, Broker, BrokerFactory}
-import org.fusesource.hawtbuf.Buffer._
 import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.dto.{QueueStatusDTO, TopicStatusDTO, KeyStorageDTO}
+import java.util.concurrent.atomic.AtomicLong
+import FileSupport._
 
 class StompTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach
with Logging {
   var broker: Broker = null
@@ -88,6 +88,75 @@ class StompTestSupport extends FunSuiteS
     c
   }
 
+  val receipt_counter = new AtomicLong()
+
+  def sync_send(dest:String, body:Any, c: StompClient = client) = {
+    val rid = receipt_counter.incrementAndGet()
+    c.write(
+      "SEND\n" +
+      "destination:"+dest+"\n" +
+      "receipt:"+rid+"\n" +
+      "\n" +
+      body)
+    wait_for_receipt(""+rid, c)
+  }
+
+  def async_send(dest:String, body:Any, c: StompClient = client) = {
+    c.write(
+      "SEND\n" +
+      "destination:"+dest+"\n" +
+      "\n" +
+      body)
+  }
+
+  def subscribe(id:String, dest:String, mode:String="auto", persistent:Boolean=false, c:
StompClient = client) = {
+    val rid = receipt_counter.incrementAndGet()
+    c.write(
+      "SUBSCRIBE\n" +
+      "destination:"+dest+"\n" +
+      "id:"+id+"\n" +
+      (if(persistent) "persistent:true\n" else "")+
+      "ack:"+mode+"\n"+
+      "receipt:"+rid+"\n" +
+      "\n")
+    wait_for_receipt(""+rid, c)
+  }
+
+  def unsubscribe(id:String, c: StompClient = client) = {
+    val rid = receipt_counter.incrementAndGet()
+    client.write(
+      "UNSUBSCRIBE\n" +
+      "id:"+id+"\n" +
+      "receipt:"+rid+"\n" +
+      "\n")
+    wait_for_receipt(""+rid, c)
+  }
+
+  def assert_received(body:Any, sub:String=null, c: StompClient=client)={
+    val frame = c.receive()
+    frame should startWith("MESSAGE\n")
+    if(sub!=null) {
+      frame should include ("subscription:"+sub+"\n")
+    }
+    body match {
+      case null =>
+      case body:scala.util.matching.Regex => frame should endWith regex(body)
+      case body => frame should endWith("\n\n"+body)
+    }
+    // return a func that can ack the message.
+    ()=> {
+      val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
+      val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
+      val sub_regex(sub) = frame
+      val msgid_regex(msgid) = frame
+      c.write(
+        "ACK\n" +
+        "subscription:"+sub+"\n" +
+        "message-id:"+msgid+"\n" +
+        "\n")
+    }
+  }
+
   def wait_for_receipt(id:String, c: StompClient = client): Unit = {
     val frame = c.receive()
     frame should startWith("RECEIPT\n")
@@ -95,7 +164,7 @@ class StompTestSupport extends FunSuiteS
   }
 
   def queue_exists(name:String):Boolean = {
-    val host = broker.virtual_hosts.get(ascii("default")).get
+    val host = broker.default_virtual_host
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
       router.local_queue_domain.destination_by_id.get(name).isDefined
@@ -103,15 +172,192 @@ class StompTestSupport extends FunSuiteS
   }
 
   def topic_exists(name:String):Boolean = {
-    val host = broker.virtual_hosts.get(ascii("default")).get
+    val host = broker.default_virtual_host
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
       router.local_topic_domain.destination_by_id.get(name).isDefined
     }.await()
   }
 
+  def topic_status(name:String):TopicStatusDTO = {
+    val host = broker.default_virtual_host
+    sync(host) {
+      val router = host.router.asInstanceOf[LocalRouter]
+      router.local_topic_domain.destination_by_id.get(name).get.status
+    }
+  }
+
+  def queue_status(name:String):QueueStatusDTO = {
+    val host = broker.default_virtual_host
+    sync(host) {
+      val router = host.router.asInstanceOf[LocalRouter]
+      router.local_queue_domain.destination_by_id.get(name).get.status(false)
+    }
+  }
+
+  def dsub_status(name:String):QueueStatusDTO = {
+    val host = broker.default_virtual_host
+    sync(host) {
+      val router = host.router.asInstanceOf[LocalRouter]
+      router.local_dsub_domain.destination_by_id.get(name).get.status(false)
+    }
+  }
+
 }
 
+/**
+ * These test cases check to make sure the broker stats are consistent with what
+ * would be expected.
+ */
+class StompMetricsTest extends StompTestSupport {
+
+  test("Topic Stats") {
+    connect("1.1")
+
+    sync_send("/topic/stats", 1)
+    val stat1 = topic_status("stats")
+    stat1.producers.size() should be(1)
+    stat1.consumers.size() should be(0)
+    stat1.dsubs.size() should be(0)
+    stat1.metrics.enqueue_item_counter should be(1)
+    stat1.metrics.dequeue_item_counter should be(0)
+    stat1.metrics.queue_items should be(0)
+
+    subscribe("0", "/topic/stats");
+    async_send("/topic/stats", 2)
+    async_send("/topic/stats", 3)
+    assert_received(2)
+    assert_received(3)
+
+    val stat2 = topic_status("stats")
+    stat2.producers.size() should be(1)
+    stat2.consumers.size() should be(1)
+    stat2.dsubs.size() should be(0)
+    stat2.metrics.enqueue_item_counter should be(3)
+    stat2.metrics.dequeue_item_counter should be(2)
+    stat2.metrics.queue_items should be(0)
+    client.close()
+
+    within(1, SECONDS) {
+      val stat3 = topic_status("stats")
+      stat3.producers.size() should be(0)
+      stat3.consumers.size() should be(0)
+      stat3.dsubs.size() should be(0)
+      stat3.metrics.enqueue_item_counter should be(3)
+      stat3.metrics.dequeue_item_counter should be(2)
+      stat3.metrics.queue_items should be(0)
+    }
+  }
+
+  test("Topic slow_consumer_policy='queue' Stats") {
+    connect("1.1")
+
+    sync_send("/topic/queued.stats", 1)
+    val stat1 = topic_status("queued.stats")
+    stat1.producers.size() should be(1)
+    stat1.consumers.size() should be(0)
+    stat1.dsubs.size() should be(0)
+    stat1.metrics.enqueue_item_counter should be(1)
+    stat1.metrics.dequeue_item_counter should be(0)
+    stat1.metrics.queue_items should be(0)
+
+    subscribe("0", "/topic/queued.stats", "client");
+    async_send("/topic/queued.stats", 2)
+    async_send("/topic/queued.stats", 3)
+    val ack2 = assert_received(2)
+    val ack3 = assert_received(3)
+
+    // not acked yet.
+    val stat2 = topic_status("queued.stats")
+    stat2.producers.size() should be(1)
+    stat2.consumers.size() should be(1)
+    stat2.dsubs.size() should be(0)
+    stat2.metrics.enqueue_item_counter should be(3)
+    stat2.metrics.dequeue_item_counter should be(0)
+    stat2.metrics.queue_items should be(2)
+
+    // Ack now..
+    ack2() ; ack3()
+
+    within(1, SECONDS) {
+      val stat3 = topic_status("queued.stats")
+      stat3.producers.size() should be(1)
+      stat3.consumers.size() should be(1)
+      stat3.dsubs.size() should be(0)
+      stat3.metrics.enqueue_item_counter should be(3)
+      stat3.metrics.dequeue_item_counter should be(2)
+      stat3.metrics.queue_items should be(0)
+    }
+
+    unsubscribe("0")
+    client.close()
+    within(1, SECONDS) {
+      val stat4 = topic_status("queued.stats")
+      stat4.producers.size() should be(0)
+      stat4.consumers.size() should be(0)
+      stat4.dsubs.size() should be(0)
+      stat4.metrics.enqueue_item_counter should be(3)
+      stat4.metrics.dequeue_item_counter should be(2)
+      stat4.metrics.queue_items should be(0)
+    }
+  }
+
+  test("Topic Durable Sub Stats.") {
+    connect("1.1")
+
+    sync_send("/topic/dsubed.stats", 1)
+    val stat1 = topic_status("dsubed.stats")
+    stat1.producers.size() should be(1)
+    stat1.consumers.size() should be(0)
+    stat1.dsubs.size() should be(0)
+    stat1.metrics.enqueue_item_counter should be(1)
+    stat1.metrics.dequeue_item_counter should be(0)
+    stat1.metrics.queue_items should be(0)
+    
+    subscribe("dsub1", "/topic/dsubed.stats", "client", true);
+    async_send("/topic/dsubed.stats", 2)
+    async_send("/topic/dsubed.stats", 3)
+    val ack2 = assert_received(2)
+    val ack3 = assert_received(3)
+
+    // not acked yet.
+    val stat2 = topic_status("dsubed.stats")
+    stat2.producers.size() should be(1)
+    stat2.consumers.size() should be(1)
+    stat2.dsubs.size() should be(1)
+    stat2.metrics.enqueue_item_counter should be(3)
+    stat2.metrics.dequeue_item_counter should be(0)
+    stat2.metrics.queue_items should be(2)
+
+    // Ack SOME now..
+    ack2();
+
+    within(1, SECONDS) {
+      val stat3 = topic_status("dsubed.stats")
+      stat3.producers.size() should be(1)
+      stat3.consumers.size() should be(1)
+      stat3.dsubs.size() should be(1)
+      stat3.metrics.enqueue_item_counter should be(3)
+      stat3.metrics.dequeue_item_counter should be(1)
+      stat3.metrics.queue_items should be(1)
+    }
+
+    unsubscribe("dsub1")
+    client.close()
+    within(1, SECONDS) {
+      val stat4 = topic_status("dsubed.stats")
+      stat4.producers.size() should be(0)
+      stat4.consumers.size() should be(1)
+      stat4.dsubs.size() should be(1)
+      stat4.metrics.enqueue_item_counter should be(3)
+      stat4.metrics.dequeue_item_counter should be(1)
+      stat4.metrics.queue_items should be(1)
+    }
+  }
+
+}
+
+
 class Stomp10ConnectTest extends StompTestSupport {
 
   test("Stomp 1.0 CONNECT") {
@@ -248,7 +494,6 @@ class Stomp11HeartBeatTest extends Stomp
   }
 
 }
-
 class StompDestinationTest extends StompTestSupport {
 
   // This is the test case for https://issues.apache.org/jira/browse/APLO-88



Mime
View raw message