activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1002779 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/
Date Wed, 29 Sep 2010 17:40:19 GMT
Author: chirino
Date: Wed Sep 29 17:40:19 2010
New Revision: 1002779

URL: http://svn.apache.org/viewvc?rev=1002779&view=rev
Log:
Decoupling the deep queue scenario a bit from the base class.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
Wed Sep 29 17:40:19 2010
@@ -40,8 +40,6 @@ abstract class BrokerPerfSupport extends
   var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
   var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
 
-  var MAX_MESSAGES = 0
-
   protected var TCP = true // Set to use tcp IO
 
   var USE_KAHA_DB = true
@@ -69,7 +67,9 @@ abstract class BrokerPerfSupport extends
   protected var destCount = 0
 
   protected var totalProducerRate:MetricAggregator = null
-  protected var totalConsumerRate:MetricAggregator = null 
+  protected var totalConsumerRate:MetricAggregator = null
+  var totalMessageSent = 0L
+  var totalMessageReceived = 0L
 
   protected var sendBroker: Broker = null
   protected var rcvBroker: Broker = null
@@ -97,7 +97,9 @@ abstract class BrokerPerfSupport extends
     sendBroker=null
     producerCount = 0
     consumerCount = 0
-    destCount =0
+    destCount = 0
+    totalMessageSent = 0
+    totalMessageReceived = 0
   }
 
   override protected def beforeAll(configMap: Map[String, Any]) = {
@@ -274,7 +276,6 @@ abstract class BrokerPerfSupport extends
     consumer.destination = destination
     consumer.name = "Consumer:" + (i + 1)
     consumer.rateAggregator = totalConsumerRate
-    consumer.maxMessages = MAX_MESSAGES    
     consumer.init
     
     return consumer
@@ -296,7 +297,6 @@ abstract class BrokerPerfSupport extends
     producer.messageIdGenerator = msgIdGenerator
     producer.rateAggregator = totalProducerRate
     producer.payloadSize = MESSAGE_SIZE
-    producer.maxMessages = MAX_MESSAGES
     producer.init
     producer
   }
@@ -345,33 +345,14 @@ abstract class BrokerPerfSupport extends
     tracker.await
   }
 
-  def messagesSent() : Long = {
-    var sum = 0
-    producers.foreach((producer:RemoteConnection) => sum += producer.messageCount)
-    sum
-  }
+  def fixed_sampling = true
+  def keep_sampling = false
 
-  def messagesReceived() : Long = {
-    var sum = 0
-    consumers.foreach((consumer:RemoteConnection) => sum += consumer.messageCount)
-    sum
-  }
-  
   def reportRates() = {
 
-    println("Warming up...")
-    Thread.sleep(SAMPLE_PERIOD)
-    totalProducerRate.reset()
-    totalConsumerRate.reset()
-
-    println("Sampling rates")
-
     case class Summary(producer:java.lang.Float, pdev:java.lang.Float, consumer:java.lang.Float,
cdev:java.lang.Float)
-
     var best = 0
-
     import scala.collection.mutable.ArrayBuffer
-
     val sample_rates = new ArrayBuffer[Summary]()
 
     def fillRateSummary(i: Int): Unit = {
@@ -392,32 +373,29 @@ abstract class BrokerPerfSupport extends
         best = i
       }
 
-      totalProducerRate.reset()
-      totalConsumerRate.reset()
+      totalMessageSent += totalProducerRate.reset()
+      totalMessageReceived += totalConsumerRate.reset()
     }
 
-    // either we want to do x number of samples or sample over the course of x number of
messages
-    if ( MAX_MESSAGES == 0 ) {
+    // either we want to do x number of samples or we want to keep sampling while some condition
is true.
+    if ( fixed_sampling ) {
+
+      // Do 1 period of warm up that's not counted...
+      println("Warming up...")
+      Thread.sleep(SAMPLE_PERIOD)
+      totalMessageSent +=  totalProducerRate.reset()
+      totalMessageSent +=  totalConsumerRate.reset()
+
+      println("Sampling rates")
       for (i <- 0 until PERFORMANCE_SAMPLES) {
         fillRateSummary(i)
       }
     } else {
-      var clientsRunning = true
+      println("Sampling rates")
       var i = 0
-      
-      while (clientsRunning) {
+      while( keep_sampling ) {
         fillRateSummary(i)
-        i = i + 1
-        clientsRunning = false
-
-        def checkForRunningClients(connection: Connection) = {
-          if (connection.stopped == false) {
-            clientsRunning = true
-          }
-        }
-
-        producers.foreach(checkForRunningClients)
-        consumers.foreach(checkForRunningClients)
+        i += 1
       }
     }
 
@@ -448,7 +426,6 @@ abstract class RemoteConnection extends 
   var destination: Destination = null
 
   var messageCount = 0
-  var maxMessages = 0
 
   def init = {
     if( rate.getName == null ) {
@@ -500,16 +477,6 @@ abstract class RemoteConnection extends 
 
   protected def incrementMessageCount() = {
     messageCount = messageCount + 1
-    if( maxMessages > 0 ) {
-      if ( messageCount % (maxMessages / 10) == 0 ) {
-        trace(name + " message count : " + messageCount)
-      }
-      if (messageCount == maxMessages) {
-        trace(name + " message count (" + messageCount + ") max (" + maxMessages + ") reached,
stopping connection")
-        doStop
-      }
-
-    }
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
Wed Sep 29 17:40:19 2010
@@ -27,6 +27,21 @@ import java.net.URL
 trait DeepQueueScenarios extends PersistentScenario {
 
   PERSISTENT = true
+  val MIN_MESSAGES = 100000
+
+  override def fixed_sampling = false
+
+  override def keep_sampling:Boolean = {
+    if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
+      println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
+      return true
+    }
+    if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
+      println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
+      return true
+    }
+    return false
+  }
 
   override def reportResourceTemplate():URL = { classOf[DeepQueueScenarios].getResource("persistent-report.html")
}
 
@@ -36,12 +51,8 @@ trait DeepQueueScenarios extends Persist
 
   for ( load <- partitionedLoad ; messageSize <- List(20,1024)  ) {
 
-    val totalMessages = 100000
-    val numMessages = totalMessages / load
-
     def benchmark(name: String)(func: => Unit) {
       test(name) {
-        MAX_MESSAGES = numMessages
         PTP = true
         MESSAGE_SIZE = messageSize
         destCount = 1;
@@ -49,7 +60,7 @@ trait DeepQueueScenarios extends Persist
       }
     }
 
-    val info = "queue " + numMessages + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k"
else messageSize+"b" ) + " with " + load + " "
+    val info = "queue " + MIN_MESSAGES + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k"
else messageSize+"b" ) + " with " + load + " "
 
     benchmark("En" + info + "producer(s)") {
       PURGE_STORE = true
@@ -63,7 +74,7 @@ trait DeepQueueScenarios extends Persist
       } finally {
         stopServices();
       }
-      this.assert(messagesSent == totalMessages, "Unexpected number of messages sent!")
+      this.assert(totalMessageSent > MIN_MESSAGES, "Unexpected number of messages sent!")
     }
 
     benchmark("De" + info + "consumer(s)") {
@@ -78,7 +89,7 @@ trait DeepQueueScenarios extends Persist
       } finally {
         stopServices();
       }
-      this.assert(messagesReceived == totalMessages, "Unexpected number of messages received!")
+      this.assert(totalMessageReceived > MIN_MESSAGES, "Unexpected number of messages
received!")
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/Metric.java
Wed Sep 29 17:40:19 2010
@@ -53,6 +53,6 @@ abstract public class Metric {
         return String.format("%s: %(,.2f %s/s", name, period.rate(counter()), unit);
     }
 
-    abstract public void reset();
+    abstract public long reset();
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricAggregator.java
Wed Sep 29 17:40:19 2010
@@ -142,10 +142,12 @@ public class MetricAggregator extends Me
     }
 
     @Override
-    public void reset() {
+    public long reset() {
+        long rc = 0;
         for (Metric metric : metrics) {
-            metric.reset();
+            rc += metric.reset();
         }
+        return rc;
     }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java?rev=1002779&r1=1002778&r2=1002779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/metric/MetricCounter.java
Wed Sep 29 17:40:19 2010
@@ -40,8 +40,8 @@ public class MetricCounter extends Metri
     }
 
     @Override
-    public void reset() {
-        counter.set(0);
+    public long reset() {
+        return counter.getAndSet(0);
     }
 
 }



Mime
View raw message