activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1352930 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
Date Fri, 22 Jun 2012 15:37:46 GMT
Author: chirino
Date: Fri Jun 22 15:37:45 2012
New Revision: 1352930

URL: http://svn.apache.org/viewvc?rev=1352930&view=rev
Log:
Fixes APLO-211: Incorrect dequeued counter for topics

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1352930&r1=1352929&r2=1352930&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Jun 22 15:37:45 2012
@@ -439,7 +439,6 @@ class Topic(val router:LocalRouter, val 
   def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
 
     for(proxy <- consumers.remove(consumer)) {
-      add_dequeue_counters(topic_metrics, proxy.link)
       val list = consumer_queues.remove(consumer) match {
         case Some(queue) =>
           queue.unbind(List(consumer))
@@ -465,6 +464,7 @@ class Topic(val router:LocalRouter, val 
           }
           List(queue)
         case None =>
+          add_dequeue_counters(topic_metrics, proxy.link)
           List(consumer)
       }
       producers.keys.foreach({ r=>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1352930&r1=1352929&r2=1352930&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Fri Jun 22 15:37:45 2012
@@ -141,6 +141,8 @@ class BrokerFunSuiteSupport extends FunS
     Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get
   }
 
+  def json(value:Any) = org.apache.activemq.apollo.dto.JsonCodec.encode(value).ascii().toString;
+
 }
 
 class MultiBrokerTestSupport extends FunSuiteSupport {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1352930&r1=1352929&r2=1352930&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jun 22 15:37:45 2012
@@ -21,13 +21,13 @@ import org.scalatest.BeforeAndAfterEach
 import java.lang.String
 import java.util.concurrent.TimeUnit._
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.dto.KeyStorageDTO
 import java.util.concurrent.atomic.AtomicLong
 import FileSupport._
 import java.net.InetSocketAddress
 import java.nio.channels.DatagramChannel
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO}
 
 class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
 
@@ -72,6 +72,16 @@ class StompTestSupport extends BrokerFun
     c
   }
 
+  def disconnect(c: StompClient = client) = {
+    val rid = receipt_counter.incrementAndGet()
+    client.write(
+      "DISCONNECT\n" +
+      "receipt:"+rid+"\n" +
+      "\n")
+    wait_for_receipt(""+rid, c)
+    close(c)
+  }
+
   def close(c: StompClient = client) = c.close()
 
   val receipt_counter = new AtomicLong()
@@ -171,6 +181,25 @@ class StompTestSupport extends BrokerFun
  */
 class StompMetricsTest extends StompTestSupport {
 
+  test("slow_consumer_policy='queue' metrics stay consistent on consumer close (APLO-211)") {
+    connect("1.1")
+
+    subscribe("0", "/topic/queued.APLO-211", "client");
+    async_send("/topic/queued.APLO-211", 1)
+    assert_received(1)(true)
+
+    val stat1 = topic_status("queued.APLO-211").metrics
+    disconnect()
+
+    within(3, SECONDS) {
+      val stat2 = topic_status("queued.APLO-211").metrics
+      stat2.producer_count should be(stat1.producer_count-1)
+      stat2.consumer_count should be(stat1.consumer_count-1)
+      stat2.enqueue_item_counter should be(stat1.enqueue_item_counter)
+      stat2.dequeue_item_counter should be(stat1.dequeue_item_counter)
+    }
+  }
+
 
   test("Deleted qeueus are removed to aggregate queue-stats") {
     connect("1.1")



Mime
View raw message