qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r1152412 - in /qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools: LatencyTest.java PerfBase.java PerfConsumer.java PerfProducer.java
Date Sat, 30 Jul 2011 02:22:35 GMT
Author: rajith
Date: Sat Jul 30 02:22:35 2011
New Revision: 1152412

URL: http://svn.apache.org/viewvc?rev=1152412&view=rev
Log:
QPID-3358 Modified the producer and consumer to support multiple iterations to ensure we can
run the test for longer durations.
Also added support for creating unique destinations based on the default destination.
This makes it easy to run multiple producers and consumers, each with its own
unique queue, with little configuration. The code can make use of an externally specified prefix
when creating these destinations, thereby allowing scripts to provide meaningful names for
identifying queues for debugging/diagnostic purposes.

Modified:
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java?rev=1152412&r1=1152411&r2=1152412&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java Sat Jul
30 02:22:35 2011
@@ -77,7 +77,7 @@ public class LatencyTest extends PerfBas
 
     public LatencyTest()
     {
-        super();
+        super("");
         warmedUp = lock.newCondition();
         testCompleted = lock.newCondition();
         // Storing the following two for efficiency
@@ -314,7 +314,7 @@ public class LatencyTest extends PerfBas
 
     public static void main(String[] args)
     {
-        final LatencyTest latencyTest = new LatencyTest();        
+        final LatencyTest latencyTest = new LatencyTest();
         Runnable r = new Runnable()
         {
             public void run()
@@ -334,16 +334,16 @@ public class LatencyTest extends PerfBas
                 }
             }
         };
-        
+
         Thread t;
         try
         {
-            t = Threading.getThreadFactory().createThread(r);                      
+            t = Threading.getThreadFactory().createThread(r);
         }
         catch(Exception e)
         {
             throw new Error("Error creating latency test thread",e);
         }
-        t.start(); 
+        t.start();
     }
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=1152412&r1=1152411&r2=1152412&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Sat Jul 30
02:22:35 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tools;
 
+import java.net.InetAddress;
 import java.text.DecimalFormat;
 import java.util.UUID;
 
@@ -32,6 +33,10 @@ import javax.jms.Session;
 
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
 
 public class PerfBase
 {
@@ -57,11 +62,12 @@ public class PerfBase
     Destination myControlQueue;
     Destination controllerQueue;
     DecimalFormat df = new DecimalFormat("###.##");
-    String id = UUID.randomUUID().toString();
-    String myControlQueueAddr = id + ";{create: always}";
+    String id;
+    String myControlQueueAddr;
 
     MessageProducer sendToController;
     MessageConsumer receiveFromController;
+    String prefix = "";
 
     enum OPCode {
         REGISTER_CONSUMER, REGISTER_PRODUCER,
@@ -69,7 +75,8 @@ public class PerfBase
         CONSUMER_READY, PRODUCER_READY,
         PRODUCER_START,
         RECEIVED_END_MSG, CONSUMER_STOP,
-        RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS
+        RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
+        CONTINUE_TEST, STOP_TEST
     };
 
     enum MessageType {
@@ -102,14 +109,24 @@ public class PerfBase
 
     MessageType msgType = MessageType.BYTES;
 
-    public PerfBase()
+    public PerfBase(String prefix)
     {
         params = new TestParams();
+        String host = "";
+        try
+        {
+            host = InetAddress.getLocalHost().getHostName();
+        }
+        catch (Exception e)
+        {
+        }
+        id = host + "-" + UUID.randomUUID().toString();
+        this.prefix = prefix;
+        this.myControlQueueAddr = id + ";{create: always}";
     }
 
     public void setUp() throws Exception
     {
-
         if (params.getHost().equals("") || params.getPort() == -1)
         {
             con = new AMQConnection(params.getUrl());
@@ -124,7 +141,7 @@ public class PerfBase
 
         controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        dest = new AMQAnyDestination(params.getAddress());
+        dest = createDestination();
         controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
         myControlQueue = session.createQueue(myControlQueueAddr);
         msgType = MessageType.getType(params.getMessageType());
@@ -134,9 +151,38 @@ public class PerfBase
         receiveFromController = controllerSession.createConsumer(myControlQueue);
     }
 
+    private Destination createDestination() throws Exception
+    {
+        if (params.isUseUniqueDests())
+        {
+            System.out.println("Prefix : " + prefix);
+            Address addr = Address.parse(params.getAddress());
+            AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+            int type = ((AMQSession_0_10)session).resolveAddressType(temp);
+
+            if ( type == AMQDestination.TOPIC_TYPE)
+            {
+                addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
+                System.out.println("Setting subject : " + addr);
+            }
+            else
+            {
+                addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
+                System.out.println("Setting name : " + addr);
+            }
+
+            return new AMQAnyDestination(addr);
+        }
+        else
+        {
+            return new AMQAnyDestination(params.getAddress());
+        }
+    }
+
     public synchronized void sendMessageToController(MapMessage m) throws Exception
     {
         m.setString(ID, id);
+        m.setString(REPLY_ADDR,myControlQueueAddr);
         sendToController.send(m);
     }
 
@@ -152,6 +198,14 @@ public class PerfBase
 
     }
 
+    public boolean continueTest() throws Exception
+    {
+        MapMessage m = (MapMessage)receiveFromController.receive();
+        OPCode code = OPCode.values()[m.getInt(CODE)];
+        System.out.println("Received Code : " + code);
+        return (code == OPCode.CONTINUE_TEST);
+    }
+
     public void tearDown() throws Exception
     {
         session.close();

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=1152412&r1=1152411&r2=1152412&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Sat Jul
30 02:22:35 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.tools;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -29,6 +30,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 
+import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.thread.Threading;
 
 /**
@@ -49,7 +51,7 @@ import org.apache.qpid.thread.Threading;
  * b) They are on separate machines that have their time synced via a Time Server
  *
  * In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
+ * when the message is sent. The consumer will note the current time the message is
  * received and will calculate the latency as follows
  * latency = rcvdTime - msg.getJMSTimestamp()
  *
@@ -57,13 +59,9 @@ import org.apache.qpid.thread.Threading;
  * variance in latencies.
  *
  * Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
  *
  * Throughput
  * ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
  * Consumer rate is calculated as
  * rcvdMsgCount/(rcvdTime - startTime)
  *
@@ -83,7 +81,6 @@ public class PerfConsumer extends PerfBa
     long minLatency = Long.MAX_VALUE;
     long totalLatency = 0;  // to calculate avg latency.
     int rcvdMsgCount = 0;
-    long testStartTime = 0; // to measure system throughput
     long startTime = 0;     // to measure consumer throughput
     long rcvdTime = 0;
     boolean transacted = false;
@@ -94,9 +91,9 @@ public class PerfConsumer extends PerfBa
 
     final Object lock = new Object();
 
-    public PerfConsumer()
+    public PerfConsumer(String prefix)
     {
-        super();
+        super(prefix);
         System.out.println("Consumer ID : " + id);
     }
 
@@ -104,26 +101,20 @@ public class PerfConsumer extends PerfBa
     {
         super.setUp();
         consumer = session.createConsumer(dest);
+        System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName()
+ "\n");
 
         // Storing the following two for efficiency
         transacted = params.isTransacted();
         transSize = params.getTransactionSize();
         printStdDev = params.isPrintStdDev();
-        if (printStdDev)
-        {
-            sample = new ArrayList<Long>(params.getMsgCount());
-        }
-
         MapMessage m = controllerSession.createMapMessage();
         m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
-        m.setString(REPLY_ADDR,myControlQueueAddr);
         sendMessageToController(m);
     }
 
     public void warmup()throws Exception
     {
         receiveFromController(OPCode.CONSUMER_STARTWARMUP);
-        boolean start = false;
         Message msg = consumer.receive();
         // This is to ensure we drain the queue before we start the actual test.
         while ( msg != null)
@@ -146,12 +137,26 @@ public class PerfConsumer extends PerfBa
         MapMessage m = controllerSession.createMapMessage();
         m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
         sendMessageToController(m);
+        consumer.setMessageListener(this);
     }
 
     public void startTest() throws Exception
     {
-        System.out.println("Consumer Starting test......");
-        consumer.setMessageListener(this);
+        System.out.println("Consumer: " + id + " Starting test......" + "\n");
+        resetCounters();
+    }
+
+    public void resetCounters()
+    {
+        rcvdMsgCount = 0;
+        maxLatency = 0;
+        minLatency = Long.MAX_VALUE;
+        totalLatency = 0;
+        if (printStdDev)
+        {
+            sample = null;
+            sample = new ArrayList<Long>(params.getMsgCount());
+        }
     }
 
     public void sendResults() throws Exception
@@ -193,7 +198,6 @@ public class PerfConsumer extends PerfBa
             System.out.println(new StringBuilder("Std Dev             : ").
                                append(stdDev/Clock.convertToMiliSecs()).toString());
         }
-        System.out.println("Consumer has completed the test......\n");
     }
 
     public double calculateStdDev(double mean)
@@ -262,8 +266,15 @@ public class PerfConsumer extends PerfBa
         {
             setUp();
             warmup();
-            startTest();
-            sendResults();
+            boolean nextIteration = true;
+            while (nextIteration)
+            {
+                System.out.println("=========================================================\n");
+                System.out.println("Consumer: " + id + " starting a new iteration ......\n");
+                startTest();
+                sendResults();
+                nextIteration = continueTest();
+            }
             tearDown();
         }
         catch(Exception e)
@@ -272,26 +283,43 @@ public class PerfConsumer extends PerfBa
         }
     }
 
-    public static void main(String[] args)
+        @Override
+    public void tearDown() throws Exception
+    {
+        super.tearDown();
+    }
+
+    public static void main(String[] args) throws InterruptedException
     {
-        final PerfConsumer cons = new PerfConsumer();
-        Runnable r = new Runnable()
+        String scriptId = (args.length == 1) ? args[0] : "";
+        int conCount = Integer.getInteger("con_count",1);
+        final CountDownLatch testCompleted = new CountDownLatch(conCount);
+        for (int i=0; i < conCount; i++)
         {
-            public void run()
+
+            final PerfConsumer cons = new PerfConsumer(scriptId + i);
+            Runnable r = new Runnable()
+            {
+                public void run()
+                {
+                    cons.run();
+                    testCompleted.countDown();
+                }
+            };
+
+            Thread t;
+            try
+            {
+                t = Threading.getThreadFactory().createThread(r);
+            }
+            catch(Exception e)
             {
-                cons.run();
+                throw new Error("Error creating consumer thread",e);
             }
-        };
+            t.start();
 
-        Thread t;
-        try
-        {
-            t = Threading.getThreadFactory().createThread(r);
-        }
-        catch(Exception e)
-        {
-            throw new Error("Error creating consumer thread",e);
         }
-        t.start();
+        testCompleted.await();
+        System.out.println("Consumers have completed the test......\n");
     }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1152412&r1=1152411&r2=1152412&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Sat Jul
30 02:22:35 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.tools;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -30,6 +31,7 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 
+import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.thread.Threading;
 
 /**
@@ -51,6 +53,12 @@ import org.apache.qpid.thread.Threading;
  * System throughput and latencies calculated by the PerfConsumer are more realistic
  * numbers.
  *
+ * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test
runs
+ * I have done so far, it seems quite useful to compute the producer rate as it gives an
+ * indication of how the system behaves. For ex if there is a gap between producer and consumer
rates
+ * you could clearly see the higher latencies and when producer and consumer rates are very
close,
+ * latency is good.
+ *
  */
 public class PerfProducer extends PerfBase
 {
@@ -69,9 +77,9 @@ public class PerfProducer extends PerfBa
     double rateFactor = 0.4;
     double rate = 0.0;
 
-    public PerfProducer()
+    public PerfProducer(String prefix)
     {
-        super();
+        super(prefix);
         System.out.println("Producer ID : " + id);
     }
 
@@ -114,12 +122,12 @@ public class PerfProducer extends PerfBa
         }
 
         producer = session.createProducer(dest);
+        System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
         producer.setDisableMessageID(params.isDisableMessageID());
         producer.setDisableMessageTimestamp(params.isDisableTimestamp());
 
         MapMessage m = controllerSession.createMapMessage();
         m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
-        m.setString(REPLY_ADDR,myControlQueueAddr);
         sendMessageToController(m);
     }
 
@@ -178,7 +186,7 @@ public class PerfProducer extends PerfBa
     public void warmup()throws Exception
     {
         receiveFromController(OPCode.PRODUCER_STARTWARMUP);
-        System.out.println("Producer Warming up......");
+        System.out.println("Producer: " + id + " Warming up......");
 
         for (int i=0; i < params.getWarmupCount() -1; i++)
         {
@@ -194,6 +202,7 @@ public class PerfProducer extends PerfBa
 
     public void startTest() throws Exception
     {
+        resetCounters();
         receiveFromController(OPCode.PRODUCER_START);
         int count = params.getMsgCount();
         boolean transacted = params.isTransacted();
@@ -236,8 +245,11 @@ public class PerfProducer extends PerfBa
                                append(df.format(rate)).
                                append(" msg/sec").
                                toString());
+    }
+
+    public void resetCounters()
+    {
 
-        System.out.println("Producer has completed the test......");
     }
 
     public void sendEndMessage() throws Exception
@@ -255,14 +267,27 @@ public class PerfProducer extends PerfBa
         sendMessageToController(msg);
     }
 
+    @Override
+    public void tearDown() throws Exception
+    {
+        super.tearDown();
+    }
+
     public void run()
     {
         try
         {
             setUp();
             warmup();
-            startTest();
-            sendResults();
+            boolean nextIteration = true;
+            while (nextIteration)
+            {
+                System.out.println("=========================================================\n");
+                System.out.println("Producer: " + id + " starting a new iteration ......\n");
+                startTest();
+                sendResults();
+                nextIteration = continueTest();
+            }
             tearDown();
         }
         catch(Exception e)
@@ -298,27 +323,36 @@ public class PerfProducer extends PerfBa
     }
 
 
-    public static void main(String[] args)
+    public static void main(String[] args) throws InterruptedException
     {
-        final PerfProducer prod = new PerfProducer();
-        prod.startControllerIfNeeded();
-        Runnable r = new Runnable()
+        String scriptId = (args.length == 1) ? args[0] : "";
+        int conCount = Integer.getInteger("con_count",1);
+        final CountDownLatch testCompleted = new CountDownLatch(conCount);
+        for (int i=0; i < conCount; i++)
         {
-            public void run()
+            final PerfProducer prod = new PerfProducer(scriptId + i);
+            prod.startControllerIfNeeded();
+            Runnable r = new Runnable()
             {
-                prod.run();
-            }
-        };
+                public void run()
+                {
+                    prod.run();
+                    testCompleted.countDown();
+                }
+            };
 
-        Thread t;
-        try
-        {
-            t = Threading.getThreadFactory().createThread(r);
-        }
-        catch(Exception e)
-        {
-            throw new Error("Error creating producer thread",e);
+            Thread t;
+            try
+            {
+                t = Threading.getThreadFactory().createThread(r);
+            }
+            catch(Exception e)
+            {
+                throw new Error("Error creating producer thread",e);
+            }
+            t.start();
         }
-        t.start();
+        testCompleted.await();
+        System.out.println("Producers have completed the test......");
     }
 }
\ No newline at end of file



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message