Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 98470 invoked from network); 8 Apr 2008 12:44:14 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Apr 2008 12:44:14 -0000 Received: (qmail 58712 invoked by uid 500); 8 Apr 2008 12:44:14 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 58684 invoked by uid 500); 8 Apr 2008 12:44:14 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 58675 invoked by uid 99); 8 Apr 2008 12:44:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Apr 2008 05:44:14 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Apr 2008 12:43:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 20AA51A9832; Tue, 8 Apr 2008 05:43:48 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r645881 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/ Date: Tue, 08 Apr 2008 12:43:38 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080408124348.20AA51A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue Apr 8 05:43:36 2008 New Revision: 645881 URL: http://svn.apache.org/viewvc?rev=645881&view=rev Log: Updated to support multiple destinations Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java Tue Apr 8 05:43:36 2008 @@ -38,7 +38,7 @@ protected Connection connection; protected MessageConsumer consumer; protected long sleepDuration; - protected boolean enableAudit = true; + protected boolean enableAudit = false; protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20); protected PerfRate rate = new PerfRate(); @@ -82,7 +82,7 @@ if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) { LOG.error("Message out of order!!" + msg); } - if (this.audit.isDuplicate(msg)){ + if (enableAudit && this.audit.isDuplicate(msg)){ LOG.error("Duplicate Message!" + msg); } } catch (JMSException e1) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java Tue Apr 8 05:43:36 2008 @@ -38,6 +38,7 @@ private Session session; private final CountDownLatch stopped = new CountDownLatch(1); private boolean running; + private int sleep = 0; public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws JMSException { connection = fac.createConnection(); @@ -93,12 +94,23 @@ msg.writeBytes(payload); producer.send(msg); rate.increment(); + if (sleep > 0) { + Thread.sleep(sleep); + } } } catch (Throwable e) { e.printStackTrace(); } finally { stopped.countDown(); } + } + + public int getSleep() { + return sleep; + } + + public void setSleep(int sleep) { + this.sleep = sleep; } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java Tue Apr 8 05:43:36 2008 @@ -61,7 +61,7 @@ factory = createConnectionFactory(bindAddress); Connection con = factory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationName); + Destination destination = createDestination(session, destinationName); con.close(); for (int i = 0; i < 3; i++) { Connection connection = factory.createConnection(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java Tue Apr 8 05:43:36 2008 @@ -28,8 +28,8 @@ public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest { protected void setUp() throws Exception { - numberofProducers=6; - numberOfConsumers=6; + numberofProducers=60; + numberOfConsumers=60; samepleCount=1000; playloadSize = 1024; super.setUp(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java Tue Apr 8 05:43:36 2008 @@ -25,6 +25,15 @@ * @version $Revision: 1.3 $ */ public class SimpleDurableTopicTest extends SimpleTopicTest { + + protected void setUp() throws Exception { + numberOfDestinations=6; + numberOfConsumers = 1; + numberofProducers = 1; + samepleCount=1000; + playloadSize = 1024; + super.setUp(); + } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.PERSISTENT); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java Tue Apr 8 05:43:36 2008 @@ -17,9 +17,11 @@ package org.apache.activemq.perf; import javax.jms.Connection; -import javax.jms.ConnectionFactory; +import javax.jms.Destination; import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.network.NetworkConnector; import org.apache.commons.logging.Log; @@ -29,45 +31,58 @@ public class SimpleNetworkTest extends SimpleTopicTest { private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class); - protected String consumerBindAddress = "tcp://localhost:61616"; + //protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000,tcp://localhost:61617?wireFormat.maxInactivityDuration=1000"; + protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=2000&socket.tcpNoDelayEnabled=false"; protected String producerBindAddress = "tcp://localhost:61617"; protected static final String CONSUMER_BROKER_NAME = "Consumer"; protected static final String PRODUCER_BROKER_NAME = "Producer"; protected BrokerService consumerBroker; protected BrokerService producerBroker; - protected ConnectionFactory consumerFactory; - protected ConnectionFactory producerFactory; + protected ActiveMQConnectionFactory consumerFactory; + protected ActiveMQConnectionFactory producerFactory; + protected void setUp() throws Exception { if (consumerBroker == null) { - consumerBroker = createConsumerBroker(consumerBindAddress); + // consumerBroker = createConsumerBroker(consumerBindAddress); } if (producerBroker == null) { producerBroker = createProducerBroker(producerBindAddress); } - consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME); - producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME); - //consumerFactory = createConnectionFactory(consumerBindAddress); - //producerFactory = createConnectionFactory(producerBindAddress); + //consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME); + //producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME); + consumerFactory = createConnectionFactory("failover://("+consumerBindAddress + "," + producerBindAddress +")?randomize=false&backup=false"); + //consumerFactory = createConnectionFactory("failover://("+consumerBindAddress+")?backup=true"); + consumerFactory.setDispatchAsync(true); + ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); + policy.setQueuePrefetch(100); + consumerFactory.setPrefetchPolicy(policy); + producerFactory = createConnectionFactory(producerBindAddress); Connection con = consumerFactory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationName); - LOG.info("Testing against destination: " + destination); - LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s)"); - con.close(); - producers = new PerfProducer[numberofProducers]; - consumers = new PerfConsumer[numberOfConsumers]; - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i] = createConsumer(consumerFactory, destination, i); - consumers[i].setSleepDuration(consumerSleepDuration); - } - for (int i = 0; i < numberofProducers; i++) { - array = new byte[playloadSize]; - for (int j = i; j < array.length; j++) { - array[j] = (byte)j; + + producers = new PerfProducer[numberofProducers*numberOfDestinations]; + consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations]; + int consumerCount = 0; + int producerCount = 0; + for (int k =0; k < numberOfDestinations;k++) { + Destination destination = createDestination(session, destinationName+":"+k); + LOG.info("Testing against destination: " + destination); + for (int i = 0; i < numberOfConsumers; i++) { + consumers[consumerCount] = createConsumer(factory, destination, consumerCount); + consumers[consumerCount].setSleepDuration(consumerSleepDuration); + consumerCount++; + } + for (int i = 0; i < numberofProducers; i++) { + array = new byte[playloadSize]; + for (int j = i; j < array.length; j++) { + array[j] = (byte)j; + } + producers[producerCount] = createProducer(factory, destination, i, array); + producerCount++; } - producers[i] = createProducer(producerFactory, destination, i, array); } + con.close(); } protected void tearDown() throws Exception { @@ -96,6 +111,7 @@ } protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception { + configureBroker(answer); answer.setPersistent(false); answer.setBrokerName(CONSUMER_BROKER_NAME); answer.setDeleteAllMessagesOnStartup(true); @@ -111,13 +127,22 @@ } protected void configureProducerBroker(BrokerService answer,String uri) throws Exception { + configureBroker(answer); answer.setBrokerName(PRODUCER_BROKER_NAME); + answer.setMonitorConnectionSplits(false); + //answer.setSplitSystemUsageForProducersConsumers(true); answer.setPersistent(false); answer.setDeleteAllMessagesOnStartup(true); - NetworkConnector connector = answer.addNetworkConnector("static://"+consumerBindAddress); - connector.setDuplex(true); + NetworkConnector connector = answer.addNetworkConnector("static://tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=2000"); + //connector.setNetworkTTL(3); + //connector.setDynamicOnly(true); + //connector.setDuplex(true); answer.addConnector(uri); answer.setUseShutdownHook(false); + } + + protected void configureBroker(BrokerService service) throws Exception{ + } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java Tue Apr 8 05:43:36 2008 @@ -17,23 +17,30 @@ package org.apache.activemq.perf; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; + public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest { - protected void setUp() throws Exception { - numberOfConsumers = 10; - numberofProducers = 10; + protected void setUp()throws Exception { + numberOfDestinations =20; super.setUp(); } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); - pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - pp.setTimeToLive(1000); + pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // pp.setTimeToLive(1000); + //pp.setSleep(1); return pp; } @@ -41,11 +48,36 @@ PerfConsumer consumer = new PerfConsumer(fac, dest); boolean enableAudit = numberOfConsumers <= 1; System.out.println("Enable Audit = " + enableAudit); - consumer.setEnableAudit(enableAudit); + consumer.setEnableAudit(false); return consumer; } + public void testPerformance() throws JMSException, InterruptedException { + //Thread.sleep(5000); + super.testPerformance(); + } + protected Destination createDestination(Session s, String destinationName) throws JMSException { return s.createQueue(destinationName); + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.setMonitorConnectionSplits(true); + final List policyEntries = new ArrayList(); + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024 * 1024 * 100); // Set to 1 MB + entry.setOptimizedDispatch(true); + entry.setProducerFlowControl(true); + entry.setMaxPageSize(10); + entry.setLazyDispatch(false); + policyEntries.add(entry); + + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + answer.setDestinationPolicy(policyMap); + super.configureBroker(answer); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java Tue Apr 8 05:43:36 2008 @@ -16,20 +16,53 @@ */ package org.apache.activemq.perf; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; + /** * @version $Revision: 1.3 $ */ public class SimpleNonPersistentQueueTest extends SimpleQueueTest { + protected void setUp() throws Exception { + numberOfConsumers = 10; + numberofProducers = 10; + //this.consumerSleepDuration=100; + super.setUp(); + } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - pp.setTimeToLive(100); + //pp.setTimeToLive(100); return pp; + } + + protected void configureBroker(BrokerService answer,String uri) throws Exception { + answer.setPersistent(false); + final List policyEntries = new ArrayList(); + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB + entry.setOptimizedDispatch(true); + entry.setLazyDispatch(true); + policyEntries.add(entry); + + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + answer.setDestinationPolicy(policyMap); + super.configureBroker(answer, uri); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java Tue Apr 8 05:43:36 2008 @@ -31,9 +31,7 @@ } protected void setUp() throws Exception { - numberOfConsumers = 1; - numberofProducers = 2; - this.consumerSleepDuration=0; + super.setUp(); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Tue Apr 8 05:43:36 2008 @@ -35,8 +35,8 @@ private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class); protected BrokerService broker; - // protected String - // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false"; + protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000"; + //protected String clientURI="tcp://localhost:61616"; protected String bindAddress="tcp://localhost:61616"; //protected String bindAddress = "tcp://localhost:61616"; //protected String bindAddress="vm://localhost?marshal=true"; @@ -46,12 +46,15 @@ protected String destinationName = getClass().getName(); protected int samepleCount = 20; protected long sampleInternal = 10000; - protected int numberOfConsumers = 1; - protected int numberofProducers = 0; + protected int numberOfDestinations=1; + protected int numberOfConsumers = 10; + protected int numberofProducers = 10; + protected int totalNumberOfProducers; + protected int totalNumberOfConsumers; protected int playloadSize = 1024; protected byte[] array; protected ConnectionFactory factory; - protected Destination destination; + protected long consumerSleepDuration=0; /** @@ -63,26 +66,37 @@ if (broker == null) { broker = createBroker(bindAddress); } - factory = createConnectionFactory(bindAddress); + factory = createConnectionFactory(clientURI); Connection con = factory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationName); - LOG.info("Testing against destination: " + destination); - LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s)"); - con.close(); - producers = new PerfProducer[numberofProducers]; - consumers = new PerfConsumer[numberOfConsumers]; - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i] = createConsumer(factory, destination, i); - consumers[i].setSleepDuration(consumerSleepDuration); - } - for (int i = 0; i < numberofProducers; i++) { - array = new byte[playloadSize]; - for (int j = i; j < array.length; j++) { - array[j] = (byte)j; + + + LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s) per " + numberOfDestinations + " Destination(s)"); + + totalNumberOfConsumers=numberOfConsumers*numberOfDestinations; + totalNumberOfProducers=numberofProducers*numberOfDestinations; + producers = new PerfProducer[totalNumberOfProducers]; + consumers = new PerfConsumer[totalNumberOfConsumers]; + int consumerCount = 0; + int producerCount = 0; + for (int k =0; k < numberOfDestinations;k++) { + Destination destination = createDestination(session, destinationName+":"+k); + LOG.info("Testing against destination: " + destination); + for (int i = 0; i < numberOfConsumers; i++) { + consumers[consumerCount] = createConsumer(factory, destination, consumerCount); + consumers[consumerCount].setSleepDuration(consumerSleepDuration); + consumerCount++; + } + for (int i = 0; i < numberofProducers; i++) { + array = new byte[playloadSize]; + for (int j = i; j < array.length; j++) { + array[j] = (byte)j; + } + producers[producerCount] = createProducer(factory, destination, i, array); + producerCount++; } - producers[i] = createProducer(factory, destination, i, array); } + con.close(); super.setUp(); } @@ -136,10 +150,10 @@ } public void testPerformance() throws JMSException, InterruptedException { - for (int i = 0; i < numberOfConsumers; i++) { + for (int i = 0; i < totalNumberOfConsumers; i++) { consumers[i].start(); } - for (int i = 0; i < numberofProducers; i++) { + for (int i = 0; i < totalNumberOfProducers; i++) { producers[i].start(); } LOG.info("Sampling performance " + samepleCount + " times at a " + sampleInternal + " ms interval."); @@ -148,10 +162,10 @@ dumpProducerRate(); dumpConsumerRate(); } - for (int i = 0; i < numberofProducers; i++) { + for (int i = 0; i < totalNumberOfProducers; i++) { producers[i].stop(); } - for (int i = 0; i < numberOfConsumers; i++) { + for (int i = 0; i < totalNumberOfConsumers; i++) { consumers[i].stop(); } } @@ -159,30 +173,36 @@ protected void dumpProducerRate() { int totalRate = 0; int totalCount = 0; + String producerString="Producers:"; for (int i = 0; i < producers.length; i++) { PerfRate rate = producers[i].getRate().cloneAndReset(); totalRate += rate.getRate(); totalCount += rate.getTotalCount(); + producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; } if (producers != null && producers.length > 0) { int avgRate = totalRate / producers.length; System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount); + // System.out.println(producerString); } } protected void dumpConsumerRate() { int totalRate = 0; int totalCount = 0; + String consumerString="Consumers:"; for (int i = 0; i < consumers.length; i++) { PerfRate rate = consumers[i].getRate().cloneAndReset(); totalRate += rate.getRate(); totalCount += rate.getTotalCount(); + consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; } if (consumers != null && consumers.length > 0) { int avgRate = totalRate / consumers.length; System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount); + System.out.println(consumerString); } } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java Tue Apr 8 05:43:36 2008 @@ -33,20 +33,15 @@ public class SlowConsumerTopicTest extends SimpleTopicTest { protected PerfConsumer[] slowConsumers; - protected int numberOfSlowConsumers = 1; - + protected void setUp() throws Exception { - numberOfConsumers = 0; + playloadSize = 10 * 1024; super.setUp(); - slowConsumers = new SlowConsumer[numberOfSlowConsumers]; - for (int i = 0; i < numberOfSlowConsumers; i++) { - slowConsumers[i] = createSlowConsumer(factory, destination, i); - slowConsumers[i].start(); - } } + - protected PerfConsumer createSlowConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { + protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { return new SlowConsumer(fac, dest); }