activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r935507 - in /activemq/trunk/assembly/src/release/example: build.xml src/ConsumerTool.java src/ProducerTool.java
Date Mon, 19 Apr 2010 09:57:54 GMT
Author: gtully
Date: Mon Apr 19 09:57:54 2010
New Revision: 935507

URL: http://svn.apache.org/viewvc?rev=935507&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2698 - patch applied with thanks; -DparallelThreads=X
now in example  producer and consumer tool

Modified:
    activemq/trunk/assembly/src/release/example/build.xml
    activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
    activemq/trunk/assembly/src/release/example/src/ProducerTool.java

Modified: activemq/trunk/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/build.xml?rev=935507&r1=935506&r2=935507&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/build.xml (original)
+++ activemq/trunk/assembly/src/release/example/build.xml Mon Apr 19 09:57:54 2010
@@ -26,6 +26,7 @@
 	<property name="subject" value="TEST.FOO" />
 	<property name="durable" value="false" />
 	<property name="max" value="2000" />
+	<property name="parallelThreads" value="1" />
 	<property name="messageSize" value="1000" />
 	<property name="clientId" value="consumer1" />
 	<property name="producerClientId" value="null" />
@@ -92,7 +93,7 @@
                                     more information
                  receive-time-out - An integer to specify the time to wait for
                                     message consumption
- 
+                  parallelThreads - The number of parallel threads
 				
             --------------------------------------------------------				
             ant producer &lt;options&gt; - Creates a producer publishing a number
of messages
@@ -112,6 +113,9 @@
                     transacted - A boolean to specify that you want to use 
                                  transactions? 
                        verbose - Used to print out more info; the default is true
+                   messageSize - The size of the message in 1-byte characters
+               parallelThreads - The number of parallel threads
+
 		   
            --------------------------------------------------------
 		
@@ -209,6 +213,7 @@
 			<arg value="--durable=${durable}" />
 			<arg value="--maxium-messages=${max}" />
 			<arg value="--client-id=${clientId}" />
+			<arg value="--parallel-threads=${parallelThreads}" />
 			<arg value="--transacted=${transacted}" />
 			<arg value="--sleep-time=${sleepTime}" />
 			<arg value="--verbose=${verbose}"/>
@@ -230,6 +235,7 @@
 			<arg value="--persistent=${durable}" />
 			<arg value="--message-count=${max}" />
 			<arg value="--message-size=${messageSize}" />
+			<arg value="--parallel-threads=${parallelThreads}" />
 			<arg value="--time-to-live=${timeToLive}" />
 			<arg value="--sleep-time=${sleepTime}" />
 			<arg value="--transacted=${transacted}" />

Modified: activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java?rev=935507&r1=935506&r2=935507&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ConsumerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ConsumerTool.java Mon Apr 19 09:57:54
2010
@@ -17,6 +17,8 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Iterator;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -39,7 +41,7 @@ import org.apache.activemq.ActiveMQConne
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class ConsumerTool implements MessageListener, ExceptionListener {
+public class ConsumerTool extends Thread implements MessageListener, ExceptionListener {
 
     private boolean running;
 
@@ -47,9 +49,10 @@ public class ConsumerTool implements Mes
     private Destination destination;
     private MessageProducer replyProducer;
 
-    private boolean pauseBeforeShutdown;
+    private boolean pauseBeforeShutdown = false;
     private boolean verbose = true;
     private int maxiumMessages;
+    private static int parallelThreads = 1;
     private String subject = "TOOL.DEFAULT";
     private boolean topic;
     private String user = ActiveMQConnection.DEFAULT_USER;
@@ -64,23 +67,58 @@ public class ConsumerTool implements Mes
     private long receiveTimeOut;
 
     public static void main(String[] args) {
+        ArrayList<ConsumerTool> threads = new ArrayList();
         ConsumerTool consumerTool = new ConsumerTool();
         String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
         if (unknown.length > 0) {
             System.out.println("Unknown options: " + Arrays.toString(unknown));
             System.exit(-1);
         }
-        consumerTool.run();
+        consumerTool.showParameters();
+        for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) {
+            consumerTool = new ConsumerTool();
+            CommandLineSupport.setOptions(consumerTool, args);
+            consumerTool.start();
+            threads.add(consumerTool);
+        }
+
+        while (true) {
+            Iterator<ConsumerTool> itr = threads.iterator();
+            int running = 0;
+            while (itr.hasNext()) {
+                ConsumerTool thread = itr.next();
+                if (thread.isAlive()) {
+                    running++;
+                }
+            }
+
+            if (running <= 0) {
+                System.out.println("All threads completed their work");
+                break;
+            }
+
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+            }
+        }
+        Iterator<ConsumerTool> itr = threads.iterator();
+        while (itr.hasNext()) {
+            ConsumerTool thread = itr.next();
+        }
+    }
+
+    public void showParameters() {
+        System.out.println("Connecting to URL: " + url);
+        System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
+        System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
+        System.out.println("Running " + parallelThreads + " parallel threads");
     }
 
     public void run() {
         try {
             running = true;
 
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
-
             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,
password, url);
             Connection connection = connectionFactory.createConnection();
             if (durable && clientId != null && clientId.length() > 0 &&
!"null".equals(clientId)) {
@@ -101,7 +139,7 @@ public class ConsumerTool implements Mes
 
             MessageConsumer consumer = null;
             if (durable && topic) {
-                consumer = session.createDurableSubscriber((Topic)destination, consumerName);
+                consumer = session.createDurableSubscriber((Topic) destination, consumerName);
             } else {
                 consumer = session.createConsumer(destination);
             }
@@ -117,7 +155,7 @@ public class ConsumerTool implements Mes
             }
 
         } catch (Exception e) {
-            System.out.println("Caught: " + e);
+            System.out.println("[" + this.getName() + "] Caught: " + e);
             e.printStackTrace();
         }
     }
@@ -126,19 +164,19 @@ public class ConsumerTool implements Mes
         try {
 
             if (message instanceof TextMessage) {
-                TextMessage txtMsg = (TextMessage)message;
+                TextMessage txtMsg = (TextMessage) message;
                 if (verbose) {
 
                     String msg = txtMsg.getText();
-                    if (msg.length() > 50) {
+                    int length = msg.length();
+                    if (length > 50) {
                         msg = msg.substring(0, 50) + "...";
                     }
-
-                    System.out.println("Received: " + msg);
+                    System.out.println("[" + this.getName() + "] Received: '" + msg + "'
(length " + length + ")");
                 }
             } else {
                 if (verbose) {
-                    System.out.println("Received: " + message);
+                    System.out.println("[" + this.getName() + "] Received: '" + message +
"'");
                 }
             }
 
@@ -153,7 +191,7 @@ public class ConsumerTool implements Mes
             }
 
         } catch (JMSException e) {
-            System.out.println("Caught: " + e);
+            System.out.println("[" + this.getName() + "] Caught: " + e);
             e.printStackTrace();
         } finally {
             if (sleepTime > 0) {
@@ -166,7 +204,7 @@ public class ConsumerTool implements Mes
     }
 
     public synchronized void onException(JMSException ex) {
-        System.out.println("JMS Exception occured.  Shutting down client.");
+        System.out.println("[" + this.getName() + "] JMS Exception occured.  Shutting down
client.");
         running = false;
     }
 
@@ -174,8 +212,10 @@ public class ConsumerTool implements Mes
         return running;
     }
 
-    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer
consumer) throws JMSException, IOException {
-        System.out.println("We are about to wait until we consume: " + maxiumMessages + "
message(s) then we will shutdown");
+    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer
consumer) throws JMSException,
+            IOException {
+        System.out.println("[" + this.getName() + "] We are about to wait until we consume:
" + maxiumMessages
+                + " message(s) then we will shutdown");
 
         for (int i = 0; i < maxiumMessages && isRunning();) {
             Message message = consumer.receive(1000);
@@ -184,30 +224,32 @@ public class ConsumerTool implements Mes
                 onMessage(message);
             }
         }
-        System.out.println("Closing connection");
+        System.out.println("[" + this.getName() + "] Closing connection");
         consumer.close();
         session.close();
         connection.close();
         if (pauseBeforeShutdown) {
-            System.out.println("Press return to shut down");
+            System.out.println("[" + this.getName() + "] Press return to shut down");
             System.in.read();
         }
     }
 
-    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer
consumer, long timeout) throws JMSException, IOException {
-        System.out.println("We will consume messages while they continue to be delivered
within: " + timeout + " ms, and then we will shutdown");
+    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer
consumer, long timeout)
+            throws JMSException, IOException {
+        System.out.println("[" + this.getName() + "] We will consume messages while they
continue to be delivered within: " + timeout
+                + " ms, and then we will shutdown");
 
         Message message;
         while ((message = consumer.receive(timeout)) != null) {
             onMessage(message);
         }
 
-        System.out.println("Closing connection");
+        System.out.println("[" + this.getName() + "] Closing connection");
         consumer.close();
         session.close();
         connection.close();
         if (pauseBeforeShutdown) {
-            System.out.println("Press return to shut down");
+            System.out.println("[" + this.getName() + "] Press return to shut down");
             System.in.read();
         }
     }
@@ -263,6 +305,13 @@ public class ConsumerTool implements Mes
         this.subject = subject;
     }
 
+    public void setParallelThreads(int parallelThreads) {
+        if (parallelThreads < 1) {
+            parallelThreads = 1;
+        }
+        this.parallelThreads = parallelThreads;
+    }
+
     public void setTopic(boolean topic) {
         this.topic = topic;
     }
@@ -286,5 +335,4 @@ public class ConsumerTool implements Mes
     public void setVerbose(boolean verbose) {
         this.verbose = verbose;
     }
-
 }

Modified: activemq/trunk/assembly/src/release/example/src/ProducerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ProducerTool.java?rev=935507&r1=935506&r2=935507&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ProducerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ProducerTool.java Mon Apr 19 09:57:54
2010
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -33,13 +35,14 @@ import org.apache.activemq.util.IndentPr
  * 
  * @version $Revision: 1.2 $
  */
-public class ProducerTool {
+public class ProducerTool extends Thread {
 
     private Destination destination;
     private int messageCount = 10;
     private long sleepTime;
     private boolean verbose = true;
     private int messageSize = 255;
+    private static int parallelThreads = 1;
     private long timeToLive;
     private String user = ActiveMQConnection.DEFAULT_USER;
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
@@ -48,28 +51,59 @@ public class ProducerTool {
     private boolean topic;
     private boolean transacted;
     private boolean persistent;
+    private static Object lockResults = new Object();
 
     public static void main(String[] args) {
+        ArrayList<ProducerTool> threads = new ArrayList();
         ProducerTool producerTool = new ProducerTool();
         String[] unknown = CommandLineSupport.setOptions(producerTool, args);
         if (unknown.length > 0) {
             System.out.println("Unknown options: " + Arrays.toString(unknown));
             System.exit(-1);
         }
-        producerTool.run();
+        producerTool.showParameters();
+        for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) {
+            producerTool = new ProducerTool();
+            CommandLineSupport.setOptions(producerTool, args);
+            producerTool.start();
+            threads.add(producerTool);
+        }
+
+        while (true) {
+            Iterator<ProducerTool> itr = threads.iterator();
+            int running = 0;
+            while (itr.hasNext()) {
+                ProducerTool thread = itr.next();
+                if (thread.isAlive()) {
+                    running++;
+                }
+            }
+            if (running <= 0) {
+                System.out.println("All threads completed their work");
+                break;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+            }
+        }
+    }
+
+    public void showParameters() {
+        System.out.println("Connecting to URL: " + url);
+        System.out.println("Publishing a Message with size " + messageSize + " to " + (topic
? "topic" : "queue") + ": " + subject);
+        System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + "
messages");
+        System.out.println("Sleeping between publish " + sleepTime + " ms");
+        System.out.println("Running " + parallelThreads + " parallel threads");
+
+        if (timeToLive != 0) {
+            System.out.println("Messages time to live " + timeToLive + " ms");
+        }
     }
 
     public void run() {
         Connection connection = null;
         try {
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Publishing a Message with size " + messageSize + " to " +
(topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using " + (persistent ? "persistent" : "non-persistent")
+ " messages");
-            System.out.println("Sleeping between publish " + sleepTime + " ms");
-            if (timeToLive != 0) {
-                System.out.println("Messages time to live " + timeToLive + " ms");
-            }
-
             // Create the connection.
             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,
password, url);
             connection = connectionFactory.createConnection();
@@ -97,15 +131,16 @@ public class ProducerTool {
             // Start sending messages
             sendLoop(session, producer);
 
-            System.out.println("Done.");
+            System.out.println("[" + this.getName() + "] Done.");
 
-            // Use the ActiveMQConnection interface to dump the connection
-            // stats.
-            ActiveMQConnection c = (ActiveMQConnection)connection;
-            c.getConnectionStats().dump(new IndentPrinter());
+            synchronized (lockResults) {
+                ActiveMQConnection c = (ActiveMQConnection) connection;
+                System.out.println("[" + this.getName() + "] Results:\n");
+                c.getConnectionStats().dump(new IndentPrinter());
+            }
 
         } catch (Exception e) {
-            System.out.println("Caught: " + e);
+            System.out.println("[" + this.getName() + "] Caught: " + e);
             e.printStackTrace();
         } finally {
             try {
@@ -126,18 +161,17 @@ public class ProducerTool {
                 if (msg.length() > 50) {
                     msg = msg.substring(0, 50) + "...";
                 }
-                System.out.println("Sending message: " + msg);
+                System.out.println("[" + this.getName() + "] Sending message: '" + msg +
"'");
             }
 
             producer.send(message);
+
             if (transacted) {
+                System.out.println("[" + this.getName() + "] Committing " + messageCount
+ " messages");
                 session.commit();
             }
-
             Thread.sleep(sleepTime);
-
         }
-
     }
 
     private String createMessageText(int index) {
@@ -180,6 +214,13 @@ public class ProducerTool {
         this.timeToLive = timeToLive;
     }
 
+    public void setParallelThreads(int parallelThreads) {
+        if (parallelThreads < 1) {
+            parallelThreads = 1;
+        }
+        this.parallelThreads = parallelThreads;
+    }
+
     public void setTopic(boolean topic) {
         this.topic = topic;
     }



Mime
View raw message