activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961093 - in /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test: resources/org/apache/activemq/apollo/broker/perf/report.html scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
Date Wed, 07 Jul 2010 03:52:12 GMT
Author: chirino
Date: Wed Jul  7 03:52:11 2010
New Revision: 961093

URL: http://svn.apache.org/viewvc?rev=961093&view=rev
Log:
beefing up the combinations tested

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html?rev=961093&r1=961092&r2=961093&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html
Wed Jul  7 03:52:11 2010
@@ -19,7 +19,7 @@
       .chart-graph {
         float:right; width: 66%; height: 15em;  margin: 1em 0 1em 1em;
       }
-      .chart-section { clear:both; margin-top: 1em; }
+      .chart-section { clear:both; margin-top: 1em; padding-left:2em; }
       .clear { clear:both; }
     </style>
     <script type='text/javascript' src='http://www.google.com/jsapi'></script>
@@ -30,35 +30,63 @@
       google.setOnLoadCallback(function() {
         data = new google.visualization.DataTable();
         data.addColumn('string', 'version');
-        data.addColumn('number', '1->1->0 producer');
-        data.addColumn('number', '1->1->1 producer');
-        data.addColumn('number', '1->1->1 consumer');
-        data.addColumn('number', '2->2->2 producer');
-        data.addColumn('number', '2->2->2 producer sd');
-        data.addColumn('number', '2->2->2 consumer');
-        data.addColumn('number', '2->2->2 consumer sd');
-        data.addColumn('number', '10->1->10 producer');
-        data.addColumn('number', '10->1->10 producer sd');
-        data.addColumn('number', '10->1->10 consumer');
-        data.addColumn('number', '10->1->10 consumer sd');
-        data.addColumn('number', '10->1->1 producer');
-        data.addColumn('number', '10->1->1 producer sd');
-        data.addColumn('number', '10->1->1 consumer');
-        data.addColumn('number', '1->1->10 producer');
-        data.addColumn('number', '1->1->10 consumer');
-        data.addColumn('number', '1->1->10 consumer sd');
-        data.addColumn('number', '10->10->10 producer');
-        data.addColumn('number', '10->10->10 producer sd');
-        data.addColumn('number', '10->10->10 consumer');
-        data.addColumn('number', '10->10->10 consumer sd');
-        data.addColumn('number', '1->1->[1 slow,1 fast] producer');
-        data.addColumn('number', '1->1->[1 slow,1 fast] producer sd');
-        data.addColumn('number', '1->1->[1 slow,1 fast] consumer');
-        data.addColumn('number', '1->1->[1 slow,1 fast] consumer sd');
-        data.addColumn('number', '2->2->[1,1 selecting] producer');
-        data.addColumn('number', '2->2->[1,1 selecting] producer sd');
-        data.addColumn('number', '2->2->[1,1 selecting] consumer');
-        data.addColumn('number', '2->2->[1,1 selecting] consumer sd');
+        data.addColumn('number', 'queue 20 1->1->1 producer');
+        data.addColumn('number', 'queue 20 1->1->1 consumer');
+        data.addColumn('number', 'queue 20 2->2->2 producer');
+        data.addColumn('number', 'queue 20 2->2->2 producer sd');
+        data.addColumn('number', 'queue 20 2->2->2 consumer');
+        data.addColumn('number', 'queue 20 2->2->2 consumer sd');
+        data.addColumn('number', 'queue 20 4->4->4 producer');
+        data.addColumn('number', 'queue 20 4->4->4 producer sd');
+        data.addColumn('number', 'queue 20 4->4->4 consumer');
+        data.addColumn('number', 'queue 20 4->4->4 consumer sd');
+        data.addColumn('number', 'queue 20 8->8->8 producer');
+        data.addColumn('number', 'queue 20 8->8->8 producer sd');
+        data.addColumn('number', 'queue 20 8->8->8 consumer');
+        data.addColumn('number', 'queue 20 8->8->8 consumer sd');
+        data.addColumn('number', 'queue 20 10->10->10 producer');
+        data.addColumn('number', 'queue 20 10->10->10 producer sd');
+        data.addColumn('number', 'queue 20 10->10->10 consumer');
+        data.addColumn('number', 'queue 20 10->10->10 consumer sd');
+        data.addColumn('number', 'queue 20 10->1->1 producer');
+        data.addColumn('number', 'queue 20 10->1->1 producer sd');
+        data.addColumn('number', 'queue 20 10->1->1 consumer');
+        data.addColumn('number', 'queue 20 1->1->10 producer');
+        data.addColumn('number', 'queue 20 1->1->10 consumer');
+        data.addColumn('number', 'queue 20 1->1->10 consumer sd');
+        data.addColumn('number', 'queue 20 10->1->10 producer');
+        data.addColumn('number', 'queue 20 10->1->10 producer sd');
+        data.addColumn('number', 'queue 20 10->1->10 consumer');
+        data.addColumn('number', 'queue 20 10->1->10 consumer sd');
+        data.addColumn('number', 'topic 20 1->1->0 producer');
+        data.addColumn('number', 'topic 20 1->1->1 producer');
+        data.addColumn('number', 'topic 20 1->1->1 consumer');
+        data.addColumn('number', 'topic 20 2->2->2 producer');
+        data.addColumn('number', 'topic 20 2->2->2 producer sd');
+        data.addColumn('number', 'topic 20 2->2->2 consumer');
+        data.addColumn('number', 'topic 20 2->2->2 consumer sd');
+        data.addColumn('number', 'topic 20 4->4->4 producer');
+        data.addColumn('number', 'topic 20 4->4->4 producer sd');
+        data.addColumn('number', 'topic 20 4->4->4 consumer');
+        data.addColumn('number', 'topic 20 4->4->4 consumer sd');
+        data.addColumn('number', 'topic 20 8->8->8 producer');
+        data.addColumn('number', 'topic 20 8->8->8 producer sd');
+        data.addColumn('number', 'topic 20 8->8->8 consumer');
+        data.addColumn('number', 'topic 20 8->8->8 consumer sd');
+        data.addColumn('number', 'topic 20 10->10->10 producer');
+        data.addColumn('number', 'topic 20 10->10->10 producer sd');
+        data.addColumn('number', 'topic 20 10->10->10 consumer');
+        data.addColumn('number', 'topic 20 10->10->10 consumer sd');
+        data.addColumn('number', 'topic 20 10->1->1 producer');
+        data.addColumn('number', 'topic 20 10->1->1 producer sd');
+        data.addColumn('number', 'topic 20 10->1->1 consumer');
+        data.addColumn('number', 'topic 20 1->1->10 producer');
+        data.addColumn('number', 'topic 20 1->1->10 consumer');
+        data.addColumn('number', 'topic 20 1->1->10 consumer sd');
+        data.addColumn('number', 'topic 20 10->1->10 producer');
+        data.addColumn('number', 'topic 20 10->1->10 producer sd');
+        data.addColumn('number', 'topic 20 10->1->10 consumer');
+        data.addColumn('number', 'topic 20 10->1->10 consumer sd');
 
         for( var i=0; i <  data.getNumberOfColumns(); i ++) {
           cols_index_map[data.getColumnLabel(i)] = i;
@@ -66,8 +94,7 @@
 
         var data_array = [
 // DATA-START
-      ['555b061e6782136a1a49926f3d1f6b6ab716fdbb', 151405.66, 11196.00, 13107.33, 16008.33,
5532.50, 14253.00, 2875.50, 622.00, 152.36, 3214.00, 1031.43, 4872.33, 696.11, 4779.67, 2176.27,
20076.31, 69.34, 8170.94, 92.81, 8493.50, 421.33, 103.67, 155.50, 19.67, 29.50, 12049.98,
123.00, 12020.66, 18.00],
-      ['555b061e6782136a1a49926f3d1f6b6ab716fdbb', 140957.00, 14202.33, 14158.67, 12854.67,
622.00, 14241.33, 1423.00, 6833.06, 1155.71, 8560.15, 1052.10, 9759.91, 1442.56, 6537.13,
2590.80, 26000.33, 127.60, 13044.33, 535.73, 13001.67, 629.49, 1450.85, 0.00, 19.66, 29.50,
13612.00, 3198.00, 13327.33, 3027.00]
+      ['commit f1ce02f4ec7bec08a46319895c4b6b30bc9e73df', 9508.81, 9471.25, 6116.33, 466.50,
7064.00, 1577.00, 8186.94, 4294.59, 5738.09, 1043.13, 6012.67, 2299.88, 6369.00, 1371.98,
6529.00, 1356.70, 2517.33, 528.98, 3104.83, 695.42, 2237.94, 4976.00, 4998.00, 19.68, 3110.00,
481.80, 2014.00, 253.15, 113277.23, 9637.79, 9489.50, 9948.68, 4976.00, 9981.01, 1635.50,
11084.94, 1791.63, 9300.80, 637.45, 9537.33, 602.25, 9084.00, 647.53, 9834.33, 646.34, 8926.00,
486.86, 4983.32, 490.75, 5365.60, 1655.36, 16238.52, 51.49, 0.00, 0.00, 0.00, 0.00]
 // DATA-END
         ];
         try {
@@ -104,47 +131,54 @@
           commit version.
         </p>
 
+        <h2>General</h2>
+
         <div class='chart-section'>
-          <div id='partioned_scaling' class='chart-graph'></div>
+          <div id='raw_producer_rate' class='chart-graph'></div>
           <script type='text/javascript'>
             google.setOnLoadCallback(function() {
-              chart('partioned_scaling',
-                ['version', '1->1->1 consumer', '2->2->2 consumer', '10->10->10
consumer'],
+              chart('raw_producer_rate',
+                ['version', 'topic 20 1->1->0 producer'],
                 {showCategories:false, legend: 'bottom', smoothLine:true, titleX:'changes
over time', titleY:'messages/sec', enableTooltip:true }
               );
             });
           </script>
-          <h2>Partitioned Scaling</h2>
-          Compares how well the broker scales as partitioned load is increased on it.  Each
destination
-          has only one producer and one consumer attached.
+          <h3>Max Producer Rate</h3>
+          The maximum rate that a single producer can send to a broker.  The producer is
set up
+          on a topic with no consumers.  This rate has a close correlation to your processor
+          speed.
         </div>
 
+        <div class="clear"></div>
+        <h2>Queue Performance</h2>
+        <p></p>
         <div class='chart-section'>
-          <div id='raw_producer_rate' class='chart-graph'></div>
+          <div id='queue_partioned_scaling' class='chart-graph'></div>
           <script type='text/javascript'>
             google.setOnLoadCallback(function() {
-              chart('raw_producer_rate',
-                ['version', '1->1->0 producer'],
+              chart('queue_partioned_scaling',
+                ['version', 'queue 20 1->1->1 consumer', 'queue 20 2->2->2 consumer',
'queue 20 4->4->4 consumer', 'queue 20 8->8->8 consumer', 'queue 20 10->10->10
consumer'],
                 {showCategories:false, legend: 'bottom', smoothLine:true, titleX:'changes
over time', titleY:'messages/sec', enableTooltip:true }
               );
             });
           </script>
-          <h2>Max Producer Rate</h2>
-          Compares the maximum producer rate for when there is no attached consumers vs when
there is
-          one attached consumer.
+          <h3>Partitioned Scaling</h3>
+          Compares how well the broker scales as partitioned load is increased on it.  Each
destination
+          has only one producer and one consumer attached.  This should scale up on machines
with many
+          processing cores. 
         </div>
 
         <div class='chart-section'>
-          <div id='fan_in_rate' class='chart-graph'></div>
+          <div id='queue_fan_in_rate' class='chart-graph'></div>
           <script type='text/javascript'>
             google.setOnLoadCallback(function() {
-              chart('fan_in_rate',
-                ['version', '10->1->1 producer', '10->1->1 consumer', '10->1->1
producer sd'],
+              chart('queue_fan_in_rate',
+                ['version', 'queue 20 10->1->1 producer', 'queue 20 10->1->1
consumer', 'queue 20 10->1->1 producer sd'],
                 {showCategories:false, legend: 'bottom', smoothLine:true, titleX:'changes
over time', titleY:'messages/sec', enableTooltip:true }
               );
             });
           </script>
-          <h2>Producer Fan In</h2>
+          <h3>Producer Fan In</h3>
           Total producer and consumer rates for when there are many producers on one destination
           but only one consumer.  The 'producer sd' is standard deviation of the message
rate across the
           producers.
@@ -152,16 +186,16 @@
 
 
         <div class='chart-section'>
-          <div id='fan_out_rate' class='chart-graph'></div>
+          <div id='queue_fan_out_rate' class='chart-graph'></div>
           <script type='text/javascript'>
             google.setOnLoadCallback(function() {
-              chart('fan_out_rate',
-                ['version', '1->1->10 producer', '1->1->10 consumer', '1->1->10
consumer sd'],
+              chart('queue_fan_out_rate',
+                ['version', 'queue 20 1->1->10 producer', 'queue 20 1->1->10
consumer', 'queue 20 1->1->10 consumer sd'],
                 {showCategories:false, legend: 'bottom', smoothLine:true, titleX:'changes
over time', titleY:'messages/sec', enableTooltip:true }
               );
             });
           </script>
-          <h2>Consumer Fan Out</h2>
+          <h3>Consumer Fan Out</h3>
           Total producer and consumer rates for when there are many consumers on one destination
           but only one producer.  The 'consumer sd' is standard deviation of the message
rate across the
           consumers.
@@ -169,16 +203,16 @@
 
 
         <div class='chart-section'>
-          <div id='fan_inout_rate' class='chart-graph'></div>
+          <div id='queue_fan_inout_rate' class='chart-graph'></div>
           <script type='text/javascript'>
             google.setOnLoadCallback(function() {
-              chart('fan_inout_rate',
-                ['version', '10->1->10 producer', '10->1->10 consumer', '10->1->10
producer sd', '10->1->10 consumer sd'],
+              chart('queue_fan_inout_rate',
+                ['version', 'queue 20 10->1->10 producer', 'queue 20 10->1->10
consumer', 'queue 20 10->1->10 producer sd', 'queue 20 10->1->10 consumer sd'],
                 {showCategories:false, legend: 'bottom', smoothLine:true, titleX:'changes
over time', titleY:'messages/sec', enableTooltip:true }
               );
             });
           </script>
-          <h2>Producer Fan In and Consumer Fan Out</h2>
+          <h3>Producer Fan In and Consumer Fan Out</h3>
           Total producer and consumer rates for when there are many producers and many consumers
on one destination.  The 'consumer sd' and 'producer sd'
           are the standard deviation numbers of the producers and consumers.
         </div>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961093&r1=961092&r2=961093&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
Wed Jul  7 03:52:11 2010
@@ -37,30 +37,21 @@ import org.apache.activemq.util.{IOHelpe
 import scala.util.matching.Regex
 
 object BaseBrokerPerfSupport {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "2"))
   var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "3000"))
-  var IO_WORK_AMOUNT = 0
-  var FANIN_COUNT = 10
-  var FANOUT_COUNT = 10
-  var PRIORITY_LEVELS = 10
-  var USE_INPUT_QUEUES = true
+
+  // Set to use tcp IO
+  protected var TCP = true;
+  // set to force marshalling even in the NON tcp case.
+  protected var FORCE_MARSHALLING = true;
 
   var USE_KAHA_DB = true;
   var PURGE_STORE = true;
-  var PERSISTENT = false;
-  var DURABLE = false;
-
-  // Set to test against ptp queues instead of topics:
-  var PTP = false;
 
   // Set to put senders and consumers on separate brokers.
   var MULTI_BROKER = false;
 
-  // Set to use tcp IO
-  protected var TCP = true;
-
-  // set to force marshalling even in the NON tcp case.
-  protected var FORCE_MARSHALLING = true;
+  var DUMP_REPORT_COLS = true;
 }
 
 abstract class BaseBrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
@@ -148,414 +139,393 @@ abstract class BaseBrokerPerfSupport ext
     template match {
       case report_parser(report_header, _, report_footer) =>
         val version = new String(ProcessSupport.system("git", "rev-list", "--max-count=1",
"HEAD").toByteArray).trim
-
         if( !report_data.isEmpty ) {
           report_data += ",\n"
         }
-        report_data += "            // version, "+spread_sheet_stats.map(_._1).mkString(",
")+"\n"
         report_data += "            ['commit "+version+"', "+spread_sheet_stats.map(x=>String.format("%.2f",x._2)).mkString(",
")+"]\n"
-
-        IOHelper.writeText(csvfile, report_header+report_data+report_footer)  
+        IOHelper.writeText(csvfile, report_header+report_data+report_footer)
       case _ =>
         println("could not parse template report file");
     }
 
     println("Updated: "+csvfile);
+    
+    if( DUMP_REPORT_COLS ) {
+      spread_sheet_stats.map(_._1).foreach{x=>
+        println("          data.addColumn('number', '"+x+"');");
+      }
+    }
   }
 
-//
-  def getBrokerWireFormat() = "multi"
+  // Test all the combinations
+  for( PTP<- List(true,false) ; PERSISTENT <- List(false); DURABLE <- List(false)
; size <- List(20,1024,1024*256)) {
 
-  def getRemoteWireFormat(): String
+    val prefix = (if( PTP ) "queue " else "topic ") +(if( PERSISTENT ) "persistent " else
"")+(if((size%1024)==0) (size/1024)+"k" else size.toString)+" "
+    val suffix = (if( DURABLE ) " durable" else "")
 
-  /**
-   * Used to benchmark what is the raw speed of sending messages one way.
-   * Divide by 2 and compare against 1-1-1 to figure out what the broker dispatching
-   * overhead is.
-   */
-  if (!PTP) {
-    test("1->1->0") {
-      producerCount = 1;
-      destCount = 1;
-
-      createConnections();
-
-      // Start 'em up.
-      startClients();
-      try {
-        reportRates();
-      } finally {
-        stopServices();
-      }
-    }
-  }
-
-  /**
-   * The baseline of the performance of going from 1 producer to 1 consumer.
-   */
-  test("1->1->1") {
-    println(testName)
-    producerCount = 1;
-    destCount = 1;
-    consumerCount = 1;
-
-    createConnections();
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   * To compare against the performance of the 1-1-1 case... If you have
-   * linear scalability then, this should be twice as fast.
-   */
-  test("2->2->2") {
-    producerCount = 2;
-    destCount = 2;
-    consumerCount = 2;
-
-    createConnections();
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   * To see how high producer and consumer contention on a destination performs.
-   */
-  test(format("%d->1->%d", FANIN_COUNT, FANOUT_COUNT)) {
-    producerCount = FANIN_COUNT;
-    consumerCount = FANOUT_COUNT;
-    destCount = 1;
-
-    createConnections();
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   * To see how high producer contention on a destination performs.
-   */
-  test(format("%d->1->1", FANIN_COUNT)) {
-    producerCount = FANIN_COUNT;
-    destCount = 1;
-    consumerCount = 1;
-
-    createConnections();
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   * To see how high consumer contention on a destination performs.
-   */
-  test(format("1->1->%d", FANOUT_COUNT)) {
-    producerCount = 1;
-    destCount = 1;
-    consumerCount = FANOUT_COUNT;
-
-    createConnections();
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   * To test how an overload situation affects scalability.  Compare to the
-   * scalability trend of 1-1-1 to 2-2-2
-   */
-  test("10->10->10") {
-    producerCount = 10;
-    destCount = 10;
-    consumerCount = 10;
-
-    createConnections();
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   *  Tests 1 producers sending to 1 destination with 1 slow and 1 fast consumer.
-   *
-   * queue case: the producer should not slow down since it can dispatch to the
-   *             fast consumer
-   *
-   * topic case: the producer should slow down since it HAS to dispatch to the
-   *             slow consumer.
-   *
-   */
-  test("1->1->[1 slow,1 fast]") {
-    producerCount = 2;
-    destCount = 1;
-    consumerCount = 2;
-
-    createConnections();
-    consumers.get(0).thinkTime = 50;
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  test("2->2->[1,1 selecting]") {
-    producerCount = 2;
-    destCount = 2;
-    consumerCount = 2;
-
-    createConnections();
-
-    // Add properties to match producers to their consumers
-    for (i <- 0 until consumerCount) {
-      var property = "match" + i;
-      consumers.get(i).selector = property;
-      producers.get(i).property = property;
-    }
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
-    }
-  }
-
-  /**
-   * Test sending with 1 high priority sender. The high priority sender should
-   * have higher throughput than the other low priority senders.
-   *
-   * @throws Exception
-   */
-  test("[1 high, 1 normal]->1->1") {
-    producerCount = 2;
-    destCount = 1;
-    consumerCount = 1;
-
-    createConnections();
-    var producer = producers.get(0);
-    producer.priority = 1
-    producer.rate.setName("High Priority Producer Rate");
-
-    consumers.get(0).thinkTime = 1;
-
-    // Start 'em up.
-    startClients();
-    try {
-      println("Checking rates...");
-      for (i <- 0 until PERFORMANCE_SAMPLES) {
-        var p = new Period();
-        Thread.sleep(SAMPLE_PERIOD);
-        println(producer.rate.getRateSummary(p));
-        println(totalProducerRate.getRateSummary(p));
-        println(totalConsumerRate.getRateSummary(p));
-        totalProducerRate.reset();
-        totalConsumerRate.reset();
+    if( PTP && DURABLE ) {
+      // skip this iteration since queues and durable subs don't mix.
+    } else {
+
+      /**
+       * Used to benchmark what is the raw speed of sending messages one way.
+       * Divide by 2 and compare against 1-1-1 to figure out what the broker dispatching
+       * overhead is.
+       */
+      if (!PTP) {
+        test(prefix+"1->1->0"+suffix) {
+          producerCount = 1;
+          destCount = 1;
+
+          createConnections();
+
+          // Start 'em up.
+          startClients();
+          try {
+            reportRates();
+          } finally {
+            stopServices();
+          }
+        }
       }
 
-    } finally {
-      stopServices();
+      /**
+       * Test increasing partitioned load.  Should linearly scale up on multi-core
+       * machines until CPU usage is maxed.
+       */
+      for( count <- List(1, 2, 4, 8, 10) ) {
+        test(format("%s%d->%d->%d%s", prefix, count, count, count, suffix)) {
+          println(testName)
+          producerCount = count;
+          destCount = count;
+          consumerCount = count;
+
+          createConnections();
+          // Start 'em up.
+          startClients();
+          try {
+            reportRates();
+          } finally {
+            stopServices();
+          }
+        }
+      }
+
+      /**
+       * Test the effects of high producer and consumer contention on a single
+       * destination.
+       */
+      for( (producers, consumers) <- List((10, 1), (1, 10), (10, 10)) ) {
+        test(format("%s%d->1->%d%s", prefix, producers, consumers, suffix)) {
+          producerCount = producers;
+          consumerCount = consumers;
+          destCount = 1;
+
+          createConnections();
+
+          // Start 'em up.
+          startClients();
+          try {
+            reportRates();
+          } finally {
+            stopServices();
+          }
+        }
+      }
+
+//    /**
+//     *  Tests 1 producers sending to 1 destination with 1 slow and 1 fast consumer.
+//     *
+//     * queue case: the producer should not slow down since it can dispatch to the
+//     *             fast consumer
+//     *
+//     * topic case: the producer should slow down since it HAS to dispatch to the
+//     *             slow consumer.
+//     *
+//     */
+//    test("1->1->[1 slow,1 fast]") {
+//      producerCount = 2;
+//      destCount = 1;
+//      consumerCount = 2;
+//
+//      createConnections();
+//      consumers.get(0).thinkTime = 50;
+//
+//      // Start 'em up.
+//      startClients();
+//      try {
+//        reportRates();
+//      } finally {
+//        stopServices();
+//      }
+//    }
+//
+//    test("2->2->[1,1 selecting]") {
+//      producerCount = 2;
+//      destCount = 2;
+//      consumerCount = 2;
+//
+//      createConnections();
+//
+//      // Add properties to match producers to their consumers
+//      for (i <- 0 until consumerCount) {
+//        var property = "match" + i;
+//        consumers.get(i).selector = property;
+//        producers.get(i).property = property;
+//      }
+//
+//      // Start 'em up.
+//      startClients();
+//      try {
+//        reportRates();
+//      } finally {
+//        stopServices();
+//      }
+//    }
+
+//    /**
+//     * Test sending with 1 high priority sender. The high priority sender should
+//     * have higher throughput than the other low priority senders.
+//     *
+//     * @throws Exception
+//     */
+//    test("[1 high, 1 normal]->1->1") {
+//      producerCount = 2;
+//      destCount = 1;
+//      consumerCount = 1;
+//
+//      createConnections();
+//      var producer = producers.get(0);
+//      producer.priority = 1
+//      producer.rate.setName("High Priority Producer Rate");
+//
+//      consumers.get(0).thinkTime = 1;
+//
+//      // Start 'em up.
+//      startClients();
+//      try {
+//        println("Checking rates...");
+//        for (i <- 0 until PERFORMANCE_SAMPLES) {
+//          var p = new Period();
+//          Thread.sleep(SAMPLE_PERIOD);
+//          println(producer.rate.getRateSummary(p));
+//          println(totalProducerRate.getRateSummary(p));
+//          println(totalConsumerRate.getRateSummary(p));
+//          totalProducerRate.reset();
+//          totalConsumerRate.reset();
+//        }
+//
+//      } finally {
+//        stopServices();
+//      }
+//    }
+
+//    /**
+//     * Test sending with 1 high priority sender. The high priority sender should
+//     * have higher throughput than the other low priority senders.
+//     *
+//     * @throws Exception
+//     */
+//    test("[1 high, 1 mixed, 1 normal]->1->1") {
+//      producerCount = 2;
+//      destCount = 1;
+//      consumerCount = 1;
+//
+//      createConnections();
+//      var producer = producers.get(0);
+//      producer.priority = 1;
+//      producer.priorityMod = 3;
+//      producer.rate.setName("High Priority Producer Rate");
+//
+//      consumers.get(0).thinkTime = 1
+//
+//      // Start 'em up.
+//      startClients();
+//      try {
+//
+//        println("Checking rates...");
+//        for (i <- 0 until PERFORMANCE_SAMPLES) {
+//          var p = new Period();
+//          Thread.sleep(SAMPLE_PERIOD);
+//          println(producer.rate.getRateSummary(p));
+//          println(totalProducerRate.getRateSummary(p));
+//          println(totalConsumerRate.getRateSummary(p));
+//          totalProducerRate.reset();
+//          totalConsumerRate.reset();
+//        }
+//
+//      } finally {
+//        stopServices();
+//      }
+//    }
+
     }
-  }
 
-  /**
-   * Test sending with 1 high priority sender. The high priority sender should
-   * have higher throughput than the other low priority senders.
-   *
-   * @throws Exception
-   */
-  test("[1 high, 1 mixed, 1 normal]->1->1") {
-    producerCount = 2;
-    destCount = 1;
-    consumerCount = 1;
-
-    createConnections();
-    var producer = producers.get(0);
-    producer.priority = 1;
-    producer.priorityMod = 3;
-    producer.rate.setName("High Priority Producer Rate");
-
-    consumers.get(0).thinkTime = 1
-
-    // Start 'em up.
-    startClients();
-    try {
+    def createConnections() = {
 
-      println("Checking rates...");
-      for (i <- 0 until PERFORMANCE_SAMPLES) {
-        var p = new Period();
-        Thread.sleep(SAMPLE_PERIOD);
-        println(producer.rate.getRateSummary(p));
-        println(totalProducerRate.getRateSummary(p));
-        println(totalConsumerRate.getRateSummary(p));
-        totalProducerRate.reset();
-        totalConsumerRate.reset();
+      if (MULTI_BROKER) {
+        sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+        rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+        brokers.add(sendBroker);
+        brokers.add(rcvBroker);
+      } else {
+        sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+        rcvBroker = sendBroker
+        brokers.add(sendBroker);
+      }
+
+      startBrokers();
+
+      var dests = new Array[Destination](destCount);
+
+      for (i <- 0 until destCount) {
+        val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+        val name = new AsciiBuffer("dest" + (i + 1))
+        var bean = new SingleDestination(domain, name)
+        dests(i) = bean;
+        if (PTP) {
+          sendBroker.defaultVirtualHost.createQueue(dests(i));
+          if (MULTI_BROKER) {
+            rcvBroker.defaultVirtualHost.createQueue(dests(i));
+          }
+        }
+      }
+
+      for (i <- 0 until producerCount) {
+        var destination = dests(i % destCount);
+        var producer = _createProducer(i, destination);
+        producer.persistentDelivery = PERSISTENT;
+        producers.add(producer);
+      }
+
+      for (i <- 0 until consumerCount) {
+        var destination = dests(i % destCount);
+        var consumer = _createConsumer(i, destination);
+        consumer.durable = DURABLE;
+        consumers.add(consumer);
       }
 
-    } finally {
-      stopServices();
+      // Create MultiBroker connections:
+      // if (multibroker) {
+      // Pipe<Message> pipe = new Pipe<Message>();
+      // sendBroker.createBrokerConnection(rcvBroker, pipe);
+      // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+      // }
     }
-  }
 
-  def reportRates() = {
-    val best_sample = PERFORMANCE_SAMPLES/2
+    def _createConsumer(i: Int, destination: Destination): RemoteConsumer = {
 
-    println("Checking "+(if (PTP) "ptp" else "topic")+" rates...");
-    for (i <- 0 until PERFORMANCE_SAMPLES) {
-      var p = new Period();
-      Thread.sleep(SAMPLE_PERIOD);
-      if( producerCount > 0 ) {
-        println(totalProducerRate.getRateSummary(p));
+      var consumer = createConsumer();
+      consumer.brokerPerfTest = this
+
+      consumer.uri = rcvBroker.connectUris.head
+      consumer.destination = destination
+      consumer.name = "consumer" + (i + 1)
+      consumer.totalConsumerRate = totalConsumerRate
+      return consumer;
+    }
+
+
+    def _createProducer(id: Int, destination: Destination): RemoteProducer = {
+      var producer = createProducer();
+      producer.brokerPerfTest = this
+      producer.uri = sendBroker.connectUris.head
+      producer.producerId = id + 1
+      producer.name = "producer" + (id + 1)
+      producer.destination = destination
+      producer.messageIdGenerator = msgIdGenerator
+      producer.totalProducerRate = totalProducerRate
+      producer.payloadSize = size;
+      producer
+    }
+
+    def stopServices() = {
+      stopping.set(true);
+      val tracker = new CompletionTracker("test shutdown")
+      for (broker <- brokers) {
+        broker.stop(tracker.task("broker"));
       }
-      if( consumerCount > 0 ) {
-        println(totalConsumerRate.getRateSummary(p));
+      for (connection <- producers) {
+        connection.stop(tracker.task(connection.toString));
       }
+      for (connection <- consumers) {
+        connection.stop(tracker.task(connection.toString));
+      }
+      println("waiting for services to stop");
+      tracker.await
+    }
 
-      if( i == best_sample ) {
-        if( producerCount > 0 ) {
-          spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: producer rate", totalProducerRate.total(p) ) :: Nil
-          if( producerCount > 1 ) {
-            spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: producer deviation", totalProducerRate.deviation ) :: Nil
-          }
-        }
-        if( consumerCount > 0 ) {
-          spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: consumer rate", totalConsumerRate.total(p) ) :: Nil
-          if( consumerCount > 1 ) {
-            spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: consumer deviation", totalConsumerRate.deviation ) :: Nil
-          }
-        }
+    def startBrokers() = {
+      val tracker = new CompletionTracker("test broker startup")
+      for (broker <- brokers) {
+        broker.start(tracker.task("broker"));
       }
-      
-      totalProducerRate.reset();
-      totalConsumerRate.reset();
+      tracker.await
     }
 
 
-  }
+    def startClients() = {
+      var tracker = new CompletionTracker("test consumer startup")
+      for (connection <- consumers) {
+        connection.start(tracker.task(connection.toString));
+      }
+      tracker.await
+      tracker = new CompletionTracker("test producer startup")
+      for (connection <- producers) {
+        connection.start(tracker.task(connection.toString));
+      }
+      tracker.await
+    }
 
-  def createConnections() = {
+    def reportRates() = {
+      val best_sample = PERFORMANCE_SAMPLES/2
 
-    if (MULTI_BROKER) {
-      sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
-      rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
-      brokers.add(sendBroker);
-      brokers.add(rcvBroker);
-    } else {
-      sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
-      rcvBroker = sendBroker
-      brokers.add(sendBroker);
-    }
-
-    startBrokers();
-
-    var dests = new Array[Destination](destCount);
-
-    for (i <- 0 until destCount) {
-      val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
-      val name = new AsciiBuffer("dest" + (i + 1))
-      var bean = new SingleDestination(domain, name)
-      dests(i) = bean;
-      if (PTP) {
-        sendBroker.defaultVirtualHost.createQueue(dests(i));
-        if (MULTI_BROKER) {
-          rcvBroker.defaultVirtualHost.createQueue(dests(i));
+      println("Checking "+(if (PTP) "ptp" else "topic")+" rates...");
+      for (i <- 0 until PERFORMANCE_SAMPLES) {
+        var p = new Period();
+        Thread.sleep(SAMPLE_PERIOD);
+        if( producerCount > 0 ) {
+          println(totalProducerRate.getRateSummary(p));
+        }
+        if( consumerCount > 0 ) {
+          println(totalConsumerRate.getRateSummary(p));
         }
-      }
-    }
 
-    for (i <- 0 until producerCount) {
-      var destination = dests(i % destCount);
-      var producer = createProducer(i, destination);
-      producer.persistentDelivery = PERSISTENT;
-      producers.add(producer);
-    }
-
-    for (i <- 0 until consumerCount) {
-      var destination = dests(i % destCount);
-      var consumer = createConsumer(i, destination);
-      consumer.durable = DURABLE;
-      consumers.add(consumer);
-    }
-
-    // Create MultiBroker connections:
-    // if (multibroker) {
-    // Pipe<Message> pipe = new Pipe<Message>();
-    // sendBroker.createBrokerConnection(rcvBroker, pipe);
-    // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-    // }
-  }
-
-  def createConsumer(i: Int, destination: Destination): RemoteConsumer = {
-
-    var consumer = createConsumer();
-    consumer.brokerPerfTest = this
-
-    consumer.uri = rcvBroker.connectUris.head
-    consumer.destination = destination
-    consumer.name = "consumer" + (i + 1)
-    consumer.totalConsumerRate = totalConsumerRate
-    return consumer;
-  }
+        if( i == best_sample ) {
+          if( producerCount > 0 ) {
+            spread_sheet_stats = spread_sheet_stats ::: ( testName+" producer", totalProducerRate.total(p) ) :: Nil
+            if( producerCount > 1 ) {
+              spread_sheet_stats = spread_sheet_stats ::: ( testName+" producer sd", totalProducerRate.deviation ) :: Nil
+            }
+          }
+          if( consumerCount > 0 ) {
+            spread_sheet_stats = spread_sheet_stats ::: ( testName+" consumer", totalConsumerRate.total(p) ) :: Nil
+            if( consumerCount > 1 ) {
+              spread_sheet_stats = spread_sheet_stats ::: ( testName+" consumer sd", totalConsumerRate.deviation ) :: Nil
+            }
+          }
+        }
 
-  protected def createConsumer(): RemoteConsumer
+        totalProducerRate.reset();
+        totalConsumerRate.reset();
+      }
 
-  private def createProducer(id: Int, destination: Destination): RemoteProducer = {
-    var producer = createProducer();
-    producer.brokerPerfTest = this
-    producer.uri = sendBroker.connectUris.head
-    producer.producerId = id + 1
-    producer.name = "producer" + (id + 1)
-    producer.destination = destination
-    producer.messageIdGenerator = msgIdGenerator
-    producer.totalProducerRate = totalProducerRate
-    producer
+    }
   }
-
+  
+  protected def createConsumer(): RemoteConsumer
   protected def createProducer(): RemoteProducer
 
-  private def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
+  def getBrokerWireFormat() = "multi"
+  def getRemoteWireFormat(): String
+  def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
     val broker = new Broker()
     broker.transportServers.add(TransportFactory.bind(bindURI))
     broker.connectUris.add(connectUri)
     broker
   }
 
-  protected def createStore(broker: Broker): Store = {
+  def createStore(broker: Broker): Store = {
     val store = if (USE_KAHA_DB) {
       StoreFactory.createStore("hawtdb");
     } else {
@@ -566,44 +536,6 @@ abstract class BaseBrokerPerfSupport ext
     store
   }
 
-  private def stopServices() = {
-    stopping.set(true);
-    val tracker = new CompletionTracker("test shutdown")
-    for (broker <- brokers) {
-      broker.stop(tracker.task("broker"));
-    }
-    for (connection <- producers) {
-      connection.stop(tracker.task(connection.toString));
-    }
-    for (connection <- consumers) {
-      connection.stop(tracker.task(connection.toString));
-    }
-    println("waiting for services to stop");
-    tracker.await
-  }
-
-  private def startBrokers() = {
-    val tracker = new CompletionTracker("test broker startup")
-    for (broker <- brokers) {
-      broker.start(tracker.task("broker"));
-    }
-    tracker.await
-  }
-
-
-  private def startClients() = {
-    var tracker = new CompletionTracker("test consumer startup")
-    for (connection <- consumers) {
-      connection.start(tracker.task(connection.toString));
-    }
-    tracker.await
-    tracker = new CompletionTracker("test producer startup")
-    for (connection <- producers) {
-      connection.start(tracker.task(connection.toString));
-    }
-    tracker.await
-  }
-
 }
 
 abstract class RemoteConsumer extends Connection {



Mime
View raw message