activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r916765 - /activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java
Date Fri, 26 Feb 2010 17:17:32 GMT
Author: chirino
Date: Fri Feb 26 17:17:32 2010
New Revision: 916765

URL: http://svn.apache.org/viewvc?rev=916765&view=rev
Log:
Better load test client: clean up even on failure and provide a throughput report during
the test.

Modified:
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java

Modified: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java?rev=916765&r1=916764&r2=916765&view=diff
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java	(original)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java	Fri Feb 26 17:17:32 2010
@@ -3,6 +3,7 @@
 import java.net.Socket;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.TestCase;
 
@@ -11,6 +12,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static java.lang.String.*;
+import static java.util.concurrent.TimeUnit.*;
+
 /**
  * 
  * Simulates load on the Stomp connector. All producers/consumers open/close a
@@ -27,11 +31,15 @@
     final int producerSleep = 10;
     final int consumerSleep = 10;
     final int msgCount = 10000;
-    final int producerCount = 5;
-    final int consumerCount = 5;
+    final int producerCount = 10;
+    final int consumerCount = 10;
     final int testTime = 30 * 60 * 1000;
-    final String bindAddress = "stomp://0.0.0.0:61612";
+    final int sampleInterval = 5 * 1000;
+    final String bindAddress = "stomp://0.0.0.0:61613";
 
+    AtomicLong producerCounter = new AtomicLong();
+    AtomicLong consumerCounter = new AtomicLong();
+    
     public void testLoad() throws Exception {
 
         for (int i = 0; i < producerCount; i++) {
@@ -44,15 +52,29 @@
             consumerThread.start();
         }
 
-        Thread.sleep(testTime);
+        int samples = testTime/sampleInterval;
+        long start = System.nanoTime();
+        for( int i=0; i < samples; i++ ) {
+            Thread.sleep(sampleInterval);
+            long end = System.nanoTime();
+            printRate("Producer", producerCounter, end-start);
+            printRate("Consumer", consumerCounter, end-start);
+            start = end;
+        }
+    }
+
+    static final long NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+    
+    private void printRate(String name, AtomicLong counter, long nanos) {
+        long c = counter.getAndSet(0);
+        float rate_per_second = ((1.0f*c/nanos)*NANOS_PER_SECOND);
+        LOG.info(format("%s rate: %,.3f per second", name, rate_per_second));
     }
 
-    public StompConnection createConnection() throws Exception {
-        StompConnection conn = new StompConnection();
+    public void connect(StompConnection conn) throws Exception {
         URI connectUri = new URI(bindAddress);
         conn.open(new Socket(connectUri.getHost(), connectUri.getPort()));
         conn.connect("", "");
-        return conn;
     }
 
     class ProducerThread extends Thread {
@@ -65,15 +87,20 @@
 
         public void run() {
             for (int i = 0; i < msgCount; i++) {
+                StompConnection conn = new StompConnection();
                 try {
-                    StompConnection conn = createConnection();
-                    String msg = "test message " + i;
-                    LOG.info(name + " sending " + msg);
+                    connect(conn);
+                    String msg = "Message #" + i+" from "+name;
                     conn.send("/queue/test", msg);
-                    conn.disconnect();
+                    producerCounter.incrementAndGet();
                     Thread.sleep(producerSleep);
                 } catch (Exception e) {
                     e.printStackTrace();
+                } finally {
+                    try {
+                        conn.disconnect();
+                    } catch (Exception ignore) {
+                    }
                 }
             }
         }
@@ -89,18 +116,27 @@
 
         public void run() {
             for (int i = 0; i < msgCount; i++) {
+                StompConnection conn = new StompConnection();
                 try {
-                    StompConnection conn = createConnection();
+                    connect(conn);
                     HashMap<String, String> headers = new HashMap<String, String>();
                     headers.put("activemq.prefetchSize", "1");
                     conn.subscribe("/queue/test", "client", headers);
-                    StompFrame frame = conn.receive(1000);
+                    StompFrame frame = conn.receive(1*1000);
                     conn.ack(frame);
-                    LOG.info(name + " received " + frame.getBody());
-                    conn.disconnect();
+                    consumerCounter.incrementAndGet();
                     Thread.sleep(consumerSleep);
                 } catch (Exception e) {
                     e.printStackTrace();
+                } finally {
+                    try {
+                        conn.disconnect();
+                    } catch (Exception ignore) {
+                    }
+                    try {
+                        conn.close();
+                    } catch (Exception ignore) {
+                    }
                 }
             }
         }



Mime
View raw message