activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r468913 - in /incubator/activemq/trunk/assembly/src/release: ./ example/ example/ruby/ example/src/
Date Sun, 29 Oct 2006 14:35:49 GMT
Author: chirino
Date: Sun Oct 29 06:35:48 2006
New Revision: 468913

URL: http://svn.apache.org/viewvc?view=rev&rev=468913
Log:
Freshened up the shipped examples.
  - Inlined the ToolSupport base class to make it more obvious to JMS beginers how JMS is used (less code reuse but these examples are simple enough)
  - Added CommandLineSupport class that uses introspection to handle interpreting command line options
  - Added ruby and java versions of a topic listener and publisher tools that can be used to benchmark broker performance.

Added:
    incubator/activemq/trunk/assembly/src/release/example/ruby/
    incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt
    incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb   (with props)
    incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb   (with props)
    incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java
    incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java
    incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java
Removed:
    incubator/activemq/trunk/assembly/src/release/example/src/ToolSupport.java
Modified:
    incubator/activemq/trunk/assembly/src/release/README.txt
    incubator/activemq/trunk/assembly/src/release/example/build.xml
    incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
    incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java
    incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java
    incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java
    incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java

Modified: incubator/activemq/trunk/assembly/src/release/README.txt
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/README.txt?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/README.txt (original)
+++ incubator/activemq/trunk/assembly/src/release/README.txt Sun Oct 29 06:35:48 2006
@@ -1,5 +1,5 @@
 Welcome to Apache ActiveMQ 
-=================== 
+========================== 
 Apache ActiveMQ is a high performance Apache 2.0 licenced Message Broker and JMS 1.1 implementation.
 
 

Modified: incubator/activemq/trunk/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/build.xml?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/build.xml (original)
+++ incubator/activemq/trunk/assembly/src/release/example/build.xml Sun Oct 29 06:35:48 2006
@@ -32,7 +32,13 @@
 	<property name="timeToLive" value="0" />
 	<property name="sleepTime" value="0" />
 	<property name="transacted" value="false" />
-
+	<property name="reply-subject" value="" />
+	<property name="verbose" value="true"/>
+	<property name="ack-mode" value="AUTO_ACKNOWLEDGE"/>
+	<property name="receive-time-out" value="0"/>
+	<property name="subscribers" value="1"/>
+	<property name="batch" value="10"/>
+	
 	<!-- for WAR -->
 	<property name="app.name" value="activemq-web" />
 	<property name="app.base.dir" location="target/${app.name}" />
@@ -72,7 +78,7 @@
 
 		<path id="javac.classpath">
 			<pathelement path="${class.dir}" />
-			<pathelement path="../conf" />
+			<pathelement path="conf" />
 			<fileset dir="../lib">
 				<include name="**/*.jar" />
 			</fileset>
@@ -141,14 +147,17 @@
 			<classpath refid="javac.classpath" />
 			<jvmarg value="-server" />
 		    <sysproperty key="activemq.home" value="${activemq.home}"/>
-			<arg value="${url}" />
-			<arg value="${topic}" />
-			<arg value="${subject}" />
-			<arg value="${durable}" />
-			<arg value="${max}" />
-			<arg value="${clientId}" />
-			<arg value="${transacted}" />
-			<arg value="${sleepTime}" />
+			<arg value="--url=${url}" />
+			<arg value="--topic=${topic}" />
+			<arg value="--subject=${subject}" />
+			<arg value="--durable=${durable}" />
+			<arg value="--maxium-messages=${max}" />
+			<arg value="--client-id=${clientId}" />
+			<arg value="--transacted=${transacted}" />
+			<arg value="--sleep-time=${sleepTime}" />
+			<arg value="--verbose=${verbose}"/>
+			<arg value="--ack-mode=${ack-mode}"/>		
+			<arg value="--receive-time-out=${receive-time-out}"/>					
 		</java>
 	</target>
 
@@ -159,16 +168,16 @@
 			<classpath refid="javac.classpath" />
 			<jvmarg value="-server" />
 		    <sysproperty key="activemq.home" value="${activemq.home}"/>
-			<arg value="${url}" />
-			<arg value="${topic}" />
-			<arg value="${subject}" />
-			<arg value="${durable}" />
-			<arg value="${max}" />
-			<arg value="${messageSize}" />
-			<arg value="${producerClientId}" />
-			<arg value="${timeToLive}" />
-			<arg value="${sleepTime}" />
-			<arg value="${transacted}" />
+			<arg value="--url=${url}" />
+			<arg value="--topic=${topic}" />
+			<arg value="--subject=${subject}" />
+			<arg value="--persistent=${durable}" />
+			<arg value="--message-count=${max}" />
+			<arg value="--message-size=${messageSize}" />
+			<arg value="--time-to-live=${timeToLive}" />
+			<arg value="--sleep-time=${sleepTime}" />
+			<arg value="--transacted=${transacted}" />
+			<arg value="--verbose=${verbose}"/>			
 		</java>
 	</target>
 
@@ -179,16 +188,18 @@
 			<classpath refid="javac.classpath" />
 			<jvmarg value="-server" />
 		    <sysproperty key="activemq.home" value="${activemq.home}"/>
-			<arg value="${url}" />
-			<arg value="${topic}" />
-			<arg value="${subject}" />
-			<arg value="${durable}" />
-			<arg value="${max}" />
-			<arg value="${messageSize}" />
-			<arg value="${producerClientId}" />
-			<arg value="${timeToLive}" />
-			<arg value="${sleepTime}" />
-			<arg value="${transacted}" />
+			<arg value="--url=${url}" />
+			<arg value="--topic=${topic}" />
+			<arg value="--subject=${subject}" />
+			<arg value="--persistent=${durable}" />
+			<arg value="--message-count=${max}" />
+			<arg value="--message-size=${messageSize}" />
+			<arg value="--client-id=${producerClientId}" />
+			<arg value="--time-to-live=${timeToLive}" />
+			<arg value="--sleep-time=${sleepTime}" />
+			<arg value="--transacted=${transacted}" />
+			<arg value="--reply-subject=${reply-subject}" />
+			<arg value="--verbose=${verbose}"/>			
 		</java>
 	</target>
 
@@ -201,5 +212,26 @@
 		    <sysproperty key="activemq.home" value="${activemq.home}"/>
 		</java>
 	</target>
-
+	
+	<target name="topic-publisher" depends="compile" description="Runs a publisher">
+		<java classname="TopicPublisher" fork="yes" maxmemory="100M">
+			<classpath refid="javac.classpath" />
+			<jvmarg value="-server" />
+			<arg value="--url=${url}" />
+			<arg value="--size=${messageSize}" />
+			<arg value="--subscribers=${subscribers}" />
+			<arg value="--messages=${max}" />
+			<arg value="--delay=${sleepTime}" />
+			<arg value="--batch=${batch}" />			
+		</java>
+	</target>	
+	
+	<target name="topic-listener" depends="compile" description="Runs a listener">
+		<java classname="TopicListener" fork="yes" maxmemory="100M">
+			<classpath refid="javac.classpath" />
+			<jvmarg value="-server" />
+			<arg value="--url=${url}" />
+		</java>
+	</target>	
+	
 </project>

Added: incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt?view=auto&rev=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt (added)
+++ incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt Sun Oct 29 06:35:48 2006
@@ -0,0 +1,7 @@
+
+Prereqs
+=======
+
+- Install RubyGems see: http://docs.rubygems.org/
+- Install the stomp gem.  Run: gem install stomp
+

Added: incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb?view=auto&rev=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb (added)
+++ incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb Sun Oct 29 06:35:48 2006
@@ -0,0 +1,50 @@
+#!/usr/bin/ruby
+# ------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+ 
+require 'rubygems'
+require 'stomp'
+
+@conn = Stomp::Connection.open '', '', 'localhost', 61613, false 
+@count = 0
+
+@conn.subscribe '/topic/event', { :ack =>"auto" }
+while true 
+	@msg = @conn.receive
+	if @msg.command == "MESSAGE" 
+		if @msg.body == "SHUTDOWN"
+			exit 0
+	
+		elsif @msg.body == "REPORT"
+			@diff = Time.now - @start
+			@body = "Received #{@count} in #{@diff} seconds";
+			@conn.send '/queue/response', @body, {'persistent'=>'false'}
+			@count = 0;
+		else
+			if @count == 0 
+				@start = Time.now
+			end
+
+			@count += 1;
+			if @count % 1000 == 0
+ 				$stdout.print "Received #{@count} messages.\n"
+			end
+		end
+	else
+ 		$stdout.print "#{@msg.command}: #{@msg.body}\n"
+	end
+end

Propchange: incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb?view=auto&rev=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb (added)
+++ incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb Sun Oct 29 06:35:48 2006
@@ -0,0 +1,65 @@
+#!/usr/bin/ruby
+# ------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+ 
+require 'rubygems'
+require 'stomp'
+
+@conn = Stomp::Connection.open '', '', 'localhost', 61613, false 
+@messages = 10000
+@batches = 40
+@subscribers = 10
+@size = 256
+
+@DATA = "abcdefghijklmnopqrstuvwxyz";
+@body = "";
+for i in 0..(@size-1)
+	@body += @DATA[ i % @DATA.length,1]
+end
+
+@times = []
+@conn.subscribe '/queue/response', { :ack =>"auto" }
+
+for i in 1..(@batches)
+	@body += @DATA[ i % @DATA.length,1]
+	sleep 1 if i == 1
+
+ 	@start = Time.now	
+
+	for j in 1..@messages
+		@conn.send '/topic/event', @body, {'persistent'=>'false'}
+		$stdout.print "Sent #{j} messages\n" if j%1000==0
+	end
+	@conn.send '/topic/event', "REPORT", {'persistent'=>'false'}
+
+	@remaining = @subscribers
+	while @remaining > 0
+		@msg = @conn.receive
+		if @msg.command == "MESSAGE" 
+			@remaining -= 1
+			$stdout.print "Received report: #{@msg.body}, remaining: #{@remaining}\n"
+		else
+ 			$stdout.print "#{@msg.command}: #{@msg.body}\n"
+		end
+	end
+ 	@diff = Time.now-@start
+
+	$stdout.print "Batch #{i} of #{@batches} completed in #{@diff} seconds.\n"
+	@times[i] = @diff
+end
+
+@conn.send '/topic/event', "SHUTDOWN", {'persistent'=>'false'}

Propchange: incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java?view=auto&rev=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java (added)
+++ incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java Sun Oct 29 06:35:48 2006
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * Helper utility that can be used to set the properties on any object
+ * using command line arguments.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class CommnadLineSupport {
+	
+	/**
+	 * Sets the properties of an object given the command line args.
+	 * 
+	 * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent 
+	 * 
+	 * then it will try to call the following setters on the target object.
+	 * 
+	 * target.setAckMode("AUTO");
+	 * target.setURL(new URI("tcp://localhost:61616") );
+	 * target.setPersistent(true);
+	 * 
+	 * Notice the the proper conversion for the argument is determined by examining the 
+	 * setter arguement type.  
+	 * 
+	 * @param target the object that will have it's properties set
+	 * @param args the commline options
+	 * @return any arguments that are not valid options for the target
+	 */
+	static public String[] setOptions(Object target, String []args) {
+		ArrayList rc = new ArrayList();
+		
+		for (int i = 0; i < args.length; i++) {
+			if( args[i] == null )
+				continue;
+			
+			if( args[i].startsWith("--") ) {
+				
+				// --options without a specified value are considered boolean flags that are enabled.
+				String value="true";
+				String name = args[i].substring(2);
+				
+				// if --option=value case
+				int p = name.indexOf("=");
+				if( p > 0 ) {
+					value = name.substring(p+1);
+					name = name.substring(0,p);
+				}
+				
+				// name not set, then it's an unrecognized option
+				if( name.length()==0 ) {
+					rc.add(args[i]);
+					continue;
+				}
+				
+				String propName = convertOptionToPropertyName(name);
+				if( !IntrospectionSupport.setProperty(target, propName, value) ) {					
+					rc.add(args[i]);
+					continue;
+				}
+			}
+			
+		}
+		
+		String r[] = new String[rc.size()];
+		rc.toArray(r);
+		return r;
+	}
+
+	/**
+	 * converts strings like: test-enabled to testEnabled
+	 * @param name
+	 * @return
+	 */
+	private static String convertOptionToPropertyName(String name) {
+		String rc="";
+		
+		// Look for '-' and strip and then convert the subsequent char to uppercase
+		int p = name.indexOf("-");
+		while( p > 0 ) {
+			// strip
+			rc += name.substring(0, p);
+			name = name.substring(p+1);
+			
+			// can I convert the next char to upper?
+			if( name.length() >0 ) {
+				rc += name.substring(0,1).toUpperCase();
+				name = name.substring(1);
+			}
+			
+			p = name.indexOf("-");
+		}
+		return rc+name;
+	}
+}

Modified: incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java (original)
+++ incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java Sun Oct 29 06:35:48 2006
@@ -18,6 +18,7 @@
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -28,192 +29,252 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
 import java.io.IOException;
+import java.util.Arrays;
 
 /**
  * A simple tool for consuming messages
- *
+ * 
  * @version $Revision: 1.1.1.1 $
  */
-public class ConsumerTool extends ToolSupport implements MessageListener, ExceptionListener {
-
-    protected int count = 0;
-    protected int dumpCount = 10;
-    protected boolean verbose = true;
-    protected int maxiumMessages = 0;
-    private boolean pauseBeforeShutdown;
-    private boolean running;
-    private Session session;
+public class ConsumerTool implements MessageListener, ExceptionListener {
 
-    private long sleepTime=0;
-    private long receiveTimeOut=0;
+	private boolean running;
+	
+	private Session session;
+	private Destination destination;
 	private MessageProducer replyProducer;
-
-    public static void main(String[] args) {
-        ConsumerTool tool = new ConsumerTool();
-        if (args.length > 0) {
-            tool.url = args[0];
-        }
-        if (args.length > 1) {
-            tool.topic = args[1].equalsIgnoreCase("true");
-        }
-        if (args.length > 2) {
-            tool.subject = args[2];
-        }
-        if (args.length > 3) {
-            tool.durable = args[3].equalsIgnoreCase("true");
-        }
-        if (args.length > 4) {
-            tool.maxiumMessages = Integer.parseInt(args[4]);
-        }
-        if (args.length > 5) {
-            tool.clientID = args[5];
-        }
-        if (args.length > 6) {
-            tool.transacted = "true".equals(args[6]);
-        }
-        if (args.length > 7) {
-            tool.sleepTime = Long.parseLong(args[7]);
-        }
-        if (args.length > 8) {
-            tool.receiveTimeOut = Long.parseLong(args[8]);
-        }
-        
-		tool.run();
-    }
-
-    public void run() {
-        try {
-            running = true;
-            
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
-
-            Connection connection = createConnection();
-            connection.setExceptionListener(this);
-            session = createSession(connection);
-            
-            replyProducer = session.createProducer(null);
-            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-            
-            MessageConsumer consumer = null;
-            if (durable && topic) {
-                consumer = session.createDurableSubscriber((Topic) destination, consumerName);
-            }
-            else {
-                consumer = session.createConsumer(destination);
-            }
-            if ( maxiumMessages > 0 ) {
-                consumeMessagesAndClose(connection, session, consumer);
-            } else  {
-                if(receiveTimeOut==0) {
-                    consumer.setMessageListener(this);
-                } else {
-                    consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
-                }
-            }
-        }
-        catch (Exception e) {
-            System.out.println("Caught: " + e);
-            e.printStackTrace();
-        }
-    }
-
-    public void onMessage(Message message) {
-        try {
-            if (message instanceof TextMessage) {
-                TextMessage txtMsg = (TextMessage) message;
-                if (verbose) {
-
-                    String msg = txtMsg.getText();
-                    if (msg.length() > 50) {
-                        msg = msg.substring(0, 50) + "...";
-                    }
-
-                    System.out.println("Received: " + msg);
-                }
-            }
-            else {
-                if (verbose) {
-                    System.out.println("Received: " + message);
-                }
-            }
-            if(transacted) {
-                session.commit();
-            }
-            
-            if ( message.getJMSReplyTo() !=null ) {            	
-            	replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: "+message.getJMSMessageID()));
-                if(transacted) {
-                    session.commit();
-                }
-            }
-            
-            /*
-            if (++count % dumpCount == 0) {
-                dumpStats(connection);
-            }
-            */
-        }
-        catch (JMSException e) {
-            System.out.println("Caught: " + e);
-            e.printStackTrace();
-        } finally {
-            if( sleepTime> 0 ) {
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-    }
-
-    synchronized public void onException(JMSException ex) {
-        System.out.println("JMS Exception occured.  Shutting down client.");
-        running=false;
-    }
-
-    synchronized boolean isRunning() {
-        return running;
-    }
-    
-    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
-        System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
-
-        for (int i = 0; i < maxiumMessages && isRunning(); ) {
-            Message message = consumer.receive(1000);
-            if( message!=null ) {
-                i++;
-                onMessage(message);
-            }
-        }
-        System.out.println("Closing connection");
-        consumer.close();
-        session.close();
-        connection.close();
-        if (pauseBeforeShutdown) {
-            System.out.println("Press return to shut down");
-            System.in.read();
-        }
-    }
-    
-    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
-        System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
-
-        Message message;
-        while ( (message = consumer.receive(timeout)) != null ) {
-            onMessage(message);
-        }
-        
-        System.out.println("Closing connection");
-        consumer.close();
-        session.close();
-        connection.close();
-        if (pauseBeforeShutdown) {
-            System.out.println("Press return to shut down");
-            System.in.read();
-        }
-    }
+	
+	private boolean pauseBeforeShutdown;
+	private boolean verbose = true;
+	private int maxiumMessages = 0;
+	private String subject = "TOOL.DEFAULT";
+	private boolean topic = false;
+	private String user = ActiveMQConnection.DEFAULT_USER;
+	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
+	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
+	private boolean transacted = false;
+	private boolean durable = false;
+	private String clientId;
+	private int ackMode = Session.AUTO_ACKNOWLEDGE;
+	private String consumerName = "James";
+	private long sleepTime = 0;
+	private long receiveTimeOut = 0;
+
+	public static void main(String[] args) {
+		ConsumerTool consumerTool = new ConsumerTool();
+		String[] unknonwn = CommnadLineSupport.setOptions(consumerTool, args);
+		if (unknonwn.length > 0) {
+			System.out.println("Unknown options: " + Arrays.toString(unknonwn));
+			System.exit(-1);
+		}
+		consumerTool.run();
+	}
+
+	public void run() {
+		try {
+			running = true;
+
+			System.out.println("Connecting to URL: " + url);
+			System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
+			System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
+
+			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+			Connection connection = connectionFactory.createConnection();
+			if (durable && clientId != null && clientId.length()>0 && !"null".equals(clientId) ) {
+				connection.setClientID(clientId);
+			}
+			connection.setExceptionListener(this);
+			connection.start();
+
+			Session session = connection.createSession(transacted, ackMode);
+			if (topic) {
+				destination = session.createTopic(subject);
+			} else {
+				destination = session.createQueue(subject);
+			}
+
+			replyProducer = session.createProducer(null);
+			replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+			MessageConsumer consumer = null;
+			if (durable && topic) {
+				consumer = session.createDurableSubscriber((Topic) destination, consumerName);
+			} else {
+				consumer = session.createConsumer(destination);
+			}
+			
+			if (maxiumMessages > 0) {
+				consumeMessagesAndClose(connection, session, consumer);
+			} else {
+				if (receiveTimeOut == 0) {
+					consumer.setMessageListener(this);
+				} else {
+					consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
+				}
+			}
+			
+		} catch (Exception e) {
+			System.out.println("Caught: " + e);
+			e.printStackTrace();
+		}
+	}
+
+	public void onMessage(Message message) {
+		try {
+			
+			if (message instanceof TextMessage) {
+				TextMessage txtMsg = (TextMessage) message;
+				if (verbose) {
+
+					String msg = txtMsg.getText();
+					if (msg.length() > 50) {
+						msg = msg.substring(0, 50) + "...";
+					}
+
+					System.out.println("Received: " + msg);
+				}
+			} else {
+				if (verbose) {
+					System.out.println("Received: " + message);
+				}
+			}
+
+			if (message.getJMSReplyTo() != null) {
+				replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
+			}
+			
+			if (transacted) {
+				session.commit();
+			} else if ( ackMode  == Session.CLIENT_ACKNOWLEDGE ) {
+				message.acknowledge();
+			}
+
+		} catch (JMSException e) {
+			System.out.println("Caught: " + e);
+			e.printStackTrace();
+		} finally {
+			if (sleepTime > 0) {
+				try {
+					Thread.sleep(sleepTime);
+				} catch (InterruptedException e) {
+				}
+			}
+		}
+	}
+
+	synchronized public void onException(JMSException ex) {
+		System.out.println("JMS Exception occured.  Shutting down client.");
+		running = false;
+	}
+
+	synchronized boolean isRunning() {
+		return running;
+	}
+
+	protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException,
+			IOException {
+		System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
+
+		for (int i = 0; i < maxiumMessages && isRunning();) {
+			Message message = consumer.receive(1000);
+			if (message != null) {
+				i++;
+				onMessage(message);
+			}
+		}
+		System.out.println("Closing connection");
+		consumer.close();
+		session.close();
+		connection.close();
+		if (pauseBeforeShutdown) {
+			System.out.println("Press return to shut down");
+			System.in.read();
+		}
+	}
+
+	protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout)
+			throws JMSException, IOException {
+		System.out.println("We will consume messages while they continue to be delivered within: " + timeout
+				+ " ms, and then we will shutdown");
+
+		Message message;
+		while ((message = consumer.receive(timeout)) != null) {
+			onMessage(message);
+		}
+
+		System.out.println("Closing connection");
+		consumer.close();
+		session.close();
+		connection.close();
+		if (pauseBeforeShutdown) {
+			System.out.println("Press return to shut down");
+			System.in.read();
+		}
+	}
+
+	public void setAckMode(String ackMode) {
+		if( "CLIENT_ACKNOWLEDGE".equals(ackMode) ) {
+			this.ackMode = Session.CLIENT_ACKNOWLEDGE;
+		}
+		if( "AUTO_ACKNOWLEDGE".equals(ackMode) ) {
+			this.ackMode = Session.AUTO_ACKNOWLEDGE;
+		}
+		if( "DUPS_OK_ACKNOWLEDGE".equals(ackMode) ) {
+			this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
+		}
+		if( "SESSION_TRANSACTED".equals(ackMode) ) {
+			this.ackMode = Session.SESSION_TRANSACTED;
+		}
+	}
+	
+	public void setClientId(String clientID) {
+		this.clientId = clientID;
+	}
+	public void setConsumerName(String consumerName) {
+		this.consumerName = consumerName;
+	}
+	public void setDurable(boolean durable) {
+		this.durable = durable;
+	}
+	public void setMaxiumMessages(int maxiumMessages) {
+		this.maxiumMessages = maxiumMessages;
+	}
+	public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
+		this.pauseBeforeShutdown = pauseBeforeShutdown;
+	}
+	public void setPassword(String pwd) {
+		this.password = pwd;
+	}
+	public void setReceiveTimeOut(long receiveTimeOut) {
+		this.receiveTimeOut = receiveTimeOut;
+	}
+	public void setSleepTime(long sleepTime) {
+		this.sleepTime = sleepTime;
+	}
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+	public void setTopic(boolean topic) {
+		this.topic = topic;
+	}
+	public void setQueue(boolean queue) {
+		this.topic = !queue;
+	}
+	public void setTransacted(boolean transacted) {
+		this.transacted = transacted;
+	}
+	public void setUrl(String url) {
+		this.url = url;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public void setVerbose(boolean verbose) {
+		this.verbose = verbose;
+	}
 
 }

Modified: incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java (original)
+++ incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java Sun Oct 29 06:35:48 2006
@@ -1,22 +1,21 @@
-import org.apache.activemq.broker.BrokerService;
-
 /**
- * 
- * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); 
- * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at 
- * 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, 
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and 
- * limitations under the License. 
- * 
- **/
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.activemq.broker.BrokerService;
 
 /**
  * This example demonstrates how to run an embedded broker inside your Java code

Modified: incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java (original)
+++ incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java Sun Oct 29 06:35:48 2006
@@ -16,15 +16,10 @@
  * limitations under the License.
  */
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
+import java.util.Arrays;
+import java.util.HashSet;
+
 import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import java.io.IOException;
 
 /**
  * A simple tool for producing and consuming messages
@@ -34,44 +29,24 @@
 public class ProducerAndConsumerTool extends ConsumerTool implements MessageListener {
 
     public static void main(String[] args) {
-        ProducerAndConsumerTool tool = new ProducerAndConsumerTool();
-        if (args.length > 0) {
-            tool.url = args[0];
-        }
-        else {
-            tool.url = "vm://localhost";
-        }
-        if (args.length > 1) {
-            tool.topic = args[1].equalsIgnoreCase("true");
-        }
-        if (args.length > 2) {
-            tool.subject = args[2];
-        }
-        if (args.length > 3) {
-            tool.durable = args[3].equalsIgnoreCase("true");
-        }
-        if (args.length > 4) {
-            tool.maxiumMessages = Integer.parseInt(args[4]);
-        }
-        if (args.length > 5) {
-            tool.clientID = args[5];
-        }
-        tool.run();
-    }
-
-    public void run() {
-        super.run();
-
-        // now lets publish some messages
-        ProducerTool tool = new ProducerTool();
-        tool.url = this.url;
-        tool.topic = this.topic;
-        tool.subject = this.subject;
-        tool.durable = this.durable;
-        tool.clientID = this.clientID;
-
-        tool.run();
+    	
+		ConsumerTool consumerTool = new ConsumerTool();
+		String[] unknonwn = CommnadLineSupport.setOptions(consumerTool, args);
+		HashSet set1 = new HashSet(Arrays.asList(unknonwn));
+    	
+		ProducerTool producerTool = new ProducerTool();
+    	unknonwn = CommnadLineSupport.setOptions(producerTool, args);
+		HashSet set2 = new HashSet(Arrays.asList(unknonwn));
+
+		set1.retainAll(set2);
+		if( set1.size() > 0 ) {
+    		System.out.println("Unknown options: "+set1);
+			System.exit(-1);
+    	}
+    	
+		consumerTool.run();
+    	producerTool.run();
+		
     }
-
 
 }

Modified: incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java (original)
+++ incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java Sun Oct 29 06:35:48 2006
@@ -15,143 +15,182 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.util.Arrays;
+import java.util.Date;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
+import javax.jms.Destination;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.Date;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.util.IndentPrinter;
 
 /**
  * A simple tool for publishing messages
- *
+ * 
  * @version $Revision: 1.2 $
  */
-public class ProducerTool extends ToolSupport {
+public class ProducerTool {
 
-    protected int messageCount = 10;
-    protected long sleepTime = 0L;
-    protected boolean verbose = true;
-    protected int messageSize = 255;
-    private long timeToLive;
-
-    public static void main(String[] args) {
-        runTool(args, new ProducerTool());
-    }
-
-    protected static void runTool(String[] args, ProducerTool tool) {
-        tool.clientID = null;
-        if (args.length > 0) {
-            tool.url = args[0];
-        }
-        if (args.length > 1) {
-            tool.topic = args[1].equalsIgnoreCase("true");
-        }
-        if (args.length > 2) {
-            tool.subject = args[2];
-        }
-        if (args.length > 3) {
-            tool.durable = args[3].equalsIgnoreCase("true");
-        }
-        if (args.length > 4) {
-            tool.messageCount = Integer.parseInt(args[4]);
-        }
-        if (args.length > 5) {
-            tool.messageSize = Integer.parseInt(args[5]);
-        }
-        if (args.length > 6) {
-            if( ! "null".equals(args[6]) ) { 
-                tool.clientID = args[6];
-            }
-        }
-        if (args.length > 7) {
-            tool.timeToLive = Long.parseLong(args[7]);
-        }
-        if (args.length > 8) {
-            tool.sleepTime = Long.parseLong(args[8]);
-        }
-        if (args.length > 9) {
-            tool.transacted = "true".equals(args[9]);
-        }
-        tool.run();
-    }
-
-    public void run() {
-        try {
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
-            System.out.println("Sleeping between publish "+sleepTime+" ms");                
-            if( timeToLive!=0 ) {
-                System.out.println("Messages time to live "+timeToLive+" ms");                
-            }
-            Connection connection = createConnection();
-            Session session = createSession(connection);
-            MessageProducer producer = createProducer(session);
-            sendLoop(session, producer);
-
-            System.out.println("Done.");
-            close(connection, session);
-        }
-        catch (Exception e) {
-            System.out.println("Caught: " + e);
-            e.printStackTrace();
-        }
-    }
-
-    protected MessageProducer createProducer(Session session) throws JMSException {
-        MessageProducer producer = session.createProducer(destination);
-        if (durable) {
-            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        }
-        else {
-            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        }
-        if( timeToLive!=0 )
-            producer.setTimeToLive(timeToLive);
-        return producer;
-    }
-
-    protected void sendLoop(Session session, MessageProducer producer) throws Exception {
-
-        for (int i = 0; i < messageCount || messageCount==0 ; i++) {
-
-
-            TextMessage message = session.createTextMessage(createMessageText(i));
-
-            if (verbose) {
-                String msg = message.getText();
-                if (msg.length() > 50) {
-                    msg = msg.substring(0, 50) + "...";
-                }
-                System.out.println("Sending message: " + msg);
-            }
-            
-            producer.send(message);
-            if(transacted) {
-                session.commit();
-            }
-            
-            Thread.sleep(sleepTime);
-            
-        }
-        
-    }
-
-    /**
-     * @param i
-     * @return
-     */
-    private String createMessageText(int index) {
-        StringBuffer buffer = new StringBuffer(messageSize);
-        buffer.append("Message: " + index + " sent at: " + new Date());
-        if (buffer.length() > messageSize) {
-            return buffer.substring(0, messageSize);
-        }
-        for (int i = buffer.length(); i < messageSize; i++) {
-            buffer.append(' ');
-        }
-        return buffer.toString();
-    }
+	private Destination destination;
+	private int messageCount = 10;
+	private long sleepTime = 0L;
+	private boolean verbose = true;
+	private int messageSize = 255;
+	private long timeToLive;
+	private String user = ActiveMQConnection.DEFAULT_USER;
+	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
+	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
+	private String subject = "TOOL.DEFAULT";
+	private boolean topic = false;
+	private boolean transacted = false;
+	private boolean persistent = false;
+
+	public static void main(String[] args) {
+		ProducerTool producerTool = new ProducerTool();
+    	String[] unknonwn = CommnadLineSupport.setOptions(producerTool, args);
+    	if( unknonwn.length > 0 ) {
+    		System.out.println("Unknown options: "+Arrays.toString(unknonwn));
+			System.exit(-1);
+    	}    		
+    	producerTool.run();
+	}
+
+	public void run() {
+		Connection connection=null;
+		try {
+			System.out.println("Connecting to URL: " + url);
+			System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue") + ": " + subject);
+			System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+			System.out.println("Sleeping between publish " + sleepTime + " ms");
+			if (timeToLive != 0) {
+				System.out.println("Messages time to live " + timeToLive + " ms");
+			}
+			
+			// Create the connection.
+			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);			
+			connection = connectionFactory.createConnection();
+			connection.start();
+			
+			// Create the session
+			Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+			if (topic) {
+				destination = session.createTopic(subject);
+			} else {
+				destination = session.createQueue(subject);
+			}
+			
+			// Create the producer.
+			MessageProducer producer = session.createProducer(destination);
+			if (persistent) {
+				producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+			} else {
+				producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+			}			
+			if (timeToLive != 0)
+				producer.setTimeToLive(timeToLive);
+						
+			// Start sending messages
+			sendLoop(session, producer);
+
+			System.out.println("Done.");
+			
+			// Use the ActiveMQConnection interface to dump the connection stats.
+			ActiveMQConnection c = (ActiveMQConnection) connection;
+			c.getConnectionStats().dump(new IndentPrinter());
+						
+		} catch (Exception e) {
+			System.out.println("Caught: " + e);
+			e.printStackTrace();
+		} finally {
+			try { 
+				connection.close();
+			} catch (Throwable ignore) {
+			}
+		}
+	}
+
+	protected void sendLoop(Session session, MessageProducer producer)
+			throws Exception {
+
+		for (int i = 0; i < messageCount || messageCount == 0; i++) {
+
+			TextMessage message = session
+					.createTextMessage(createMessageText(i));
+
+			if (verbose) {
+				String msg = message.getText();
+				if (msg.length() > 50) {
+					msg = msg.substring(0, 50) + "...";
+				}
+				System.out.println("Sending message: " + msg);
+			}
+
+			producer.send(message);
+			if (transacted) {
+				session.commit();
+			}
+
+			Thread.sleep(sleepTime);
+
+		}
+
+	}
+
+	private String createMessageText(int index) {
+		StringBuffer buffer = new StringBuffer(messageSize);
+		buffer.append("Message: " + index + " sent at: " + new Date());
+		if (buffer.length() > messageSize) {
+			return buffer.substring(0, messageSize);
+		}
+		for (int i = buffer.length(); i < messageSize; i++) {
+			buffer.append(' ');
+		}
+		return buffer.toString();
+	}
+
+
+	public void setPersistent(boolean durable) {
+		this.persistent = durable;
+	}
+	public void setMessageCount(int messageCount) {
+		this.messageCount = messageCount;
+	}
+	public void setMessageSize(int messageSize) {
+		this.messageSize = messageSize;
+	}
+	public void setPassword(String pwd) {
+		this.password = pwd;
+	}
+	public void setSleepTime(long sleepTime) {
+		this.sleepTime = sleepTime;
+	}
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+	public void setTimeToLive(long timeToLive) {
+		this.timeToLive = timeToLive;
+	}
+	public void setTopic(boolean topic) {
+		this.topic = topic;
+	}
+	public void setQueue(boolean queue) {
+		this.topic = !queue;
+	}	
+	public void setTransacted(boolean transacted) {
+		this.transacted = transacted;
+	}
+	public void setUrl(String url) {
+		this.url = url;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public void setVerbose(boolean verbose) {
+		this.verbose = verbose;
+	}
 }

Modified: incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java?view=diff&rev=468913&r1=468912&r2=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java (original)
+++ incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java Sun Oct 29 06:35:48 2006
@@ -15,169 +15,228 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.util.Arrays;
+import java.util.Date;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.Date;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.util.IndentPrinter;
 
 /**
  * A simple tool for publishing messages
- *
+ * 
  * @version $Revision: 1.2 $
  */
-public class RequesterTool extends ToolSupport {
+public class RequesterTool {
 
-    protected int messageCount = 10;
-    protected long sleepTime = 0L;
-    protected boolean verbose = true;
-    protected int messageSize = 255;
-    private long timeToLive;
-
-    public static void main(String[] args) {
-        runTool(args, new RequesterTool());
-    }
-
-    protected static void runTool(String[] args, RequesterTool tool) {
-        tool.clientID = null;
-        if (args.length > 0) {
-            tool.url = args[0];
-        }
-        if (args.length > 1) {
-            tool.topic = args[1].equalsIgnoreCase("true");
-        }
-        if (args.length > 2) {
-            tool.subject = args[2];
-        }
-        if (args.length > 3) {
-            tool.durable = args[3].equalsIgnoreCase("true");
-        }
-        if (args.length > 4) {
-            tool.messageCount = Integer.parseInt(args[4]);
-        }
-        if (args.length > 5) {
-            tool.messageSize = Integer.parseInt(args[5]);
-        }
-        if (args.length > 6) {
-            if( ! "null".equals(args[6]) ) { 
-                tool.clientID = args[6];
-            }
-        }
-        if (args.length > 7) {
-            tool.timeToLive = Long.parseLong(args[7]);
-        }
-        if (args.length > 8) {
-            tool.sleepTime = Long.parseLong(args[8]);
-        }
-        if (args.length > 9) {
-            tool.transacted = "true".equals(args[9]);
-        }
-        tool.run();
-    }
-
-    public void run() {
-        try {
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
-            System.out.println("Sleeping between publish "+sleepTime+" ms");                
-            if( timeToLive!=0 ) {
-                System.out.println("Messages time to live "+timeToLive+" ms");                
-            }
-            Connection connection = createConnection();
-            Session session = createSession(connection);
-            MessageProducer producer = createProducer(session);
-            
-            Destination replyDest = null;
-            if( this.topic ) {
-            	replyDest = session.createTemporaryTopic();
-            } else {
-            	replyDest = session.createTemporaryQueue();
-            }
-            
-            System.out.println("Reply Destination: "+replyDest);                
-            MessageConsumer consumer = session.createConsumer(replyDest);
-            
-            requestLoop(session, producer, consumer, replyDest);
-
-            System.out.println("Done.");
-            close(connection, session);
-        }
-        catch (Exception e) {
-            System.out.println("Caught: " + e);
-            e.printStackTrace();
-        }
-    }
-
-    protected MessageProducer createProducer(Session session) throws JMSException {
-        MessageProducer producer = session.createProducer(destination);
-        if (durable) {
-            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        }
-        else {
-            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        }
-        if( timeToLive!=0 )
-            producer.setTimeToLive(timeToLive);
-        return producer;
-    }
-
-    protected void requestLoop(Session session, MessageProducer producer, MessageConsumer consumer, Destination replyDest) throws Exception {
-
-        for (int i = 0; i < messageCount || messageCount==0 ; i++) {
-
-
-            TextMessage message = session.createTextMessage(createMessageText(i));
-            message.setJMSReplyTo(replyDest);
-            
-            if (verbose) {
-                String msg = message.getText();
-                if (msg.length() > 50) {
-                    msg = msg.substring(0, 50) + "...";
-                }
-                System.out.println("Sending message: " + msg);
-            }
-            
-            producer.send(message);
-            if(transacted) {
-                session.commit();
-            }
-            
-            System.out.println("Waiting for reponse message...");
-            Message message2 = consumer.receive();
-            if( message2 instanceof TextMessage ) {
-            	System.out.println("Reponse message: "+((TextMessage)message2).getText());
-            } else {
-            	System.out.println("Reponse message: "+message2);
-            }
-            if(transacted) {
-                session.commit();
-            }
-            
-            Thread.sleep(sleepTime);
-            
-        }
-        
-    }
-
-    /**
-     * @param i
-     * @return
-     */
-    private String createMessageText(int index) {
-        StringBuffer buffer = new StringBuffer(messageSize);
-        buffer.append("Message: " + index + " sent at: " + new Date());
-        if (buffer.length() > messageSize) {
-            return buffer.substring(0, messageSize);
-        }
-        for (int i = buffer.length(); i < messageSize; i++) {
-            buffer.append(' ');
-        }
-        return buffer.toString();
-    }
+	private int messageCount = 10;
+	private long sleepTime = 0L;
+	private boolean verbose = true;
+	private int messageSize = 255;
+	private long timeToLive;
+	private String subject = "TOOL.DEFAULT";
+	private String replySubject;
+	private boolean topic = false;
+	private String user = ActiveMQConnection.DEFAULT_USER;
+	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
+	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
+	private boolean transacted = false;
+	private boolean persistent = false;
+	private String clientId;
+
+	private Destination destination;
+	private Destination replyDest;
+	private MessageProducer producer;
+	private MessageConsumer consumer;
+	private Session session;
+
+	public static void main(String[] args) {
+		RequesterTool requesterTool = new RequesterTool();
+		String[] unknonwn = CommnadLineSupport.setOptions(requesterTool, args);
+		if (unknonwn.length > 0) {
+			System.out.println("Unknown options: " + Arrays.toString(unknonwn));
+			System.exit(-1);
+		}
+		requesterTool.run();
+	}
+
+	public void run() {
+		
+		Connection connection=null;
+		try {
+			
+			System.out.println("Connecting to URL: " + url);
+			System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
+			System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+			System.out.println("Sleeping between publish " + sleepTime + " ms");
+			
+			// Create the connection
+			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+			connection = connectionFactory.createConnection();
+			if (persistent && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
+				connection.setClientID(clientId);
+			}
+			connection.start();
+
+			// Create the Session
+			session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+			
+			// And the Destinations..
+			if (topic) {
+				destination = session.createTopic(subject);
+				if( replySubject==null )
+					replyDest = session.createTemporaryTopic();
+				else
+					replyDest = session.createTopic(replySubject);
+			} else {
+				destination = session.createQueue(subject);
+				if( replySubject==null )
+					replyDest = session.createTemporaryQueue();
+				else
+					replyDest = session.createQueue(replySubject);
+			}
+			System.out.println("Reply Destination: " + replyDest);
+
+			// Create the producer
+			producer = session.createProducer(destination);
+			if (persistent) {
+				producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+			} else {
+				producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+			}
+			if (timeToLive != 0) {
+				System.out.println("Messages time to live " + timeToLive + " ms");
+				producer.setTimeToLive(timeToLive);
+			}
+
+			// Create the reply consumer
+			consumer = session.createConsumer(replyDest);
+			
+			// Start sending reqests.
+			requestLoop();
+			
+			System.out.println("Done.");
+			
+			// Use the ActiveMQConnection interface to dump the connection stats.
+			ActiveMQConnection c = (ActiveMQConnection) connection;
+			c.getConnectionStats().dump(new IndentPrinter());
+						
+		} catch (Exception e) {
+			System.out.println("Caught: " + e);
+			e.printStackTrace();
+		} finally {
+			try { 
+				connection.close();
+			} catch (Throwable ignore) {
+			}
+		}
+	}
+
+	protected void requestLoop() throws Exception {
+
+		for (int i = 0; i < messageCount || messageCount == 0; i++) {
+
+			TextMessage message = session.createTextMessage(createMessageText(i));
+			message.setJMSReplyTo(replyDest);
+
+			if (verbose) {
+				String msg = message.getText();
+				if (msg.length() > 50) {
+					msg = msg.substring(0, 50) + "...";
+				}
+				System.out.println("Sending message: " + msg);
+			}
+
+			producer.send(message);
+			if (transacted) {
+				session.commit();
+			}
+
+			System.out.println("Waiting for reponse message...");
+			Message message2 = consumer.receive();
+			if (message2 instanceof TextMessage) {
+				System.out.println("Reponse message: " + ((TextMessage) message2).getText());
+			} else {
+				System.out.println("Reponse message: " + message2);
+			}
+			if (transacted) {
+				session.commit();
+			}
+
+			Thread.sleep(sleepTime);
+
+		}
+	}
+
+	/**
+	 * @param i
+	 * @return
+	 */
+	private String createMessageText(int index) {
+		StringBuffer buffer = new StringBuffer(messageSize);
+		buffer.append("Message: " + index + " sent at: " + new Date());
+		if (buffer.length() > messageSize) {
+			return buffer.substring(0, messageSize);
+		}
+		for (int i = buffer.length(); i < messageSize; i++) {
+			buffer.append(' ');
+		}
+		return buffer.toString();
+	}
+
+
+	public void setClientId(String clientId) {
+		this.clientId = clientId;
+	}
+	public void setPersistent(boolean durable) {
+		this.persistent = durable;
+	}
+	public void setMessageCount(int messageCount) {
+		this.messageCount = messageCount;
+	}
+	public void setMessageSize(int messageSize) {
+		this.messageSize = messageSize;
+	}
+	public void setPassword(String password) {
+		this.password = password;
+	}
+	public void setSleepTime(long sleepTime) {
+		this.sleepTime = sleepTime;
+	}
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+	public void setTimeToLive(long timeToLive) {
+		this.timeToLive = timeToLive;
+	}
+	public void setTopic(boolean topic) {
+		this.topic = topic;
+	}
+	public void setQueue(boolean queue) {
+		this.topic = !queue;
+	}	
+	public void setTransacted(boolean transacted) {
+		this.transacted = transacted;
+	}
+	public void setUrl(String url) {
+		this.url = url;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public void setVerbose(boolean verbose) {
+		this.verbose = verbose;
+	}
+	public void setReplySubject(String replySubject) {
+		this.replySubject = replySubject;
+	}
 }

Added: incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java?view=auto&rev=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java (added)
+++ incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java Sun Oct 29 06:35:48 2006
@@ -0,0 +1,123 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * Use in conjunction with TopicPublisher to test the performance of ActiveMQ Topics.
+ */
+public class TopicListener implements MessageListener {
+	
+	private Connection connection;
+	private MessageProducer producer;
+	private Session session;	
+	private int count;
+	private long start;
+	private Topic topic;
+	private Topic control;
+	
+//	private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false";
+	private String url="tcp://localhost:61616";	
+
+	public static void main(String[] argv) throws Exception {
+		TopicListener l = new TopicListener();
+		String[] unknonwn = CommnadLineSupport.setOptions(l, argv);
+		if (unknonwn.length > 0) {
+			System.out.println("Unknown options: " + Arrays.toString(unknonwn));
+			System.exit(-1);
+		}
+    	l.run();
+	}
+	
+	public void run() throws JMSException {
+		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+		connection = factory.createConnection();    	
+		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		topic = session.createTopic("topictest.messages");
+		control = session.createTopic("topictest.control");
+
+		MessageConsumer consumer = session.createConsumer(topic);
+		consumer.setMessageListener(this);
+
+		connection.start();
+
+		producer = session.createProducer(control);
+		System.out.println("Waiting for messages...");		
+	}
+	
+    private static boolean checkText(Message m, String s)
+    {
+        try
+        {
+            return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace(System.out);
+            return false;
+        }
+    }
+
+
+	public void onMessage(Message message) {
+		if ( checkText(message, "SHUTDOWN") ) {
+			
+			try {
+				connection.close();
+			} catch (Exception e) {
+				e.printStackTrace(System.out);
+			}
+			
+		} else if (checkText(message, "REPORT")) {
+			// send a report:
+			try {
+				long time = (System.currentTimeMillis() - start);
+				String msg = "Received " + count + " in " + time + "ms";
+				producer.send(session.createTextMessage(msg));
+			} catch (Exception e) {
+				e.printStackTrace(System.out);
+			}
+			count = 0;
+			
+		} else {
+			
+			if (count==0) {
+				start = System.currentTimeMillis();
+			}
+			
+			if (++count % 1000 == 0) 
+				System.out.println("Received " + count + " messages.");
+		}
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+
+}

Added: incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java?view=auto&rev=468913
==============================================================================
--- incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java (added)
+++ incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java Sun Oct 29 06:35:48 2006
@@ -0,0 +1,214 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Arrays;
+
+import javax.jms.*;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * Use in conjunction with TopicListener to test the performance of ActiveMQ Topics.
+ */
+public class TopicPublisher implements MessageListener
+{
+    private final Object mutex = new Object();
+    private Connection connection;
+    private Session session;
+    private MessageProducer publisher;
+	private Topic topic;
+	private Topic control;
+	
+//	private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false";
+	private String url="tcp://localhost:61616";
+	private int size=256;
+	private int subscribers=1;
+	private int remaining;
+	private int messages=10000;
+	private long delay;
+	private int batch=40;
+	
+	private byte[] payload;
+    private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+    public static void main(String[] argv) throws Exception
+    {
+    	TopicPublisher p = new TopicPublisher();
+    	String[] unknonwn = CommnadLineSupport.setOptions(p, argv);
+		if (unknonwn.length > 0) {
+			System.out.println("Unknown options: " + Arrays.toString(unknonwn));
+			System.exit(-1);
+		}
+    	p.run();
+    }
+
+    private void run() throws Exception
+    {
+		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+		connection = factory.createConnection();    	
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		topic = session.createTopic("topictest.messages");
+		control = session.createTopic("topictest.control");
+        
+        publisher = session.createProducer(topic);
+        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        
+        payload = new byte[size];
+        for(int i = 0; i < size; i++)
+        {
+            payload[i] = (byte) DATA[i % DATA.length];
+        }
+
+        session.createConsumer(control).setMessageListener(this);
+        connection.start();
+
+        long[] times = new long[batch];
+        for(int i = 0; i < batch; i++)
+        {
+            if(i > 0) Thread.sleep(delay*1000);
+            times[i] = batch(messages);
+            System.out.println("Batch " + (i+1) + " of " + batch + " completed in " + times[i] + " ms.");
+        }
+
+        long min = min(times);
+        long max = max(times);
+        System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
+
+        //request shutdown
+        publisher.send(session.createTextMessage("SHUTDOWN"));
+
+        connection.stop();
+        connection.close();
+    }
+
+    private long batch(int msgCount) throws Exception
+    {
+        long start = System.currentTimeMillis();
+        remaining=subscribers;
+        publish();
+        waitForCompletion();
+        return System.currentTimeMillis() - start;
+    }
+
+    private void publish() throws Exception
+    {
+
+        //send events
+        BytesMessage msg = session.createBytesMessage();
+        msg.writeBytes(payload);
+        for (int i = 0; i < messages; i++)
+        {
+            publisher.send(msg);
+            if ((i + 1) % 1000 == 0)
+            {
+                System.out.println("Sent " + (i + 1) + " messages");
+            }
+        }
+
+        //request report
+        publisher.send(session.createTextMessage("REPORT"));
+    }
+
+    private void waitForCompletion() throws Exception
+    {
+        System.out.println("Waiting for completion...");
+        synchronized (mutex)
+        {
+            while (remaining > 0)
+            {
+                mutex.wait();
+            }
+        }
+    }
+
+
+    public void onMessage(Message message)
+    {
+        synchronized (mutex)
+        {
+            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
+            if (remaining == 0)
+            {
+            	mutex.notify();
+            }
+        }
+    }
+    
+    Object getReport(Message m)
+    {
+        try
+        {
+            return ((TextMessage) m).getText();
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace(System.out);
+            return e.toString();
+        }
+    }    
+
+    static long min(long[] times)
+    {
+        long min = times.length > 0 ? times[0] : 0;
+        for(int i = 0; i < times.length; i++)
+        {
+            min = Math.min(min, times[i]);
+        }
+        return min;
+    }
+
+    static long max(long[] times)
+    {
+        long max = times.length > 0 ? times[0] : 0;
+        for(int i = 0; i < times.length; i++)
+        {
+            max = Math.max(max, times[i]);
+        }
+        return max;
+    }
+
+    static long avg(long[] times, long min, long max)
+    {
+        long sum = 0;
+        for(int i = 0; i < times.length; i++)
+        {
+            sum += times[i];
+        }
+        sum -= min;
+        sum -= max;
+        return (sum / times.length - 2);
+    }
+
+	public void setBatch(int batch) {
+		this.batch = batch;
+	}
+	public void setDelay(long delay) {
+		this.delay = delay;
+	}
+	public void setMessages(int messages) {
+		this.messages = messages;
+	}
+	public void setSize(int size) {
+		this.size = size;
+	}
+	public void setSubscribers(int subscribers) {
+		this.subscribers = subscribers;
+	}
+	public void setUrl(String url) {
+		this.url = url;
+	}
+}



Mime
View raw message