activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1307020 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/ apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
Date Thu, 29 Mar 2012 17:59:13 GMT
Author: chirino
Date: Thu Mar 29 17:59:13 2012
New Revision: 1307020

URL: http://svn.apache.org/viewvc?rev=1307020&view=rev
Log:
Fixes APLO-170 : Aggregate destination metrics were incorrect when queues were deleted while
they were not empty.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1307020&r1=1307019&r2=1307020&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Thu Mar 29 17:59:13 2012
@@ -32,6 +32,19 @@ import scala.collection.mutable.{HashSet
 
 object DestinationMetricsSupport {
 
+  def clear_non_counters(metrics:DestMetricsDTO) = {
+    metrics.queue_items = 0
+    metrics.queue_size = 0
+    metrics.producer_count = 0
+    metrics.consumer_count = 0
+    metrics.swapped_in_size_max = 0
+    metrics.swapped_in_size = 0
+    metrics.swapped_in_items = 0
+    metrics.swapping_in_size = 0
+    metrics.swapping_out_size = 0;
+    metrics.swapping_out_size = 0;
+  }
+
   def add_destination_metrics(to:DestMetricsDTO, from:DestMetricsDTO) = {
     to.enqueue_item_counter += from.enqueue_item_counter
     to.enqueue_size_counter += from.enqueue_size_counter
@@ -1279,15 +1292,29 @@ class LocalRouter(val virtual_host:Virtu
 
 
   def _destroy_queue(queue: Queue) {
-    queue.stop(dispatch_queue.runnable{
+    queue.stop(^{
+      var metrics = queue.get_queue_metrics
+      dispatch_queue {
 
-      queue.binding.unbind(this, queue)
-//      queues_by_binding.remove(queue.binding)
-      queues_by_store_id.remove(queue.store_id)
-      if (queue.tune_persistent) {
-        queue.dispatch_queue {
-          virtual_host.store.remove_queue(queue.store_id) {
-            x => Unit
+        queue.binding.unbind(this, queue)
+
+        for ( aggreator <-queue.address match {
+          case d:DurableSubscriptionDestinationDTO => Some(virtual_host.dead_dsub_metrics)
+          case t:TopicDestinationDTO => None
+          case _ => Some(virtual_host.dead_queue_metrics)
+        }) {
+
+          // Zero out all the NON counters since a removed queue is empty.
+          DestinationMetricsSupport.clear_non_counters(metrics)
+          DestinationMetricsSupport.add_destination_metrics(aggreator, metrics)
+        }
+
+        queues_by_store_id.remove(queue.store_id)
+        if (queue.tune_persistent) {
+          queue.dispatch_queue {
+            virtual_host.store.remove_queue(queue.store_id) {
+              x => Unit
+            }
           }
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1307020&r1=1307019&r2=1307020&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Mar 29 17:59:13 2012
@@ -472,14 +472,6 @@ class Queue(val router: LocalRouter, val
 
   def on_queue_flushed = {
     if(stop_listener_waiting_for_flush!=null) {
-      address match {
-        case d:DurableSubscriptionDestinationDTO =>
-          DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_dsub_metrics,
get_queue_metrics)
-        case t:TopicDestinationDTO =>
-          // metrics are taken care of by topic
-        case _ =>
-          DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_queue_metrics,
get_queue_metrics)
-      }
       stop_listener_waiting_for_flush.run()
       stop_listener_waiting_for_flush = null
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1307020&r1=1307019&r2=1307020&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
Thu Mar 29 17:59:13 2012
@@ -18,9 +18,9 @@ package org.apache.activemq.apollo.broke
 
 import org.apache.activemq.apollo.util.{ServiceControl, Logging, FunSuiteSupport}
 import java.net.InetSocketAddress
-import org.apache.activemq.apollo.dto.{QueueStatusDTO, TopicStatusDTO}
 import org.apache.activemq.apollo.util._
 import FileSupport._
+import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
 
 /**
  * <p>
@@ -70,6 +70,16 @@ class BrokerFunSuiteSupport extends FunS
     }.await()
   }
 
+  def delete_queue(name: String) = {
+    val host = broker.default_virtual_host
+    host.dispatch_queue.future {
+      val router = host.router.asInstanceOf[LocalRouter]
+      for( node<- router.local_queue_domain.destination_by_id.get(name) ) {
+        router._destroy_queue(node)
+      }
+    }.await()
+  }
+
   def topic_exists(name: String): Boolean = {
     val host = broker.default_virtual_host
     host.dispatch_queue.future {
@@ -86,6 +96,27 @@ class BrokerFunSuiteSupport extends FunS
     }
   }
 
+  def get_queue_metrics: AggregateDestMetricsDTO = {
+    val host = broker.default_virtual_host
+    sync(host) {
+      host.get_queue_metrics
+    }
+  }
+
+  def get_topic_metrics: AggregateDestMetricsDTO = {
+    val host = broker.default_virtual_host
+    sync(host) {
+      host.get_topic_metrics
+    }
+  }
+
+  def get_dsub_metrics: AggregateDestMetricsDTO = {
+    val host = broker.default_virtual_host
+    sync(host) {
+      host.get_dsub_metrics
+    }
+  }
+
   def queue_status(name: String): QueueStatusDTO = {
     val host = broker.default_virtual_host
     sync(host) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1307020&r1=1307019&r2=1307020&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Thu Mar
29 17:59:13 2012
@@ -26,6 +26,7 @@
 
   </virtual_host>
 
+  <web_admin bind="http://0.0.0.0:61680"/>
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
   <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1307020&r1=1307019&r2=1307020&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Thu Mar 29 17:59:13 2012
@@ -158,6 +158,72 @@ class StompTestSupport extends BrokerFun
  */
 class StompMetricsTest extends StompTestSupport {
 
+
+  test("Deleted qeueus are removed to aggregate queue-stats") {
+    connect("1.1")
+
+    val stat1 = get_queue_metrics
+
+    async_send("/queue/willdelete", 1)
+    async_send("/queue/willdelete", 2)
+    sync_send("/queue/willdelete", 3)
+
+    // not acked yet.
+    val stat2 = get_queue_metrics
+    stat2.producer_count should be(stat1.producer_count+1)
+    stat2.consumer_count should be(stat1.consumer_count)
+    stat2.enqueue_item_counter should be(stat1.enqueue_item_counter+3)
+    stat2.dequeue_item_counter should be(stat1.dequeue_item_counter+0)
+    stat2.queue_items should be(stat1.queue_items+3)
+
+    // Delete the queue
+    delete_queue("willdelete")
+
+    within(1, SECONDS) {
+      val stat3 = get_queue_metrics
+      stat3.producer_count should be(stat1.producer_count)
+      stat3.consumer_count should be(stat1.consumer_count)
+      stat3.enqueue_item_counter should be(stat1.enqueue_item_counter+3)
+      stat3.dequeue_item_counter should be(stat1.dequeue_item_counter)
+      stat3.queue_items should be(stat1.queue_items)
+    }
+  }
+
+  test("Old consumers on topic slow_consumer_policy='queue' does not affect the agregate
queue-metrics") {
+    connect("1.1")
+
+    subscribe("0", "/topic/queued.test1", "client");
+    sync_send("/topic/queued.test1", 1)
+
+    val stat1 = get_topic_metrics
+
+    async_send("/topic/queued.test1", 2)
+    async_send("/topic/queued.test1", 3)
+    val ack1 = assert_received(1)
+    val ack2 = assert_received(2)
+    val ack3 = assert_received(3)
+
+    // not acked yet.
+    val stat2 = get_topic_metrics
+    stat2.producer_count should be(stat1.producer_count)
+    stat2.consumer_count should be(stat1.consumer_count)
+    stat2.enqueue_item_counter should be(stat1.enqueue_item_counter+2)
+    stat2.dequeue_item_counter should be(stat1.dequeue_item_counter+0)
+    stat2.queue_items should be(stat1.queue_items+2)
+
+    // Close the subscription.
+    unsubscribe("0")
+
+    within(1, SECONDS) {
+      val stat3 = get_topic_metrics
+      stat3.producer_count should be(stat1.producer_count)
+      stat3.consumer_count should be(stat1.consumer_count-1)
+      stat3.enqueue_item_counter should be(stat1.enqueue_item_counter+2)
+      stat3.dequeue_item_counter should be(stat1.dequeue_item_counter+0)
+      stat3.queue_items should be(stat1.queue_items-1)
+    }
+  }
+
   test("New Topic Stats") {
     connect("1.1")
     subscribe("0", "/topic/newstats");



Mime
View raw message