activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r476189 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf: PerfProducer.java PerfRate.java SimpleDurableTopicTest.java SimpleNonPersistentQueueTest.java SimpleNonPersistentTopicTest.java SimpleTopicTest.java
Date Fri, 17 Nov 2006 16:04:58 GMT
Author: chirino
Date: Fri Nov 17 08:04:57 2006
New Revision: 476189

URL: http://svn.apache.org/viewvc?view=rev&rev=476189
Log:
Enhanced these performance tests so that the producers run in their own threads.
Also changed the dumping of stats to be time-based rather than based on the number of messages
received.

Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfRate.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java?view=diff&rev=476189&r1=476188&r2=476189
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
Fri Nov 17 08:04:57 2006
@@ -17,44 +17,79 @@
  */
 package org.apache.activemq.perf;
 
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 /**
  * @version $Revision: 1.3 $
  */
-public class PerfProducer{
+public class PerfProducer implements Runnable {
     protected Connection connection;
     protected MessageProducer producer;
     protected PerfRate rate=new PerfRate();
-    public PerfProducer(ConnectionFactory fac,Destination dest) throws JMSException{
+	private byte[] payload;
+	private Session session;
+	private final CountDownLatch stopped = new CountDownLatch(1);
+	private boolean running;
+	
+    public PerfProducer(ConnectionFactory fac,Destination dest, byte[] palyload) throws JMSException{
         connection=fac.createConnection();
-        Session s=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        producer=s.createProducer(dest);
+        session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        producer=session.createProducer(dest);
+        this.payload = palyload;
     }
+    
     public void setDeliveryMode(int mode) throws JMSException{
         producer.setDeliveryMode(mode);
     }
-    public void start() throws JMSException{
-        connection.start();
-        rate.getRate();
-    }
-    public void stop() throws JMSException{
-        connection.stop();
-    }
+    
     public void shutDown() throws JMSException{
         connection.close();
     }
-    public void sendMessage(Message msg) throws JMSException{
-        producer.send(msg);
-        rate.increment();
-    }
+
     public PerfRate getRate(){
         return rate;
     }
+    	
+	synchronized public void start() throws JMSException{
+		if( !running ) {
+			running = true;
+	        connection.start();
+	        new Thread(this).start(); 
+	        rate.reset();
+		}
+    }
+    public void stop() throws JMSException, InterruptedException{
+    	synchronized(this) {
+    		running=false;
+    	}
+    	stopped.await();
+        connection.stop();
+    }
+	synchronized public boolean isRunning() {
+		return running;
+	}
+	
+	public void run() {
+        try {
+			while(isRunning()){
+			    BytesMessage msg;
+			    msg=session.createBytesMessage();
+			    msg.writeBytes(payload);
+			    producer.send(msg);
+			    rate.increment();
+			}
+		} catch (Throwable e) {
+			e.printStackTrace();
+		} finally {
+			stopped.countDown();
+		}
+	}
+	
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfRate.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfRate.java?view=diff&rev=476189&r1=476188&r2=476189
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfRate.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfRate.java
Fri Nov 17 08:04:57 2006
@@ -30,26 +30,34 @@
     public int getCount(){
         return totalCount;
     }
+
     public void increment(){
         totalCount++;
         count++;
     }
-    public void start(){
-        count=0;
-        startTime=System.currentTimeMillis();
-    }
+
     public int getRate(){
         long endTime=System.currentTimeMillis();
         long totalTime=endTime-startTime;
         int result=(int) ((count*1000)/totalTime);
         return result;
     }
+    
+    /**
+     * Resets the rate sampling.
+     */
+    public void reset() {
+        count=0;
+        startTime=System.currentTimeMillis();
+    }
+
     /**
      * @return Returns the totalCount.
      */
     public int getTotalCount(){
         return totalCount;
     }
+    
     /**
      * @param totalCount
      *            The totalCount to set.

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?view=diff&rev=476189&r1=476188&r2=476189
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
Fri Nov 17 08:04:57 2006
@@ -21,14 +21,12 @@
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
 /**
  * @version $Revision: 1.3 $
  */
 public class SimpleDurableTopicTest extends SimpleTopicTest{
-    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number)
throws JMSException{
-        PerfProducer pp=new PerfProducer(fac,dest);
+    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,
byte payload[]) throws JMSException{
+        PerfProducer pp=new PerfProducer(fac,dest, payload);
         pp.setDeliveryMode(DeliveryMode.PERSISTENT);
         return pp;
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java?view=diff&rev=476189&r1=476188&r2=476189
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
Fri Nov 17 08:04:57 2006
@@ -26,8 +26,8 @@
  */
 public class SimpleNonPersistentQueueTest extends SimpleQueueTest{
     
-    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number)
throws JMSException{
-        PerfProducer pp=new PerfProducer(fac,dest);
+    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,
byte[] payload) throws JMSException{
+        PerfProducer pp=new PerfProducer(fac,dest,payload);
         pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         return pp;
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java?view=diff&rev=476189&r1=476188&r2=476189
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java
Fri Nov 17 08:04:57 2006
@@ -26,8 +26,8 @@
  */
 public class SimpleNonPersistentTopicTest extends SimpleTopicTest{
     
-    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number)
throws JMSException{
-        PerfProducer pp =  new PerfProducer(fac,dest);
+    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,
byte[] payload) throws JMSException{
+        PerfProducer pp =  new PerfProducer(fac,dest,payload);
         pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         return pp;
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?view=diff&rev=476189&r1=476188&r2=476189
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Fri Nov 17 08:04:57 2006
@@ -17,12 +17,12 @@
  */
 package org.apache.activemq.perf;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Session;
+
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -42,11 +42,11 @@
     protected PerfProducer[] producers;
     protected PerfConsumer[] consumers;
     protected String DESTINATION_NAME=getClass().toString();
+    protected int SAMPLE_COUNT = 10;
+    protected long SAMPLE_INTERVAL = 2000;
     protected int NUMBER_OF_CONSUMERS=1;
     protected int NUMBER_OF_PRODUCERS=1;
-    protected BytesMessage payload;
     protected int PAYLOAD_SIZE=1024;
-    protected int MESSAGE_COUNT=100000;
     protected byte[] array=null;
     protected ConnectionFactory factory;
     protected Destination destination;
@@ -60,26 +60,23 @@
         if(broker==null){
             broker=createBroker();
         }
-        array=new byte[PAYLOAD_SIZE];
-        for(int i=0;i<array.length;i++){
-            array[i]=(byte) i;
-        }
         factory=createConnectionFactory();
         Connection con=factory.createConnection();
         Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        payload=session.createBytesMessage();
-        payload.writeBytes(array);
+        
         destination=createDestination(session,DESTINATION_NAME);
         con.close();
         producers=new PerfProducer[NUMBER_OF_PRODUCERS];
         consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
             consumers[i]=createConsumer(factory,destination,i);
-            consumers[i].start();
         }
         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
-            producers[i]=createProducer(factory,destination,i);
-            producers[i].start();
+            array=new byte[PAYLOAD_SIZE];
+            for(int j=i;j<array.length;j++){
+                array[j]=(byte) j;
+            }
+            producers[i]=createProducer(factory,destination,i,array);
         }
         super.setUp();
     }
@@ -114,8 +111,8 @@
         return answer;
     }
 
-    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number)
throws JMSException{
-        return new PerfProducer(fac,dest);
+    protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number,
byte[] payload) throws JMSException{
+        return new PerfProducer(fac,dest,payload);
     }
 
     protected PerfConsumer createConsumer(ConnectionFactory fac,Destination dest,int number)
throws JMSException{
@@ -136,17 +133,27 @@
         return cf;
     }
 
-    public void testPerformance() throws JMSException{
-        for(int i=0;i<MESSAGE_COUNT;i++){
-            if(i%10000==0){
-                dumpProducerRate();
-                dumpConsumerRate();
-            }
-            payload.clearBody();
-            payload.writeBytes(array);
-            for(int k=0;k<producers.length;k++){
-                producers[k].sendMessage(payload);
-            }
+    public void testPerformance() throws JMSException, InterruptedException{
+    	
+        for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
+            consumers[i].start();
+        }
+        for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
+            producers[i].start();
+        }
+        
+    	log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval.");
+        for(int i=0; i < SAMPLE_COUNT; i++){
+        	Thread.sleep(SAMPLE_INTERVAL);
+            dumpProducerRate();
+            dumpConsumerRate();
+        }
+        
+        for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
+            producers[i].stop();
+        }
+        for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
+            consumers[i].stop();
         }
     }
 
@@ -160,21 +167,21 @@
         count=count/producers.length;
         log.info("Producer rate = "+count+" msg/sec total count = "+totalCount);
         for(int i=0;i<producers.length;i++){
-            producers[i].getRate().start();
+            producers[i].getRate().reset();
         }
     }
 
     protected void dumpConsumerRate(){
-        int count=0;
+        int rate=0;
         int totalCount=0;
         for(int i=0;i<consumers.length;i++){
-            count+=consumers[i].getRate().getRate();
+            rate+=consumers[i].getRate().getRate();
             totalCount+=consumers[i].getRate().getTotalCount();
         }
-        count=count/consumers.length;
-        log.info("Consumer rate = "+count+" msg/sec total count = "+totalCount);
+        rate=rate/consumers.length;
+        log.info("Consumer rate = "+rate+" msg/sec total count = "+totalCount);
         for(int i=0;i<consumers.length;i++){
-            consumers[i].getRate().start();
+            consumers[i].getRate().reset();
         }
     }
 }



Mime
View raw message