activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r645881 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/
Date Tue, 08 Apr 2008 12:43:38 GMT
Author: rajdavies
Date: Tue Apr  8 05:43:36 2008
New Revision: 645881

URL: http://svn.apache.org/viewvc?rev=645881&view=rev
Log:
Updated to support multiple destinations

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
Tue Apr  8 05:43:36 2008
@@ -38,7 +38,7 @@
     protected Connection connection;
     protected MessageConsumer consumer;
     protected long sleepDuration;
-    protected boolean enableAudit = true;
+    protected boolean enableAudit = false;
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20);
 
     protected PerfRate rate = new PerfRate();
@@ -82,7 +82,7 @@
             if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) {
                 LOG.error("Message out of order!!" + msg);
             }
-            if (this.audit.isDuplicate(msg)){
+            if (enableAudit && this.audit.isDuplicate(msg)){
                 LOG.error("Duplicate Message!" + msg);
             }
         } catch (JMSException e1) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
Tue Apr  8 05:43:36 2008
@@ -38,6 +38,7 @@
     private Session session;
     private final CountDownLatch stopped = new CountDownLatch(1);
     private boolean running;
+    private int sleep = 0;
 
     public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws
JMSException {
         connection = fac.createConnection();
@@ -93,12 +94,23 @@
                 msg.writeBytes(payload);
                 producer.send(msg);
                 rate.increment();
+                if (sleep > 0) {
+                    Thread.sleep(sleep);
+                }
             }
         } catch (Throwable e) {
             e.printStackTrace();
         } finally {
             stopped.countDown();
         }
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
Tue Apr  8 05:43:36 2008
@@ -61,7 +61,7 @@
         factory = createConnectionFactory(bindAddress);
         Connection con = factory.createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationName);
+        Destination destination = createDestination(session, destinationName);
         con.close();
         for (int i = 0; i < 3; i++) {
             Connection connection = factory.createConnection();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
Tue Apr  8 05:43:36 2008
@@ -28,8 +28,8 @@
 public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest {
     
     protected void setUp() throws Exception {
-        numberofProducers=6;
-        numberOfConsumers=6;
+        numberofProducers=60;
+        numberOfConsumers=60;
         samepleCount=1000;
         playloadSize = 1024;
         super.setUp();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
Tue Apr  8 05:43:36 2008
@@ -25,6 +25,15 @@
  * @version $Revision: 1.3 $
  */
 public class SimpleDurableTopicTest extends SimpleTopicTest {
+    
+    protected void setUp() throws Exception {
+        numberOfDestinations=6;
+        numberOfConsumers = 1;
+        numberofProducers = 1;
+        samepleCount=1000;
+        playloadSize = 1024;
+        super.setUp();
+    }
     protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number,
byte payload[]) throws JMSException {
         PerfProducer pp = new PerfProducer(fac, dest, payload);
         pp.setDeliveryMode(DeliveryMode.PERSISTENT);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
Tue Apr  8 05:43:36 2008
@@ -17,9 +17,11 @@
 package org.apache.activemq.perf;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.Session;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.commons.logging.Log;
@@ -29,45 +31,58 @@
 public class SimpleNetworkTest extends SimpleTopicTest {
 
     private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class);
-    protected String consumerBindAddress = "tcp://localhost:61616";
+    //protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000,tcp://localhost:61617?wireFormat.maxInactivityDuration=1000";
+    protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=2000&socket.tcpNoDelayEnabled=false";
     protected String producerBindAddress = "tcp://localhost:61617";
     protected static final String CONSUMER_BROKER_NAME = "Consumer";
     protected static final String PRODUCER_BROKER_NAME = "Producer";
     protected BrokerService consumerBroker;
     protected BrokerService producerBroker;
-    protected ConnectionFactory consumerFactory;
-    protected ConnectionFactory producerFactory;
+    protected ActiveMQConnectionFactory consumerFactory;
+    protected ActiveMQConnectionFactory producerFactory;
+    
     
     protected void setUp() throws Exception {
         if (consumerBroker == null) {
-            consumerBroker = createConsumerBroker(consumerBindAddress);
+           // consumerBroker = createConsumerBroker(consumerBindAddress);
         }
         if (producerBroker == null) {
             producerBroker = createProducerBroker(producerBindAddress);
         }
-        consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME);
-        producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME);
-        //consumerFactory = createConnectionFactory(consumerBindAddress);
-        //producerFactory = createConnectionFactory(producerBindAddress);
+        //consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME);
+        //producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME);
+        consumerFactory = createConnectionFactory("failover://("+consumerBindAddress + ","
+ producerBindAddress +")?randomize=false&backup=false");
+        //consumerFactory = createConnectionFactory("failover://("+consumerBindAddress+")?backup=true");
+        consumerFactory.setDispatchAsync(true);
+        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+        policy.setQueuePrefetch(100);
+        consumerFactory.setPrefetchPolicy(policy);
+        producerFactory = createConnectionFactory(producerBindAddress);
         Connection con = consumerFactory.createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationName);
-        LOG.info("Testing against destination: " + destination);
-        LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers
+ " consumer(s)");
-        con.close();
-        producers = new PerfProducer[numberofProducers];
-        consumers = new PerfConsumer[numberOfConsumers];
-        for (int i = 0; i < numberOfConsumers; i++) {
-            consumers[i] = createConsumer(consumerFactory, destination, i);
-            consumers[i].setSleepDuration(consumerSleepDuration);
-        }
-        for (int i = 0; i < numberofProducers; i++) {
-            array = new byte[playloadSize];
-            for (int j = i; j < array.length; j++) {
-                array[j] = (byte)j;
+        
+        producers = new PerfProducer[numberofProducers*numberOfDestinations];
+        consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations];
+        int consumerCount = 0;
+        int producerCount = 0;
+        for (int k =0; k < numberOfDestinations;k++) {
+            Destination destination = createDestination(session, destinationName+":"+k);
+            LOG.info("Testing against destination: " + destination);
+            for (int i = 0; i < numberOfConsumers; i++) {
+                consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
+                consumers[consumerCount].setSleepDuration(consumerSleepDuration);
+                consumerCount++;
+            }
+            for (int i = 0; i < numberofProducers; i++) {
+                array = new byte[playloadSize];
+                for (int j = i; j < array.length; j++) {
+                    array[j] = (byte)j;
+                }
+                producers[producerCount] = createProducer(factory, destination, i, array);
+                producerCount++;
             }
-            producers[i] = createProducer(producerFactory, destination, i, array);
         }
+        con.close();
     }
 
     protected void tearDown() throws Exception {
@@ -96,6 +111,7 @@
     }
     
     protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception
{
+        configureBroker(answer);
         answer.setPersistent(false);
         answer.setBrokerName(CONSUMER_BROKER_NAME);
         answer.setDeleteAllMessagesOnStartup(true);
@@ -111,13 +127,22 @@
     }
     
     protected void configureProducerBroker(BrokerService answer,String uri) throws Exception
{
+        configureBroker(answer);
         answer.setBrokerName(PRODUCER_BROKER_NAME);
+        answer.setMonitorConnectionSplits(false);
+        //answer.setSplitSystemUsageForProducersConsumers(true);
         answer.setPersistent(false);
         answer.setDeleteAllMessagesOnStartup(true);
-        NetworkConnector connector = answer.addNetworkConnector("static://"+consumerBindAddress);
-        connector.setDuplex(true);
+        NetworkConnector connector = answer.addNetworkConnector("static://tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=2000");
+        //connector.setNetworkTTL(3);
+        //connector.setDynamicOnly(true);
+        //connector.setDuplex(true);
         answer.addConnector(uri);
         answer.setUseShutdownHook(false);
+    }
+    
+    protected void configureBroker(BrokerService service) throws Exception{
+        
     }
 
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
Tue Apr  8 05:43:36 2008
@@ -17,23 +17,30 @@
 
 package org.apache.activemq.perf;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Session;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
 public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest {
 
-	protected void setUp() throws Exception {
-		numberOfConsumers = 10;
-	    numberofProducers = 10;
+	protected void setUp()throws Exception {
+	    numberOfDestinations =20;
 	    super.setUp();
 	}
 	protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number,
byte[] payload) throws JMSException {
         PerfProducer pp = new PerfProducer(fac, dest, payload);
-        pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        pp.setTimeToLive(1000);
+       pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+       // pp.setTimeToLive(1000);
+        //pp.setSleep(1);
         return pp;
     }
 	
@@ -41,11 +48,36 @@
         PerfConsumer consumer =  new PerfConsumer(fac, dest);
         boolean enableAudit = numberOfConsumers <= 1;
         System.out.println("Enable Audit = " + enableAudit);
-        consumer.setEnableAudit(enableAudit);
+        consumer.setEnableAudit(false);
         return consumer;
     }
 	
+	 public void testPerformance() throws JMSException, InterruptedException {
+	     //Thread.sleep(5000);
+	     super.testPerformance();
+	 }
+	
 	 protected Destination createDestination(Session s, String destinationName) throws JMSException
{
 	        return s.createQueue(destinationName);
+	 }
+	 
+	 protected void configureBroker(BrokerService answer) throws Exception {
+	        answer.setPersistent(false);
+	        answer.setMonitorConnectionSplits(true);
+	        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+	        final PolicyEntry entry = new PolicyEntry();
+	        entry.setQueue(">");
+	        entry.setMemoryLimit(1024 * 1024 * 100); // Set to 1 MB
+	        entry.setOptimizedDispatch(true);
+	        entry.setProducerFlowControl(true);
+	        entry.setMaxPageSize(10);
+	        entry.setLazyDispatch(false);
+	        policyEntries.add(entry);
+
+	        
+	        final PolicyMap policyMap = new PolicyMap();
+	        policyMap.setPolicyEntries(policyEntries);
+	        answer.setDestinationPolicy(policyMap);
+	        super.configureBroker(answer);
 	    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
Tue Apr  8 05:43:36 2008
@@ -16,20 +16,53 @@
  */
 package org.apache.activemq.perf;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
 /**
  * @version $Revision: 1.3 $
  */
 public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
 
+    protected void setUp() throws Exception {
+        numberOfConsumers = 10;
+        numberofProducers = 10;
+        //this.consumerSleepDuration=100;
+        super.setUp();
+    }
     protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number,
byte[] payload) throws JMSException {
         PerfProducer pp = new PerfProducer(fac, dest, payload);
         pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        pp.setTimeToLive(100);
+        //pp.setTimeToLive(100);
         return pp;
+    }
+    
+    protected void configureBroker(BrokerService answer,String uri) throws Exception {
+        answer.setPersistent(false);
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB
+        entry.setOptimizedDispatch(true);
+        entry.setLazyDispatch(true);
+        policyEntries.add(entry);
+
+        
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        answer.setDestinationPolicy(policyMap);
+        super.configureBroker(answer, uri);
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
Tue Apr  8 05:43:36 2008
@@ -31,9 +31,7 @@
     }
     
     protected void setUp() throws Exception {
-        numberOfConsumers = 1;
-        numberofProducers = 2;
-        this.consumerSleepDuration=0;
+      
         super.setUp();
     }
     

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Tue Apr  8 05:43:36 2008
@@ -35,8 +35,8 @@
     private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
     
     protected BrokerService broker;
-    // protected String
-    // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
+    protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
+    //protected String clientURI="tcp://localhost:61616";
     protected String bindAddress="tcp://localhost:61616";
     //protected String bindAddress = "tcp://localhost:61616";
     //protected String bindAddress="vm://localhost?marshal=true";
@@ -46,12 +46,15 @@
     protected String destinationName = getClass().getName();
     protected int samepleCount = 20;
     protected long sampleInternal = 10000;
-    protected int numberOfConsumers = 1;
-    protected int numberofProducers = 0;
+    protected int numberOfDestinations=1;
+    protected int numberOfConsumers = 10;
+    protected int numberofProducers = 10;
+    protected int totalNumberOfProducers;
+    protected int totalNumberOfConsumers;
     protected int playloadSize = 1024;
     protected byte[] array;
     protected ConnectionFactory factory;
-    protected Destination destination;
+    
     protected long consumerSleepDuration=0;
 
     /**
@@ -63,26 +66,37 @@
         if (broker == null) {
             broker = createBroker(bindAddress);
         }
-        factory = createConnectionFactory(bindAddress);
+        factory = createConnectionFactory(clientURI);
         Connection con = factory.createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationName);
-        LOG.info("Testing against destination: " + destination);
-        LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers
+ " consumer(s)");
-        con.close();
-        producers = new PerfProducer[numberofProducers];
-        consumers = new PerfConsumer[numberOfConsumers];
-        for (int i = 0; i < numberOfConsumers; i++) {
-            consumers[i] = createConsumer(factory, destination, i);
-            consumers[i].setSleepDuration(consumerSleepDuration);
-        }
-        for (int i = 0; i < numberofProducers; i++) {
-            array = new byte[playloadSize];
-            for (int j = i; j < array.length; j++) {
-                array[j] = (byte)j;
+       
+        
+        LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers
+ " consumer(s) per " + numberOfDestinations + " Destination(s)");
+       
+        totalNumberOfConsumers=numberOfConsumers*numberOfDestinations;
+        totalNumberOfProducers=numberofProducers*numberOfDestinations;
+        producers = new PerfProducer[totalNumberOfProducers];
+        consumers = new PerfConsumer[totalNumberOfConsumers];
+        int consumerCount = 0;
+        int producerCount = 0;
+        for (int k =0; k < numberOfDestinations;k++) {
+            Destination destination = createDestination(session, destinationName+":"+k);
+            LOG.info("Testing against destination: " + destination);
+            for (int i = 0; i < numberOfConsumers; i++) {
+                consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
+                consumers[consumerCount].setSleepDuration(consumerSleepDuration);
+                consumerCount++;
+            }
+            for (int i = 0; i < numberofProducers; i++) {
+                array = new byte[playloadSize];
+                for (int j = i; j < array.length; j++) {
+                    array[j] = (byte)j;
+                }
+                producers[producerCount] = createProducer(factory, destination, i, array);
+                producerCount++;
             }
-            producers[i] = createProducer(factory, destination, i, array);
         }
+        con.close();
         super.setUp();
     }
 
@@ -136,10 +150,10 @@
     }
 
     public void testPerformance() throws JMSException, InterruptedException {
-        for (int i = 0; i < numberOfConsumers; i++) {
+        for (int i = 0; i < totalNumberOfConsumers; i++) {
             consumers[i].start();
         }
-        for (int i = 0; i < numberofProducers; i++) {
+        for (int i = 0; i < totalNumberOfProducers; i++) {
             producers[i].start();
         }
         LOG.info("Sampling performance " + samepleCount + " times at a " + sampleInternal
+ " ms interval.");
@@ -148,10 +162,10 @@
             dumpProducerRate();
             dumpConsumerRate();
         }
-        for (int i = 0; i < numberofProducers; i++) {
+        for (int i = 0; i < totalNumberOfProducers; i++) {
             producers[i].stop();
         }
-        for (int i = 0; i < numberOfConsumers; i++) {
+        for (int i = 0; i < totalNumberOfConsumers; i++) {
             consumers[i].stop();
         }
     }
@@ -159,30 +173,36 @@
     protected void dumpProducerRate() {
         int totalRate = 0;
         int totalCount = 0;
+        String producerString="Producers:";
         for (int i = 0; i < producers.length; i++) {
             PerfRate rate = producers[i].getRate().cloneAndReset();
             totalRate += rate.getRate();
             totalCount += rate.getTotalCount();
+            producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];";
         }
         if (producers != null && producers.length > 0) {
             int avgRate = totalRate / producers.length;
             System.out.println("Avg producer rate = " + avgRate
                     + " msg/sec | Total rate = " + totalRate + ", sent = "
                     + totalCount);
+           // System.out.println(producerString);
         }
     }
 
     protected void dumpConsumerRate() {
         int totalRate = 0;
         int totalCount = 0;
+        String consumerString="Consumers:";
         for (int i = 0; i < consumers.length; i++) {
             PerfRate rate = consumers[i].getRate().cloneAndReset();
             totalRate += rate.getRate();
             totalCount += rate.getTotalCount();
+            consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];";
         }
         if (consumers != null && consumers.length > 0) {
             int avgRate = totalRate / consumers.length;
             System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate
= " + totalRate + ", received = " + totalCount);
+            System.out.println(consumerString);
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
Tue Apr  8 05:43:36 2008
@@ -33,20 +33,15 @@
 public class SlowConsumerTopicTest extends SimpleTopicTest {
 
     protected PerfConsumer[] slowConsumers;
-    protected int numberOfSlowConsumers = 1;
-
+    
     protected void setUp() throws Exception {
-        numberOfConsumers = 0;
+        
         playloadSize = 10 * 1024;
         super.setUp();
-        slowConsumers = new SlowConsumer[numberOfSlowConsumers];
-        for (int i = 0; i < numberOfSlowConsumers; i++) {
-            slowConsumers[i] = createSlowConsumer(factory, destination, i);
-            slowConsumers[i].start();
-        }
     }
+   
 
-    protected PerfConsumer createSlowConsumer(ConnectionFactory fac, Destination dest, int
number) throws JMSException {
+    protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number)
throws JMSException {
         return new SlowConsumer(fac, dest);
     }
 



Mime
View raw message