Return-Path: Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: (qmail 2644 invoked from network); 16 Apr 2010 05:21:35 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 16 Apr 2010 05:21:35 -0000 Received: (qmail 75875 invoked by uid 500); 16 Apr 2010 05:21:35 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 75688 invoked by uid 500); 16 Apr 2010 05:21:35 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 75679 invoked by uid 99); 16 Apr 2010 05:21:34 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Apr 2010 05:21:34 +0000 X-ASF-Spam-Status: No, hits=0.8 required=10.0 tests=DATE_IN_PAST_12_24,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: local policy) Received: from [85.214.52.93] (HELO 256bit.org) (85.214.52.93) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Apr 2010 05:21:24 +0000 Received: from dslb-092-074-129-200.pools.arcor-ip.net ([92.74.129.200] helo=[192.168.0.82]) by 256bit.org with esmtpsa (TLS1.0:DHE_RSA_AES_256_CBC_SHA1:32) (Exim 4.69) (envelope-from ) id 1O2dzC-0002qc-J4 for dev@activemq.apache.org; Fri, 16 Apr 2010 07:21:03 +0200 Message-ID: <4BC74A2F.8070203@256bit.org> Date: Thu, 15 Apr 2010 19:17:35 +0200 From: =?ISO-8859-15?Q?Marc_Sch=F6chlin?= User-Agent: Thunderbird 2.0.0.24 (X11/20100411) MIME-Version: 1.0 To: dev@activemq.apache.org Subject: [PATCH] ActiveMQ Examples/Testcode Content-Type: multipart/mixed; boundary="------------050106050304090401050504" X-Spam-Level: - X-Virus-Checked: Checked by ClamAV on apache.org X-Old-Spam-Flag: X-Old-Spam-Status: No, score=-1.1 --------------050106050304090401050504 Content-Type: text/plain; charset=ISO-8859-15 
Content-Transfer-Encoding: 7bit Hello Developers, I made some trivial enhancements to the ActiveMQ examples. This patch introduces parallel enqueuing/dequeuing of messages (without connection pooling). I used this tool for benchmarking/verifying/testing my ActiveMQ configuration - other users may be interested in it as well. Example: --- ant producer -DparallelThreads=60 -Dmax=2000 -DmessageSize=1024 ant consumer -DparallelThreads=100 -Dmax=2000 --- The patch is based on the following release: --- $ LANG=C svn info Path: . URL: https://svn.apache.org/repos/asf/activemq/trunk Repository Root: https://svn.apache.org/repos/asf Repository UUID: 13f79535-47bb-0310-9956-ffa450edef68 Revision: 932332 --- You can apply this; I accept the Apache License. Regards Marc Schoechlin --------------050106050304090401050504 Content-Type: text/x-diff; name="examples.diff" Content-Transfer-Encoding: 7bit Content-Disposition: inline; filename="examples.diff" Index: assembly/src/release/example/src/ProducerTool.java =================================================================== --- assembly/src/release/example/src/ProducerTool.java (Revision 932332) +++ assembly/src/release/example/src/ProducerTool.java (Arbeitskopie) @@ -15,7 +15,9 @@ * limitations under the License. 
*/ import java.util.Arrays; +import java.util.ArrayList; import java.util.Date; +import java.util.Iterator; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -33,13 +35,14 @@ * * @version $Revision: 1.2 $ */ -public class ProducerTool { +public class ProducerTool extends Thread{ private Destination destination; private int messageCount = 10; private long sleepTime; private boolean verbose = true; private int messageSize = 255; + private static int parallelThreads = 1; private long timeToLive; private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; @@ -48,28 +51,60 @@ private boolean topic; private boolean transacted; private boolean persistent; + private static Object lockResults = new Object(); public static void main(String[] args) { + ArrayList threads = new ArrayList(); ProducerTool producerTool = new ProducerTool(); String[] unknown = CommandLineSupport.setOptions(producerTool, args); if (unknown.length > 0) { System.out.println("Unknown options: " + Arrays.toString(unknown)); System.exit(-1); } - producerTool.run(); + producerTool.showParameters(); + for (int threadCount=1; threadCount <= parallelThreads; threadCount++){ + producerTool = new ProducerTool(); + CommandLineSupport.setOptions(producerTool, args); + producerTool.start(); + threads.add(producerTool); + } + + while(true){ + Iterator itr = threads.iterator(); + int running = 0; + while (itr.hasNext()) { + ProducerTool thread = itr.next(); + if (thread.isAlive()){ + running++; + } + } + if (running <= 0){ + System.out.println("All threads completed their work"); + break; + } + try{ + Thread.sleep(1000); + }catch(Exception e){ + } + } } + public void showParameters(){ + System.out.println("Connecting to URL: " + url); + System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using " + (persistent ? 
"persistent" : "non-persistent") + " messages"); + System.out.println("Sleeping between publish " + sleepTime + " ms"); + System.out.println("Running "+parallelThreads+" parallel threads"); + + if (timeToLive != 0) { + System.out.println("Messages time to live " + timeToLive + " ms"); + } + } + + public void run() { Connection connection = null; try { - System.out.println("Connecting to URL: " + url); - System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); - System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); - System.out.println("Sleeping between publish " + sleepTime + " ms"); - if (timeToLive != 0) { - System.out.println("Messages time to live " + timeToLive + " ms"); - } - // Create the connection. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); @@ -97,15 +132,16 @@ // Start sending messages sendLoop(session, producer); - System.out.println("Done."); + System.out.println("["+this.getName()+"] Done."); - // Use the ActiveMQConnection interface to dump the connection - // stats. 
- ActiveMQConnection c = (ActiveMQConnection)connection; - c.getConnectionStats().dump(new IndentPrinter()); + synchronized(lockResults){ + ActiveMQConnection c = (ActiveMQConnection)connection; + System.out.println("***** Results of "+this.getName()+"\n"); + c.getConnectionStats().dump(new IndentPrinter()); + } } catch (Exception e) { - System.out.println("Caught: " + e); + System.out.println("["+this.getName()+"] Caught: " + e); e.printStackTrace(); } finally { try { @@ -126,18 +162,17 @@ if (msg.length() > 50) { msg = msg.substring(0, 50) + "..."; } - System.out.println("Sending message: " + msg); + System.out.println("["+this.getName()+"] Sending message: '" + msg + "'"); } producer.send(message); + if (transacted) { + System.out.println("["+this.getName()+"] Committing "+messageCount+" messages"); session.commit(); } - Thread.sleep(sleepTime); - } - } private String createMessageText(int index) { @@ -180,6 +215,10 @@ this.timeToLive = timeToLive; } + public void setParallelThreads(int parallelThreads) { + if(parallelThreads < 1) parallelThreads = 1; + this.parallelThreads = parallelThreads; + } public void setTopic(boolean topic) { this.topic = topic; } Index: assembly/src/release/example/src/ConsumerTool.java =================================================================== --- assembly/src/release/example/src/ConsumerTool.java (Revision 932332) +++ assembly/src/release/example/src/ConsumerTool.java (Arbeitskopie) @@ -17,6 +17,8 @@ import java.io.IOException; import java.util.Arrays; +import java.util.ArrayList; +import java.util.Iterator; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -39,7 +41,7 @@ * * @version $Revision: 1.1.1.1 $ */ -public class ConsumerTool implements MessageListener, ExceptionListener { +public class ConsumerTool extends Thread implements MessageListener, ExceptionListener{ private boolean running; @@ -47,9 +49,10 @@ private Destination destination; private MessageProducer replyProducer; - private boolean 
pauseBeforeShutdown; + private boolean pauseBeforeShutdown = false; private boolean verbose = true; private int maxiumMessages; + private static int parallelThreads = 1; private String subject = "TOOL.DEFAULT"; private boolean topic; private String user = ActiveMQConnection.DEFAULT_USER; @@ -64,22 +67,58 @@ private long receiveTimeOut; public static void main(String[] args) { + ArrayList threads = new ArrayList(); ConsumerTool consumerTool = new ConsumerTool(); String[] unknown = CommandLineSupport.setOptions(consumerTool, args); if (unknown.length > 0) { System.out.println("Unknown options: " + Arrays.toString(unknown)); System.exit(-1); } - consumerTool.run(); + consumerTool.showParameters(); + for (int threadCount=1; threadCount <= parallelThreads; threadCount++){ + consumerTool = new ConsumerTool(); + CommandLineSupport.setOptions(consumerTool, args); + consumerTool.start(); + threads.add(consumerTool); + } + + while(true){ + Iterator itr = threads.iterator(); + int running = 0; + while (itr.hasNext()) { + ConsumerTool thread = itr.next(); + if (thread.isAlive()){ + running++; + } + } + + if (running <= 0){ + System.out.println("All threads completed their work"); + break; + } + + try{ + Thread.sleep(1000); + }catch(Exception e){ + } + } + Iterator itr = threads.iterator(); + while (itr.hasNext()) { + ConsumerTool thread = itr.next(); + } } + public void showParameters(){ + System.out.println("Connecting to URL: " + url); + System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription"); + System.out.println("Running "+parallelThreads+" parallel threads"); + } + public void run() { try { running = true; - System.out.println("Connecting to URL: " + url); - System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); - System.out.println("Using a " + (durable ? 
"durable" : "non-durable") + " subscription"); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); Connection connection = connectionFactory.createConnection(); @@ -117,7 +156,7 @@ } } catch (Exception e) { - System.out.println("Caught: " + e); + System.out.println("["+this.getName()+"] Caught: " + e); e.printStackTrace(); } } @@ -130,15 +169,15 @@ if (verbose) { String msg = txtMsg.getText(); - if (msg.length() > 50) { + int length = msg.length(); + if (length > 50) { msg = msg.substring(0, 50) + "..."; } - - System.out.println("Received: " + msg); + System.out.println("["+this.getName()+"] Received: '" + msg + "' (length "+length+")"); } } else { if (verbose) { - System.out.println("Received: " + message); + System.out.println("["+this.getName()+"] Received: '" + message +"'"); } } @@ -153,7 +192,7 @@ } } catch (JMSException e) { - System.out.println("Caught: " + e); + System.out.println("["+this.getName()+"] Caught: " + e); e.printStackTrace(); } finally { if (sleepTime > 0) { @@ -166,7 +205,7 @@ } public synchronized void onException(JMSException ex) { - System.out.println("JMS Exception occured. Shutting down client."); + System.out.println("["+this.getName()+"] JMS Exception occured. 
Shutting down client."); running = false; } @@ -175,7 +214,7 @@ } protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException { - System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); + System.out.println("["+this.getName()+"] We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); for (int i = 0; i < maxiumMessages && isRunning();) { Message message = consumer.receive(1000); @@ -184,30 +223,30 @@ onMessage(message); } } - System.out.println("Closing connection"); + System.out.println("["+this.getName()+"] Closing connection"); consumer.close(); session.close(); connection.close(); if (pauseBeforeShutdown) { - System.out.println("Press return to shut down"); + System.out.println("["+this.getName()+"] Press return to shut down"); System.in.read(); } } protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException { - System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown"); + System.out.println("["+this.getName()+"] We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown"); Message message; while ((message = consumer.receive(timeout)) != null) { onMessage(message); } - System.out.println("Closing connection"); + System.out.println("["+this.getName()+"] Closing connection"); consumer.close(); session.close(); connection.close(); if (pauseBeforeShutdown) { - System.out.println("Press return to shut down"); + System.out.println("["+this.getName()+"] Press return to shut down"); System.in.read(); } } @@ -263,6 +302,10 @@ this.subject = subject; } + public void setParallelThreads(int parallelThreads) { + if(parallelThreads < 1) parallelThreads = 1; + 
this.parallelThreads = parallelThreads; + } public void setTopic(boolean topic) { this.topic = topic; } @@ -286,5 +329,4 @@ public void setVerbose(boolean verbose) { this.verbose = verbose; } - } Index: assembly/src/release/example/build.xml =================================================================== --- assembly/src/release/example/build.xml (Revision 932332) +++ assembly/src/release/example/build.xml (Arbeitskopie) @@ -26,6 +26,7 @@ + @@ -92,7 +93,7 @@ more information receive-time-out - An integer to specify the time to wait for message consumption - + parallelThreads - The number of parallel threads -------------------------------------------------------- ant producer <options> - Creates a producer publishing a number of messages @@ -112,6 +113,9 @@ transacted - A boolean to specify that you want to use transactions? verbose - Used to print out more info; the default is true + messageSize - The size of the message in 1-byte characters + parallelThreads - The number of parallel threads + -------------------------------------------------------- @@ -209,6 +213,7 @@ + @@ -230,6 +235,7 @@ + --------------050106050304090401050504--