From commits-return-19104-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Jun 22 15:38:08 2012 Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 866D09B02 for ; Fri, 22 Jun 2012 15:38:08 +0000 (UTC) Received: (qmail 20101 invoked by uid 500); 22 Jun 2012 15:38:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 20065 invoked by uid 500); 22 Jun 2012 15:38:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 20058 invoked by uid 99); 22 Jun 2012 15:38:08 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jun 2012 15:38:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jun 2012 15:38:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3342923889BF for ; Fri, 22 Jun 2012 15:37:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1352930 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ Date: Fri, 22 Jun 2012 15:37:46 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120622153747.3342923889BF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Jun 22 15:37:45 2012 New Revision: 1352930 URL: http://svn.apache.org/viewvc?rev=1352930&view=rev Log: Fixes APLO-211: Incorrect dequeued counter for topics Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1352930&r1=1352929&r2=1352930&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Jun 22 15:37:45 2012 @@ -439,7 +439,6 @@ class Topic(val router:LocalRouter, val def unbind (consumer:DeliveryConsumer, persistent:Boolean) = { for(proxy <- consumers.remove(consumer)) { - add_dequeue_counters(topic_metrics, proxy.link) val list = consumer_queues.remove(consumer) match { case Some(queue) => queue.unbind(List(consumer)) @@ -465,6 +464,7 @@ class Topic(val router:LocalRouter, val } List(queue) case None => + add_dequeue_counters(topic_metrics, proxy.link) List(consumer) } producers.keys.foreach({ r=> Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1352930&r1=1352929&r2=1352930&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Fri Jun 22 15:37:45 2012 @@ -141,6 +141,8 @@ class BrokerFunSuiteSupport extends FunS Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get } + def json(value:Any) = org.apache.activemq.apollo.dto.JsonCodec.encode(value).ascii().toString; + } class MultiBrokerTestSupport extends FunSuiteSupport { Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1352930&r1=1352929&r2=1352930&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jun 22 15:37:45 2012 @@ -21,13 +21,13 @@ import org.scalatest.BeforeAndAfterEach import java.lang.String import java.util.concurrent.TimeUnit._ import org.apache.activemq.apollo.util._ -import org.apache.activemq.apollo.dto.KeyStorageDTO import java.util.concurrent.atomic.AtomicLong import FileSupport._ import java.net.InetSocketAddress import java.nio.channels.DatagramChannel import org.fusesource.hawtbuf.AsciiBuffer import org.apache.activemq.apollo.broker._ +import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO} class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach { @@ -72,6 +72,16 @@ class StompTestSupport extends BrokerFun c } + def disconnect(c: StompClient = client) = { + val rid = receipt_counter.incrementAndGet() + client.write( + "DISCONNECT\n" + + "receipt:"+rid+"\n" + + "\n") + wait_for_receipt(""+rid, c) + close(c) + } + def close(c: StompClient = client) = c.close() val receipt_counter = new AtomicLong() @@ -171,6 +181,25 @@ class StompTestSupport extends BrokerFun */ class StompMetricsTest extends StompTestSupport { + test("slow_consumer_policy='queue' metrics stay consistent on consumer close (APLO-211)") { + connect("1.1") + + subscribe("0", "/topic/queued.APLO-211", "client"); + async_send("/topic/queued.APLO-211", 1) + assert_received(1)(true) + + val stat1 = topic_status("queued.APLO-211").metrics + disconnect() + + within(3, SECONDS) { + val stat2 = topic_status("queued.APLO-211").metrics + stat2.producer_count should be(stat1.producer_count-1) + stat2.consumer_count should be(stat1.consumer_count-1) + stat2.enqueue_item_counter should be(stat1.enqueue_item_counter) + stat2.dequeue_item_counter should be(stat1.dequeue_item_counter) + } + } + test("Deleted qeueus are removed to aggregate queue-stats") { connect("1.1")