qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r1146508 - in /qpid/trunk/qpid/java/tools: bin/ src/main/java/org/apache/qpid/tools/
Date Wed, 13 Jul 2011 22:42:46 GMT
Author: rajith
Date: Wed Jul 13 22:42:46 2011
New Revision: 1146508

URL: http://svn.apache.org/viewvc?rev=1146508&view=rev
Log:
Merge branch 'QPID-3265' into trunk

Modified:
    qpid/trunk/qpid/java/tools/bin/perf_report.sh
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java

Modified: qpid/trunk/qpid/java/tools/bin/perf_report.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/bin/perf_report.sh?rev=1146508&r1=1146507&r2=1146508&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/perf_report.sh (original)
+++ qpid/trunk/qpid/java/tools/bin/perf_report.sh Wed Jul 13 22:42:46 2011
@@ -18,9 +18,8 @@
 # under the License.
 #
 
-# This will run the 8 use cases defined below and produce
-# a report in tabular format. Refer to the documentation
-# for more details.
+# This will run the following test cases defined below and produce
+# a report in tabular format.
 
 SUB_MEM=-Xmx1024M
 PUB_MEM=-Xmx1024M
@@ -82,7 +81,7 @@ echo "----------------------------------
 # setting very low values to start with and experiment while increasing them slowly.
 
 # Test 1 Trans Queue
-#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
 
 # Test 2 Dura Queue
 run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=1146508&r1=1146507&r2=1146508&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Wed Jul 13 22:42:46 2011
@@ -21,14 +21,10 @@
 package org.apache.qpid.tools;
 
 import java.text.DecimalFormat;
-import java.util.Hashtable;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
 
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
@@ -42,13 +38,43 @@ public class PerfBase
     Destination feedbackDest;
     DecimalFormat df = new DecimalFormat("###.##");
 
+    enum MessageType {
+        BYTES, TEXT, MAP, OBJECT;
+
+        public static MessageType getType(String s) throws Exception
+        {
+            if ("text".equalsIgnoreCase(s))
+            {
+                return TEXT;
+            }
+            else if ("bytes".equalsIgnoreCase(s))
+            {
+                return BYTES;
+            }
+            /*else if ("map".equalsIgnoreCase(s))
+            {
+                return MAP;
+            }
+            else if ("object".equalsIgnoreCase(s))
+            {
+                return OBJECT;
+            }*/
+            else
+            {
+                throw new Exception("Unsupported message type");
+            }
+        }
+    };
+
+    MessageType msgType = MessageType.BYTES;
+
     public PerfBase()
     {
         params = new TestParams();
     }
 
     public void setUp() throws Exception
-    {        
+    {
 
         if (params.getHost().equals("") || params.getPort() == -1)
         {
@@ -63,6 +89,8 @@ public class PerfBase
                                     params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
 
         dest = new AMQAnyDestination(params.getAddress());
+        msgType = MessageType.getType(params.getMessageType());
+        System.out.println("Using " + msgType + " messages");
     }
 
     public void handleError(Exception e,String msg)

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=1146508&r1=1146507&r2=1146508&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Wed Jul 13 22:42:46 2011
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.tools;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -87,6 +91,9 @@ public class PerfConsumer extends PerfBa
     boolean transacted = false;
     int transSize = 0;
 
+    boolean printStdDev = false;
+    List<Long> sample;
+
     final Object lock = new Object();
 
     public PerfConsumer()
@@ -102,6 +109,11 @@ public class PerfConsumer extends PerfBa
         // Storing the following two for efficiency
         transacted = params.isTransacted();
         transSize = params.getTransactionSize();
+        printStdDev = params.isPrintStdDev();
+        if (printStdDev)
+        {
+            sample = new ArrayList<Long>(params.getMsgCount());
+        }
     }
 
     public void warmup()throws Exception
@@ -112,19 +124,16 @@ public class PerfConsumer extends PerfBa
         while (!start)
         {
             Message msg = consumer.receive();
-            if (msg instanceof TextMessage)
+            if (msg.getBooleanProperty("End"))
             {
-                if (((TextMessage)msg).getText().equals("End"))
+                start = true;
+                MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+                temp.send(session.createMessage());
+                if (params.isTransacted())
                 {
-                    start = true;
-                    MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
-                    temp.send(session.createMessage());
-                    if (params.isTransacted())
-                    {
-                        session.commit();
-                    }
-                    temp.close();
+                    session.commit();
                 }
+                temp.close();
             }
         }
     }
@@ -161,9 +170,25 @@ public class PerfConsumer extends PerfBa
         System.out.println(new StringBuilder("Max Latency         : ").
                            append(maxLatency).
                            append(" ms").toString());
+        if (printStdDev)
+        {
+            System.out.println(new StringBuilder("Std Dev             : ").
+                               append(calculateStdDev(avgLatency)).toString());
+        }
         System.out.println("Completed the test......\n");
     }
 
+    public double calculateStdDev(double mean)
+    {
+        double v = 0;
+        for (double latency: sample)
+        {
+            v = v + Math.pow((latency-mean), 2);
+        }
+        v = v/sample.size();
+        return Math.round(Math.sqrt(v));
+    }
+
     public void notifyCompletion(Destination replyTo) throws Exception
     {
         MessageProducer tmp = session.createProducer(replyTo);
@@ -187,7 +212,13 @@ public class PerfConsumer extends PerfBa
     {
         try
         {
-            if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+            // To figure out the decoding overhead of text
+            if (msgType == MessageType.TEXT)
+            {
+                ((TextMessage)msg).getText();
+            }
+
+            if (msg.getBooleanProperty("End"))
             {
                 notifyCompletion(msg.getJMSReplyTo());
 
@@ -216,6 +247,10 @@ public class PerfConsumer extends PerfBa
                 maxLatency = Math.max(maxLatency, latency);
                 minLatency = Math.min(minLatency, latency);
                 totalLatency = totalLatency + latency;
+                if (printStdDev)
+                {
+                    sample.add(latency);
+                }
             }
 
         }
@@ -252,16 +287,16 @@ public class PerfConsumer extends PerfBa
                 cons.test();
             }
         };
-        
+
         Thread t;
         try
         {
-            t = Threading.getThreadFactory().createThread(r);                      
+            t = Threading.getThreadFactory().createThread(r);
         }
         catch(Exception e)
         {
             throw new Error("Error creating consumer thread",e);
         }
-        t.start(); 
+        t.start();
     }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1146508&r1=1146507&r2=1146508&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Wed Jul 13 22:42:46 2011
@@ -54,6 +54,8 @@ import org.apache.qpid.thread.Threading;
  */
 public class PerfProducer extends PerfBase
 {
+    private static long SEC = 60000;
+
     MessageProducer producer;
     Message msg;
     Object payload;
@@ -63,36 +65,8 @@ public class PerfProducer extends PerfBa
     boolean durable = false;
     Random random;
     int msgSizeRange = 1024;
-
-    enum MessageType {
-        BYTES, TEXT, MAP, OBJECT;
-
-        public static MessageType getType(String s) throws Exception
-        {
-            if ("text".equalsIgnoreCase(s))
-            {
-                return TEXT;
-            }
-            else if ("bytes".equalsIgnoreCase(s))
-            {
-                return BYTES;
-            }
-            /*else if ("map".equalsIgnoreCase(s))
-            {
-                return MAP;
-            }
-            else if ("object".equalsIgnoreCase(s))
-            {
-                return OBJECT;
-            }*/
-            else
-            {
-                throw new Exception("Unsupported message type");
-            }
-        }
-    };
-
-    MessageType msgType = MessageType.BYTES;
+    boolean rateLimitProducer = false;
+    double rateFactor = 0.4;
 
     public PerfProducer()
     {
@@ -105,9 +79,11 @@ public class PerfProducer extends PerfBa
         feedbackDest = session.createTemporaryQueue();
 
         durable = params.isDurable();
-        msgType = MessageType.getType(params.getMessageType());
-
-        System.out.println("Using " + msgType + " messages");
+        rateLimitProducer = params.getRate() > 0 ? true : false;
+        if (rateLimitProducer)
+        {
+            System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
+        }
 
         // if message caching is enabled we pre create the message
         // else we pre create the payload
@@ -204,7 +180,8 @@ public class PerfProducer extends PerfBa
         {
             producer.send(getNextMessage());
         }
-        Message msg = session.createTextMessage("End");
+        Message msg = session.createMessage();
+        msg.setBooleanProperty("End", true);
         msg.setJMSReplyTo(feedbackDest);
         producer.send(msg);
 
@@ -230,16 +207,30 @@ public class PerfProducer extends PerfBa
         boolean transacted = params.isTransacted();
         int tranSize =  params.getTransactionSize();
 
+        long limit = (long)(params.getRate() * rateFactor);
+        long timeLimit = (long)(SEC * rateFactor);
+
         long start = System.currentTimeMillis();
+        long interval = start;
         for(int i=0; i < count; i++ )
         {
             Message msg = getNextMessage();
-            msg.setJMSTimestamp(System.currentTimeMillis());
             producer.send(msg);
             if ( transacted && ((i+1) % tranSize == 0))
             {
                 session.commit();
             }
+
+            if (rateLimitProducer && i%limit == 0)
+            {
+                long elapsed = System.currentTimeMillis() - interval;
+                if (elapsed < timeLimit)
+                {
+                    Thread.sleep(elapsed);
+                }
+                interval = System.currentTimeMillis();
+
+            }
         }
         long time = System.currentTimeMillis() - start;
         double rate = ((double)count/(double)time)*1000;
@@ -252,7 +243,8 @@ public class PerfProducer extends PerfBa
     public void waitForCompletion() throws Exception
     {
         MessageConsumer tmp = session.createConsumer(feedbackDest);
-        Message msg = session.createTextMessage("End");
+        Message msg = session.createMessage();
+        msg.setBooleanProperty("End", true);
         msg.setJMSReplyTo(feedbackDest);
         producer.send(msg);
 

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=1146508&r1=1146507&r2=1146508&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java Wed Jul 13 22:42:46 2011
@@ -67,6 +67,10 @@ public class TestParams
 
     private String msgType = "bytes";
 
+    private boolean printStdDev = false;
+
+    private long rate = -1;
+
     public TestParams()
     {
 
@@ -88,6 +92,8 @@ public class TestParams
         warmup_count = Integer.getInteger("warmup_count",warmup_count);
         random_msg_size = Boolean.getBoolean("random_msg_size");
         msgType = System.getProperty("msg_type","bytes");
+        printStdDev = Boolean.getBoolean("print_std_dev");
+        rate = Long.getLong("rate",-1);
     }
 
     public String getUrl()
@@ -174,4 +180,14 @@ public class TestParams
     {
         return msgType;
     }
+
+    public boolean isPrintStdDev()
+    {
+        return printStdDev;
+    }
+
+    public long getRate()
+    {
+        return rate;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message