Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 60464 invoked from network); 29 Oct 2006 14:36:14 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 29 Oct 2006 14:36:14 -0000 Received: (qmail 45731 invoked by uid 500); 29 Oct 2006 14:36:26 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 45706 invoked by uid 500); 29 Oct 2006 14:36:25 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 45696 invoked by uid 99); 29 Oct 2006 14:36:25 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 Oct 2006 06:36:25 -0800 X-ASF-Spam-Status: No, hits=0.6 required=10.0 tests=NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 Oct 2006 06:36:12 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 04CA01A9846; Sun, 29 Oct 2006 06:35:50 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r468913 - in /incubator/activemq/trunk/assembly/src/release: ./ example/ example/ruby/ example/src/ Date: Sun, 29 Oct 2006 14:35:49 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061029143550.04CA01A9846@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Sun Oct 29 06:35:48 2006 New Revision: 468913 URL: http://svn.apache.org/viewvc?view=rev&rev=468913 Log: Freshened up the shipped examples. 
- Inlined the ToolSupport base class to make it more obvious to JMS beginners how JMS is used (less code reuse but these examples are simple enough) - Added CommandLineSupport class that uses introspection to handle interpreting command line options - Added ruby and java versions of a topic listener and publisher tools that can be used to benchmark broker performance. Added: incubator/activemq/trunk/assembly/src/release/example/ruby/ incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb (with props) incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb (with props) incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java Removed: incubator/activemq/trunk/assembly/src/release/example/src/ToolSupport.java Modified: incubator/activemq/trunk/assembly/src/release/README.txt incubator/activemq/trunk/assembly/src/release/example/build.xml incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java Modified: incubator/activemq/trunk/assembly/src/release/README.txt URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/README.txt?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/README.txt (original) +++ incubator/activemq/trunk/assembly/src/release/README.txt Sun Oct 29 06:35:48 2006 @@ -1,5 +1,5 @@ Welcome to Apache 
ActiveMQ -=================== +========================== Apache ActiveMQ is a high performance Apache 2.0 licenced Message Broker and JMS 1.1 implementation. Modified: incubator/activemq/trunk/assembly/src/release/example/build.xml URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/build.xml?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/build.xml (original) +++ incubator/activemq/trunk/assembly/src/release/example/build.xml Sun Oct 29 06:35:48 2006 @@ -32,7 +32,13 @@ - + + + + + + + @@ -72,7 +78,7 @@ - + @@ -141,14 +147,17 @@ - - - - - - - - + + + + + + + + + + + @@ -159,16 +168,16 @@ - - - - - - - - - - + + + + + + + + + + @@ -179,16 +188,18 @@ - - - - - - - - - - + + + + + + + + + + + + @@ -201,5 +212,26 @@ - + + + + + + + + + + + + + + + + + + + + + + Added: incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt?view=auto&rev=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt (added) +++ incubator/activemq/trunk/assembly/src/release/example/ruby/README.txt Sun Oct 29 06:35:48 2006 @@ -0,0 +1,7 @@ + +Prereqs +======= + +- Install RubyGems see: http://docs.rubygems.org/ +- Install the stomp gem. 
Run: gem install stomp + Added: incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb?view=auto&rev=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb (added) +++ incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb Sun Oct 29 06:35:48 2006 @@ -0,0 +1,50 @@ +#!/usr/bin/ruby +# ------------------------------------------------------------------------ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ------------------------------------------------------------------------ + +require 'rubygems' +require 'stomp' + +@conn = Stomp::Connection.open '', '', 'localhost', 61613, false +@count = 0 + +@conn.subscribe '/topic/event', { :ack =>"auto" } +while true + @msg = @conn.receive + if @msg.command == "MESSAGE" + if @msg.body == "SHUTDOWN" + exit 0 + + elsif @msg.body == "REPORT" + @diff = Time.now - @start + @body = "Received #{@count} in #{@diff} seconds"; + @conn.send '/queue/response', @body, {'persistent'=>'false'} + @count = 0; + else + if @count == 0 + @start = Time.now + end + + @count += 1; + if @count % 1000 == 0 + $stdout.print "Received #{@count} messages.\n" + end + end + else + $stdout.print "#{@msg.command}: #{@msg.body}\n" + end +end Propchange: incubator/activemq/trunk/assembly/src/release/example/ruby/listener.rb ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb?view=auto&rev=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb (added) +++ incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb Sun Oct 29 06:35:48 2006 @@ -0,0 +1,65 @@ +#!/usr/bin/ruby +# ------------------------------------------------------------------------ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------ + +require 'rubygems' +require 'stomp' + +@conn = Stomp::Connection.open '', '', 'localhost', 61613, false +@messages = 10000 +@batches = 40 +@subscribers = 10 +@size = 256 + +@DATA = "abcdefghijklmnopqrstuvwxyz"; +@body = ""; +for i in 0..(@size-1) + @body += @DATA[ i % @DATA.length,1] +end + +@times = [] +@conn.subscribe '/queue/response', { :ack =>"auto" } + +for i in 1..(@batches) + @body += @DATA[ i % @DATA.length,1] + sleep 1 if i == 1 + + @start = Time.now + + for j in 1..@messages + @conn.send '/topic/event', @body, {'persistent'=>'false'} + $stdout.print "Sent #{j} messages\n" if j%1000==0 + end + @conn.send '/topic/event', "REPORT", {'persistent'=>'false'} + + @remaining = @subscribers + while @remaining > 0 + @msg = @conn.receive + if @msg.command == "MESSAGE" + @remaining -= 1 + $stdout.print "Received report: #{@msg.body}, remaining: #{@remaining}\n" + else + $stdout.print "#{@msg.command}: #{@msg.body}\n" + end + end + @diff = Time.now-@start + + $stdout.print "Batch #{i} of #{@batches} completed in #{@diff} seconds.\n" + @times[i] = @diff +end + +@conn.send '/topic/event', "SHUTDOWN", {'persistent'=>'false'} Propchange: incubator/activemq/trunk/assembly/src/release/example/ruby/publisher.rb ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java?view=auto&rev=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java (added) +++ incubator/activemq/trunk/assembly/src/release/example/src/CommnadLineSupport.java Sun Oct 29 06:35:48 2006 @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; + +import org.apache.activemq.util.IntrospectionSupport; + +/** + * Helper utility that can be used to set the properties on any object + * using command line arguments. + * + * @author Hiram Chirino + */ +public class CommnadLineSupport { + + /** + * Sets the properties of an object given the command line args. + * + * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent + * + * then it will try to call the following setters on the target object. + * + * target.setAckMode("AUTO"); + * target.setURL(new URI("tcp://localhost:61616") ); + * target.setPersistent(true); + * + * Notice the the proper conversion for the argument is determined by examining the + * setter arguement type. 
+ * + * @param target the object that will have it's properties set + * @param args the commline options + * @return any arguments that are not valid options for the target + */ + static public String[] setOptions(Object target, String []args) { + ArrayList rc = new ArrayList(); + + for (int i = 0; i < args.length; i++) { + if( args[i] == null ) + continue; + + if( args[i].startsWith("--") ) { + + // --options without a specified value are considered boolean flags that are enabled. + String value="true"; + String name = args[i].substring(2); + + // if --option=value case + int p = name.indexOf("="); + if( p > 0 ) { + value = name.substring(p+1); + name = name.substring(0,p); + } + + // name not set, then it's an unrecognized option + if( name.length()==0 ) { + rc.add(args[i]); + continue; + } + + String propName = convertOptionToPropertyName(name); + if( !IntrospectionSupport.setProperty(target, propName, value) ) { + rc.add(args[i]); + continue; + } + } + + } + + String r[] = new String[rc.size()]; + rc.toArray(r); + return r; + } + + /** + * converts strings like: test-enabled to testEnabled + * @param name + * @return + */ + private static String convertOptionToPropertyName(String name) { + String rc=""; + + // Look for '-' and strip and then convert the subsequent char to uppercase + int p = name.indexOf("-"); + while( p > 0 ) { + // strip + rc += name.substring(0, p); + name = name.substring(p+1); + + // can I convert the next char to upper? 
+ if( name.length() >0 ) { + rc += name.substring(0,1).toUpperCase(); + name = name.substring(1); + } + + p = name.indexOf("-"); + } + return rc+name; + } +} Modified: incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java (original) +++ incubator/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java Sun Oct 29 06:35:48 2006 @@ -18,6 +18,7 @@ import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; @@ -28,192 +29,252 @@ import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; + import java.io.IOException; +import java.util.Arrays; /** * A simple tool for consuming messages - * + * * @version $Revision: 1.1.1.1 $ */ -public class ConsumerTool extends ToolSupport implements MessageListener, ExceptionListener { - - protected int count = 0; - protected int dumpCount = 10; - protected boolean verbose = true; - protected int maxiumMessages = 0; - private boolean pauseBeforeShutdown; - private boolean running; - private Session session; +public class ConsumerTool implements MessageListener, ExceptionListener { - private long sleepTime=0; - private long receiveTimeOut=0; + private boolean running; + + private Session session; + private Destination destination; private MessageProducer replyProducer; - - public static void main(String[] args) { - ConsumerTool tool = new ConsumerTool(); - if (args.length > 0) { - tool.url = args[0]; - } - if (args.length > 1) { - tool.topic = args[1].equalsIgnoreCase("true"); - } - 
if (args.length > 2) { - tool.subject = args[2]; - } - if (args.length > 3) { - tool.durable = args[3].equalsIgnoreCase("true"); - } - if (args.length > 4) { - tool.maxiumMessages = Integer.parseInt(args[4]); - } - if (args.length > 5) { - tool.clientID = args[5]; - } - if (args.length > 6) { - tool.transacted = "true".equals(args[6]); - } - if (args.length > 7) { - tool.sleepTime = Long.parseLong(args[7]); - } - if (args.length > 8) { - tool.receiveTimeOut = Long.parseLong(args[8]); - } - - tool.run(); - } - - public void run() { - try { - running = true; - - System.out.println("Connecting to URL: " + url); - System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); - System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription"); - - Connection connection = createConnection(); - connection.setExceptionListener(this); - session = createSession(connection); - - replyProducer = session.createProducer(null); - replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - MessageConsumer consumer = null; - if (durable && topic) { - consumer = session.createDurableSubscriber((Topic) destination, consumerName); - } - else { - consumer = session.createConsumer(destination); - } - if ( maxiumMessages > 0 ) { - consumeMessagesAndClose(connection, session, consumer); - } else { - if(receiveTimeOut==0) { - consumer.setMessageListener(this); - } else { - consumeMessagesAndClose(connection, session, consumer, receiveTimeOut); - } - } - } - catch (Exception e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - } - } - - public void onMessage(Message message) { - try { - if (message instanceof TextMessage) { - TextMessage txtMsg = (TextMessage) message; - if (verbose) { - - String msg = txtMsg.getText(); - if (msg.length() > 50) { - msg = msg.substring(0, 50) + "..."; - } - - System.out.println("Received: " + msg); - } - } - else { - if (verbose) { - System.out.println("Received: " + message); - } - } - 
if(transacted) { - session.commit(); - } - - if ( message.getJMSReplyTo() !=null ) { - replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: "+message.getJMSMessageID())); - if(transacted) { - session.commit(); - } - } - - /* - if (++count % dumpCount == 0) { - dumpStats(connection); - } - */ - } - catch (JMSException e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - } finally { - if( sleepTime> 0 ) { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - } - } - } - } - - synchronized public void onException(JMSException ex) { - System.out.println("JMS Exception occured. Shutting down client."); - running=false; - } - - synchronized boolean isRunning() { - return running; - } - - protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException { - System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); - - for (int i = 0; i < maxiumMessages && isRunning(); ) { - Message message = consumer.receive(1000); - if( message!=null ) { - i++; - onMessage(message); - } - } - System.out.println("Closing connection"); - consumer.close(); - session.close(); - connection.close(); - if (pauseBeforeShutdown) { - System.out.println("Press return to shut down"); - System.in.read(); - } - } - - protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException { - System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown"); - - Message message; - while ( (message = consumer.receive(timeout)) != null ) { - onMessage(message); - } - - System.out.println("Closing connection"); - consumer.close(); - session.close(); - connection.close(); - if (pauseBeforeShutdown) { - System.out.println("Press return to shut down"); - System.in.read(); 
- } - } + + private boolean pauseBeforeShutdown; + private boolean verbose = true; + private int maxiumMessages = 0; + private String subject = "TOOL.DEFAULT"; + private boolean topic = false; + private String user = ActiveMQConnection.DEFAULT_USER; + private String password = ActiveMQConnection.DEFAULT_PASSWORD; + private String url = ActiveMQConnection.DEFAULT_BROKER_URL; + private boolean transacted = false; + private boolean durable = false; + private String clientId; + private int ackMode = Session.AUTO_ACKNOWLEDGE; + private String consumerName = "James"; + private long sleepTime = 0; + private long receiveTimeOut = 0; + + public static void main(String[] args) { + ConsumerTool consumerTool = new ConsumerTool(); + String[] unknonwn = CommnadLineSupport.setOptions(consumerTool, args); + if (unknonwn.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknonwn)); + System.exit(-1); + } + consumerTool.run(); + } + + public void run() { + try { + running = true; + + System.out.println("Connecting to URL: " + url); + System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using a " + (durable ? 
"durable" : "non-durable") + " subscription"); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + Connection connection = connectionFactory.createConnection(); + if (durable && clientId != null && clientId.length()>0 && !"null".equals(clientId) ) { + connection.setClientID(clientId); + } + connection.setExceptionListener(this); + connection.start(); + + Session session = connection.createSession(transacted, ackMode); + if (topic) { + destination = session.createTopic(subject); + } else { + destination = session.createQueue(subject); + } + + replyProducer = session.createProducer(null); + replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + MessageConsumer consumer = null; + if (durable && topic) { + consumer = session.createDurableSubscriber((Topic) destination, consumerName); + } else { + consumer = session.createConsumer(destination); + } + + if (maxiumMessages > 0) { + consumeMessagesAndClose(connection, session, consumer); + } else { + if (receiveTimeOut == 0) { + consumer.setMessageListener(this); + } else { + consumeMessagesAndClose(connection, session, consumer, receiveTimeOut); + } + } + + } catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } + } + + public void onMessage(Message message) { + try { + + if (message instanceof TextMessage) { + TextMessage txtMsg = (TextMessage) message; + if (verbose) { + + String msg = txtMsg.getText(); + if (msg.length() > 50) { + msg = msg.substring(0, 50) + "..."; + } + + System.out.println("Received: " + msg); + } + } else { + if (verbose) { + System.out.println("Received: " + message); + } + } + + if (message.getJMSReplyTo() != null) { + replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID())); + } + + if (transacted) { + session.commit(); + } else if ( ackMode == Session.CLIENT_ACKNOWLEDGE ) { + message.acknowledge(); + } + + } catch (JMSException e) { + 
System.out.println("Caught: " + e); + e.printStackTrace(); + } finally { + if (sleepTime > 0) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + } + } + + synchronized public void onException(JMSException ex) { + System.out.println("JMS Exception occured. Shutting down client."); + running = false; + } + + synchronized boolean isRunning() { + return running; + } + + protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, + IOException { + System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown"); + + for (int i = 0; i < maxiumMessages && isRunning();) { + Message message = consumer.receive(1000); + if (message != null) { + i++; + onMessage(message); + } + } + System.out.println("Closing connection"); + consumer.close(); + session.close(); + connection.close(); + if (pauseBeforeShutdown) { + System.out.println("Press return to shut down"); + System.in.read(); + } + } + + protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) + throws JMSException, IOException { + System.out.println("We will consume messages while they continue to be delivered within: " + timeout + + " ms, and then we will shutdown"); + + Message message; + while ((message = consumer.receive(timeout)) != null) { + onMessage(message); + } + + System.out.println("Closing connection"); + consumer.close(); + session.close(); + connection.close(); + if (pauseBeforeShutdown) { + System.out.println("Press return to shut down"); + System.in.read(); + } + } + + public void setAckMode(String ackMode) { + if( "CLIENT_ACKNOWLEDGE".equals(ackMode) ) { + this.ackMode = Session.CLIENT_ACKNOWLEDGE; + } + if( "AUTO_ACKNOWLEDGE".equals(ackMode) ) { + this.ackMode = Session.AUTO_ACKNOWLEDGE; + } + if( "DUPS_OK_ACKNOWLEDGE".equals(ackMode) ) { + this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; + } + if( 
"SESSION_TRANSACTED".equals(ackMode) ) { + this.ackMode = Session.SESSION_TRANSACTED; + } + } + + public void setClientId(String clientID) { + this.clientId = clientID; + } + public void setConsumerName(String consumerName) { + this.consumerName = consumerName; + } + public void setDurable(boolean durable) { + this.durable = durable; + } + public void setMaxiumMessages(int maxiumMessages) { + this.maxiumMessages = maxiumMessages; + } + public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) { + this.pauseBeforeShutdown = pauseBeforeShutdown; + } + public void setPassword(String pwd) { + this.password = pwd; + } + public void setReceiveTimeOut(long receiveTimeOut) { + this.receiveTimeOut = receiveTimeOut; + } + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + public void setSubject(String subject) { + this.subject = subject; + } + public void setTopic(boolean topic) { + this.topic = topic; + } + public void setQueue(boolean queue) { + this.topic = !queue; + } + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + public void setUrl(String url) { + this.url = url; + } + public void setUser(String user) { + this.user = user; + } + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } } Modified: incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java (original) +++ incubator/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java Sun Oct 29 06:35:48 2006 @@ -1,22 +1,21 @@ -import org.apache.activemq.broker.BrokerService; - /** - * - * Copyright 2005 LogicBlaze, Inc. 
http://www.logicblaze.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - **/ + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import org.apache.activemq.broker.BrokerService; /** * This example demonstrates how to run an embedded broker inside your Java code Modified: incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java (original) +++ incubator/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java Sun Oct 29 06:35:48 2006 @@ -16,15 +16,10 @@ * limitations under the License. */ -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; +import java.util.Arrays; +import java.util.HashSet; + import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import java.io.IOException; /** * A simple tool for producing and consuming messages @@ -34,44 +29,24 @@ public class ProducerAndConsumerTool extends ConsumerTool implements MessageListener { public static void main(String[] args) { - ProducerAndConsumerTool tool = new ProducerAndConsumerTool(); - if (args.length > 0) { - tool.url = args[0]; - } - else { - tool.url = "vm://localhost"; - } - if (args.length > 1) { - tool.topic = args[1].equalsIgnoreCase("true"); - } - if (args.length > 2) { - tool.subject = args[2]; - } - if (args.length > 3) { - tool.durable = args[3].equalsIgnoreCase("true"); - } - if (args.length > 4) { - tool.maxiumMessages = Integer.parseInt(args[4]); - } - if (args.length > 5) { - tool.clientID = args[5]; - } - tool.run(); - } - - public void run() { - super.run(); - - // now lets publish some messages - ProducerTool tool = new ProducerTool(); - tool.url = this.url; - tool.topic = this.topic; - tool.subject = 
this.subject; - tool.durable = this.durable; - tool.clientID = this.clientID; - - tool.run(); + + ConsumerTool consumerTool = new ConsumerTool(); + String[] unknonwn = CommnadLineSupport.setOptions(consumerTool, args); + HashSet set1 = new HashSet(Arrays.asList(unknonwn)); + + ProducerTool producerTool = new ProducerTool(); + unknonwn = CommnadLineSupport.setOptions(producerTool, args); + HashSet set2 = new HashSet(Arrays.asList(unknonwn)); + + set1.retainAll(set2); + if( set1.size() > 0 ) { + System.out.println("Unknown options: "+set1); + System.exit(-1); + } + + consumerTool.run(); + producerTool.run(); + } - } Modified: incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java (original) +++ incubator/activemq/trunk/assembly/src/release/example/src/ProducerTool.java Sun Oct 29 06:35:48 2006 @@ -15,143 +15,182 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +import java.util.Arrays; +import java.util.Date; + import javax.jms.Connection; import javax.jms.DeliveryMode; -import javax.jms.JMSException; +import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.Date; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.util.IndentPrinter; /** * A simple tool for publishing messages - * + * * @version $Revision: 1.2 $ */ -public class ProducerTool extends ToolSupport { +public class ProducerTool { - protected int messageCount = 10; - protected long sleepTime = 0L; - protected boolean verbose = true; - protected int messageSize = 255; - private long timeToLive; - - public static void main(String[] args) { - runTool(args, new ProducerTool()); - } - - protected static void runTool(String[] args, ProducerTool tool) { - tool.clientID = null; - if (args.length > 0) { - tool.url = args[0]; - } - if (args.length > 1) { - tool.topic = args[1].equalsIgnoreCase("true"); - } - if (args.length > 2) { - tool.subject = args[2]; - } - if (args.length > 3) { - tool.durable = args[3].equalsIgnoreCase("true"); - } - if (args.length > 4) { - tool.messageCount = Integer.parseInt(args[4]); - } - if (args.length > 5) { - tool.messageSize = Integer.parseInt(args[5]); - } - if (args.length > 6) { - if( ! "null".equals(args[6]) ) { - tool.clientID = args[6]; - } - } - if (args.length > 7) { - tool.timeToLive = Long.parseLong(args[7]); - } - if (args.length > 8) { - tool.sleepTime = Long.parseLong(args[8]); - } - if (args.length > 9) { - tool.transacted = "true".equals(args[9]); - } - tool.run(); - } - - public void run() { - try { - System.out.println("Connecting to URL: " + url); - System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); - System.out.println("Using " + (durable ? 
"durable" : "non-durable") + " publishing"); - System.out.println("Sleeping between publish "+sleepTime+" ms"); - if( timeToLive!=0 ) { - System.out.println("Messages time to live "+timeToLive+" ms"); - } - Connection connection = createConnection(); - Session session = createSession(connection); - MessageProducer producer = createProducer(session); - sendLoop(session, producer); - - System.out.println("Done."); - close(connection, session); - } - catch (Exception e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - } - } - - protected MessageProducer createProducer(Session session) throws JMSException { - MessageProducer producer = session.createProducer(destination); - if (durable) { - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } - else { - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - if( timeToLive!=0 ) - producer.setTimeToLive(timeToLive); - return producer; - } - - protected void sendLoop(Session session, MessageProducer producer) throws Exception { - - for (int i = 0; i < messageCount || messageCount==0 ; i++) { - - - TextMessage message = session.createTextMessage(createMessageText(i)); - - if (verbose) { - String msg = message.getText(); - if (msg.length() > 50) { - msg = msg.substring(0, 50) + "..."; - } - System.out.println("Sending message: " + msg); - } - - producer.send(message); - if(transacted) { - session.commit(); - } - - Thread.sleep(sleepTime); - - } - - } - - /** - * @param i - * @return - */ - private String createMessageText(int index) { - StringBuffer buffer = new StringBuffer(messageSize); - buffer.append("Message: " + index + " sent at: " + new Date()); - if (buffer.length() > messageSize) { - return buffer.substring(0, messageSize); - } - for (int i = buffer.length(); i < messageSize; i++) { - buffer.append(' '); - } - return buffer.toString(); - } + private Destination destination; + private int messageCount = 10; + private long sleepTime = 0L; + private boolean verbose = true; + private int 
messageSize = 255; + private long timeToLive; + private String user = ActiveMQConnection.DEFAULT_USER; + private String password = ActiveMQConnection.DEFAULT_PASSWORD; + private String url = ActiveMQConnection.DEFAULT_BROKER_URL; + private String subject = "TOOL.DEFAULT"; + private boolean topic = false; + private boolean transacted = false; + private boolean persistent = false; + + public static void main(String[] args) { + ProducerTool producerTool = new ProducerTool(); + String[] unknonwn = CommnadLineSupport.setOptions(producerTool, args); + if( unknonwn.length > 0 ) { + System.out.println("Unknown options: "+Arrays.toString(unknonwn)); + System.exit(-1); + } + producerTool.run(); + } + + public void run() { + Connection connection=null; + try { + System.out.println("Connecting to URL: " + url); + System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); + System.out.println("Sleeping between publish " + sleepTime + " ms"); + if (timeToLive != 0) { + System.out.println("Messages time to live " + timeToLive + " ms"); + } + + // Create the connection. + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + connection = connectionFactory.createConnection(); + connection.start(); + + // Create the session + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + if (topic) { + destination = session.createTopic(subject); + } else { + destination = session.createQueue(subject); + } + + // Create the producer. 
+ MessageProducer producer = session.createProducer(destination); + if (persistent) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } else { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + if (timeToLive != 0) + producer.setTimeToLive(timeToLive); + + // Start sending messages + sendLoop(session, producer); + + System.out.println("Done."); + + // Use the ActiveMQConnection interface to dump the connection stats. + ActiveMQConnection c = (ActiveMQConnection) connection; + c.getConnectionStats().dump(new IndentPrinter()); + + } catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void sendLoop(Session session, MessageProducer producer) + throws Exception { + + for (int i = 0; i < messageCount || messageCount == 0; i++) { + + TextMessage message = session + .createTextMessage(createMessageText(i)); + + if (verbose) { + String msg = message.getText(); + if (msg.length() > 50) { + msg = msg.substring(0, 50) + "..."; + } + System.out.println("Sending message: " + msg); + } + + producer.send(message); + if (transacted) { + session.commit(); + } + + Thread.sleep(sleepTime); + + } + + } + + private String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } + + + public void setPersistent(boolean durable) { + this.persistent = durable; + } + public void setMessageCount(int messageCount) { + this.messageCount = messageCount; + } + public void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + public void setPassword(String pwd) { + this.password = pwd; + } + public void setSleepTime(long sleepTime) { + 
this.sleepTime = sleepTime; + } + public void setSubject(String subject) { + this.subject = subject; + } + public void setTimeToLive(long timeToLive) { + this.timeToLive = timeToLive; + } + public void setTopic(boolean topic) { + this.topic = topic; + } + public void setQueue(boolean queue) { + this.topic = !queue; + } + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + public void setUrl(String url) { + this.url = url; + } + public void setUser(String user) { + this.user = user; + } + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } } Modified: incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java?view=diff&rev=468913&r1=468912&r2=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java (original) +++ incubator/activemq/trunk/assembly/src/release/example/src/RequesterTool.java Sun Oct 29 06:35:48 2006 @@ -15,169 +15,228 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +import java.util.Arrays; +import java.util.Date; + import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.Date; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.util.IndentPrinter; /** * A simple tool for publishing messages - * + * * @version $Revision: 1.2 $ */ -public class RequesterTool extends ToolSupport { +public class RequesterTool { - protected int messageCount = 10; - protected long sleepTime = 0L; - protected boolean verbose = true; - protected int messageSize = 255; - private long timeToLive; - - public static void main(String[] args) { - runTool(args, new RequesterTool()); - } - - protected static void runTool(String[] args, RequesterTool tool) { - tool.clientID = null; - if (args.length > 0) { - tool.url = args[0]; - } - if (args.length > 1) { - tool.topic = args[1].equalsIgnoreCase("true"); - } - if (args.length > 2) { - tool.subject = args[2]; - } - if (args.length > 3) { - tool.durable = args[3].equalsIgnoreCase("true"); - } - if (args.length > 4) { - tool.messageCount = Integer.parseInt(args[4]); - } - if (args.length > 5) { - tool.messageSize = Integer.parseInt(args[5]); - } - if (args.length > 6) { - if( ! "null".equals(args[6]) ) { - tool.clientID = args[6]; - } - } - if (args.length > 7) { - tool.timeToLive = Long.parseLong(args[7]); - } - if (args.length > 8) { - tool.sleepTime = Long.parseLong(args[8]); - } - if (args.length > 9) { - tool.transacted = "true".equals(args[9]); - } - tool.run(); - } - - public void run() { - try { - System.out.println("Connecting to URL: " + url); - System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? 
"topic" : "queue") + ": " + subject); - System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing"); - System.out.println("Sleeping between publish "+sleepTime+" ms"); - if( timeToLive!=0 ) { - System.out.println("Messages time to live "+timeToLive+" ms"); - } - Connection connection = createConnection(); - Session session = createSession(connection); - MessageProducer producer = createProducer(session); - - Destination replyDest = null; - if( this.topic ) { - replyDest = session.createTemporaryTopic(); - } else { - replyDest = session.createTemporaryQueue(); - } - - System.out.println("Reply Destination: "+replyDest); - MessageConsumer consumer = session.createConsumer(replyDest); - - requestLoop(session, producer, consumer, replyDest); - - System.out.println("Done."); - close(connection, session); - } - catch (Exception e) { - System.out.println("Caught: " + e); - e.printStackTrace(); - } - } - - protected MessageProducer createProducer(Session session) throws JMSException { - MessageProducer producer = session.createProducer(destination); - if (durable) { - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } - else { - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - if( timeToLive!=0 ) - producer.setTimeToLive(timeToLive); - return producer; - } - - protected void requestLoop(Session session, MessageProducer producer, MessageConsumer consumer, Destination replyDest) throws Exception { - - for (int i = 0; i < messageCount || messageCount==0 ; i++) { - - - TextMessage message = session.createTextMessage(createMessageText(i)); - message.setJMSReplyTo(replyDest); - - if (verbose) { - String msg = message.getText(); - if (msg.length() > 50) { - msg = msg.substring(0, 50) + "..."; - } - System.out.println("Sending message: " + msg); - } - - producer.send(message); - if(transacted) { - session.commit(); - } - - System.out.println("Waiting for reponse message..."); - Message message2 = consumer.receive(); - if( message2 
instanceof TextMessage ) { - System.out.println("Reponse message: "+((TextMessage)message2).getText()); - } else { - System.out.println("Reponse message: "+message2); - } - if(transacted) { - session.commit(); - } - - Thread.sleep(sleepTime); - - } - - } - - /** - * @param i - * @return - */ - private String createMessageText(int index) { - StringBuffer buffer = new StringBuffer(messageSize); - buffer.append("Message: " + index + " sent at: " + new Date()); - if (buffer.length() > messageSize) { - return buffer.substring(0, messageSize); - } - for (int i = buffer.length(); i < messageSize; i++) { - buffer.append(' '); - } - return buffer.toString(); - } + private int messageCount = 10; + private long sleepTime = 0L; + private boolean verbose = true; + private int messageSize = 255; + private long timeToLive; + private String subject = "TOOL.DEFAULT"; + private String replySubject; + private boolean topic = false; + private String user = ActiveMQConnection.DEFAULT_USER; + private String password = ActiveMQConnection.DEFAULT_PASSWORD; + private String url = ActiveMQConnection.DEFAULT_BROKER_URL; + private boolean transacted = false; + private boolean persistent = false; + private String clientId; + + private Destination destination; + private Destination replyDest; + private MessageProducer producer; + private MessageConsumer consumer; + private Session session; + + public static void main(String[] args) { + RequesterTool requesterTool = new RequesterTool(); + String[] unknonwn = CommnadLineSupport.setOptions(requesterTool, args); + if (unknonwn.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknonwn)); + System.exit(-1); + } + requesterTool.run(); + } + + public void run() { + + Connection connection=null; + try { + + System.out.println("Connecting to URL: " + url); + System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using " + (persistent ? 
"persistent" : "non-persistent") + " messages"); + System.out.println("Sleeping between publish " + sleepTime + " ms"); + + // Create the connection + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + connection = connectionFactory.createConnection(); + if (persistent && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { + connection.setClientID(clientId); + } + connection.start(); + + // Create the Session + session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // And the Destinations.. + if (topic) { + destination = session.createTopic(subject); + if( replySubject==null ) + replyDest = session.createTemporaryTopic(); + else + replyDest = session.createTopic(replySubject); + } else { + destination = session.createQueue(subject); + if( replySubject==null ) + replyDest = session.createTemporaryQueue(); + else + replyDest = session.createQueue(replySubject); + } + System.out.println("Reply Destination: " + replyDest); + + // Create the producer + producer = session.createProducer(destination); + if (persistent) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } else { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + if (timeToLive != 0) { + System.out.println("Messages time to live " + timeToLive + " ms"); + producer.setTimeToLive(timeToLive); + } + + // Create the reply consumer + consumer = session.createConsumer(replyDest); + + // Start sending reqests. + requestLoop(); + + System.out.println("Done."); + + // Use the ActiveMQConnection interface to dump the connection stats. 
+ ActiveMQConnection c = (ActiveMQConnection) connection; + c.getConnectionStats().dump(new IndentPrinter()); + + } catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void requestLoop() throws Exception { + + for (int i = 0; i < messageCount || messageCount == 0; i++) { + + TextMessage message = session.createTextMessage(createMessageText(i)); + message.setJMSReplyTo(replyDest); + + if (verbose) { + String msg = message.getText(); + if (msg.length() > 50) { + msg = msg.substring(0, 50) + "..."; + } + System.out.println("Sending message: " + msg); + } + + producer.send(message); + if (transacted) { + session.commit(); + } + + System.out.println("Waiting for reponse message..."); + Message message2 = consumer.receive(); + if (message2 instanceof TextMessage) { + System.out.println("Reponse message: " + ((TextMessage) message2).getText()); + } else { + System.out.println("Reponse message: " + message2); + } + if (transacted) { + session.commit(); + } + + Thread.sleep(sleepTime); + + } + } + + /** + * @param i + * @return + */ + private String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } + + + public void setClientId(String clientId) { + this.clientId = clientId; + } + public void setPersistent(boolean durable) { + this.persistent = durable; + } + public void setMessageCount(int messageCount) { + this.messageCount = messageCount; + } + public void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + public void setPassword(String password) { + this.password = password; + } + public void setSleepTime(long sleepTime) { 
+ this.sleepTime = sleepTime; + } + public void setSubject(String subject) { + this.subject = subject; + } + public void setTimeToLive(long timeToLive) { + this.timeToLive = timeToLive; + } + public void setTopic(boolean topic) { + this.topic = topic; + } + public void setQueue(boolean queue) { + this.topic = !queue; + } + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + public void setUrl(String url) { + this.url = url; + } + public void setUser(String user) { + this.user = user; + } + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } + public void setReplySubject(String replySubject) { + this.replySubject = replySubject; + } } Added: incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java?view=auto&rev=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java (added) +++ incubator/activemq/trunk/assembly/src/release/example/src/TopicListener.java Sun Oct 29 06:35:48 2006 @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Arrays; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * Use in conjunction with TopicPublisher to test the performance of ActiveMQ Topics. + */ +public class TopicListener implements MessageListener { + + private Connection connection; + private MessageProducer producer; + private Session session; + private int count; + private long start; + private Topic topic; + private Topic control; + +// private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false"; + private String url="tcp://localhost:61616"; + + public static void main(String[] argv) throws Exception { + TopicListener l = new TopicListener(); + String[] unknonwn = CommnadLineSupport.setOptions(l, argv); + if (unknonwn.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknonwn)); + System.exit(-1); + } + l.run(); + } + + public void run() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic("topictest.messages"); + control = session.createTopic("topictest.control"); + + MessageConsumer consumer = session.createConsumer(topic); + consumer.setMessageListener(this); + + connection.start(); + + producer = session.createProducer(control); + System.out.println("Waiting for messages..."); + } + + 
private static boolean checkText(Message m, String s) + { + try + { + return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return false; + } + } + + + public void onMessage(Message message) { + if ( checkText(message, "SHUTDOWN") ) { + + try { + connection.close(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + + } else if (checkText(message, "REPORT")) { + // send a report: + try { + long time = (System.currentTimeMillis() - start); + String msg = "Received " + count + " in " + time + "ms"; + producer.send(session.createTextMessage(msg)); + } catch (Exception e) { + e.printStackTrace(System.out); + } + count = 0; + + } else { + + if (count==0) { + start = System.currentTimeMillis(); + } + + if (++count % 1000 == 0) + System.out.println("Received " + count + " messages."); + } + } + + public void setUrl(String url) { + this.url = url; + } + +} Added: incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java?view=auto&rev=468913 ============================================================================== --- incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java (added) +++ incubator/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java Sun Oct 29 06:35:48 2006 @@ -0,0 +1,214 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Arrays; + +import javax.jms.*; + +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * Use in conjunction with TopicListener to test the performance of ActiveMQ Topics. + */ +public class TopicPublisher implements MessageListener +{ + private final Object mutex = new Object(); + private Connection connection; + private Session session; + private MessageProducer publisher; + private Topic topic; + private Topic control; + +// private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false"; + private String url="tcp://localhost:61616"; + private int size=256; + private int subscribers=1; + private int remaining; + private int messages=10000; + private long delay; + private int batch=40; + + private byte[] payload; + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + + public static void main(String[] argv) throws Exception + { + TopicPublisher p = new TopicPublisher(); + String[] unknonwn = CommnadLineSupport.setOptions(p, argv); + if (unknonwn.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknonwn)); + System.exit(-1); + } + p.run(); + } + + private void run() throws Exception + { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = 
session.createTopic("topictest.messages"); + control = session.createTopic("topictest.control"); + + publisher = session.createProducer(topic); + publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + payload = new byte[size]; + for(int i = 0; i < size; i++) + { + payload[i] = (byte) DATA[i % DATA.length]; + } + + session.createConsumer(control).setMessageListener(this); + connection.start(); + + long[] times = new long[batch]; + for(int i = 0; i < batch; i++) + { + if(i > 0) Thread.sleep(delay*1000); + times[i] = batch(messages); + System.out.println("Batch " + (i+1) + " of " + batch + " completed in " + times[i] + " ms."); + } + + long min = min(times); + long max = max(times); + System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); + + //request shutdown + publisher.send(session.createTextMessage("SHUTDOWN")); + + connection.stop(); + connection.close(); + } + + private long batch(int msgCount) throws Exception + { + long start = System.currentTimeMillis(); + remaining=subscribers; + publish(); + waitForCompletion(); + return System.currentTimeMillis() - start; + } + + private void publish() throws Exception + { + + //send events + BytesMessage msg = session.createBytesMessage(); + msg.writeBytes(payload); + for (int i = 0; i < messages; i++) + { + publisher.send(msg); + if ((i + 1) % 1000 == 0) + { + System.out.println("Sent " + (i + 1) + " messages"); + } + } + + //request report + publisher.send(session.createTextMessage("REPORT")); + } + + private void waitForCompletion() throws Exception + { + System.out.println("Waiting for completion..."); + synchronized (mutex) + { + while (remaining > 0) + { + mutex.wait(); + } + } + } + + + public void onMessage(Message message) + { + synchronized (mutex) + { + System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining"); + if (remaining == 0) + { + mutex.notify(); + } + } + } + + Object getReport(Message m) + { + try + { + return ((TextMessage) 
m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return e.toString(); + } + } + + static long min(long[] times) + { + long min = times.length > 0 ? times[0] : 0; + for(int i = 0; i < times.length; i++) + { + min = Math.min(min, times[i]); + } + return min; + } + + static long max(long[] times) + { + long max = times.length > 0 ? times[0] : 0; + for(int i = 0; i < times.length; i++) + { + max = Math.max(max, times[i]); + } + return max; + } + + static long avg(long[] times, long min, long max) + { + long sum = 0; + for(int i = 0; i < times.length; i++) + { + sum += times[i]; + } + sum -= min; + sum -= max; + return (sum / times.length - 2); + } + + public void setBatch(int batch) { + this.batch = batch; + } + public void setDelay(long delay) { + this.delay = delay; + } + public void setMessages(int messages) { + this.messages = messages; + } + public void setSize(int size) { + this.size = size; + } + public void setSubscribers(int subscribers) { + this.subscribers = subscribers; + } + public void setUrl(String url) { + this.url = url; + } +}