Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5B3F9106B1 for ; Thu, 10 Oct 2013 14:40:40 +0000 (UTC) Received: (qmail 96003 invoked by uid 500); 10 Oct 2013 14:40:35 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 95814 invoked by uid 500); 10 Oct 2013 14:40:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 95416 invoked by uid 99); 10 Oct 2013 14:40:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Oct 2013 14:40:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F2744914205; Thu, 10 Oct 2013 14:40:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chirino@apache.org To: commits@activemq.apache.org Date: Thu, 10 Oct 2013 14:40:30 -0000 Message-Id: <66e196a743b941ec8b706f8432f31880@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/18] Porting apollo examples structure to ActiveMQ 5.9. 
http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/exploring-jms/build.xml ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/exploring-jms/build.xml b/assembly/src/release/examples/openwire/exploring-jms/build.xml new file mode 100644 index 0000000..9318351 --- /dev/null +++ b/assembly/src/release/examples/openwire/exploring-jms/build.xml @@ -0,0 +1,334 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/exploring-jms/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/exploring-jms/conf/log4j.properties b/assembly/src/release/examples/openwire/exploring-jms/conf/log4j.properties new file mode 100644 index 0000000..e5326c5 --- /dev/null +++ b/assembly/src/release/examples/openwire/exploring-jms/conf/log4j.properties @@ -0,0 +1,39 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. 
See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used by the standalone ActiveMQ broker +# +log4j.rootLogger=INFO, stdout, logfile + +# CONSOLE appender +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %m%n + +# Log File appender +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.logfile.file=activemq.log +log4j.appender.logfile.append=true + +# +# You can change logger levels here. 
+# +log4j.logger.org.apache.activemq=INFO +log4j.logger.org.apache.activemq.spring=WARN http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/exploring-jms/readme.txt ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/exploring-jms/readme.txt b/assembly/src/release/examples/openwire/exploring-jms/readme.txt new file mode 100644 index 0000000..3296f92 --- /dev/null +++ b/assembly/src/release/examples/openwire/exploring-jms/readme.txt @@ -0,0 +1,10 @@ +FuseSource distribution of ActiveMQ - Exploring JMS v1.1 +======================================================== +The Exploring JMS Samples are described in the "Exploring JMS with +the FuseSource distribution of ActiveMQ" book available from fusesource.com. + +They demonstrate how to use the JMS client APIs to accomplish basic messaging +tasks. + +The build.xml file provides targets for rebuilding the examples and for +launching them. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/java/pom.xml ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/java/pom.xml b/assembly/src/release/examples/openwire/java/pom.xml new file mode 100644 index 0000000..0e7a500 --- /dev/null +++ b/assembly/src/release/examples/openwire/java/pom.xml @@ -0,0 +1,83 @@ + + + + + 4.0.0 + + example + openwire-example + 0.1-SNAPSHOT + + example + ActiveMQ Openwire Java Examples + + + + Fusesource Snapshots + http://repo.fusesource.com/nexus/content/repositories/snapshots + + + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + 1.1 + + + org.apache.activemq + activemq-client + ${project.version} + + + org.slf4j + slf4j-nop + ${slf4j-version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.1 + + 1.6 + 1.6 + + + + + + org.fusesource.mvnplugins + maven-uberize-plugin + 1.14 + + + package + uberize + + + + + + + + + http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/java/readme.md ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/java/readme.md b/assembly/src/release/examples/openwire/java/readme.md new file mode 100644 index 0000000..ad23ad7 --- /dev/null +++ b/assembly/src/release/examples/openwire/java/readme.md @@ -0,0 +1,32 @@ +## Overview + +This is an example of how use the Java JMS api with ActiveMQ. 
+ +## Prereqs + +- Install Java SDK +- Install [Maven](http://maven.apache.org/download.html) + +## Building + +Run: + + mvn install + +## Running the Examples + +In one terminal window run: + + java -cp target/openwire-example-0.1-SNAPSHOT.jar example.Listener + +In another terminal window run: + + java -cp target/openwire-example-0.1-SNAPSHOT.jar example.Publisher + +You can control which server the examples try to connect to by +setting the following environment variables: + +* `ACTIVEMQ_HOST` +* `ACTIVEMQ_PORT` +* `ACTIVEMQ_USER` +* `ACTIVEMQ_PASSWORD` http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/java/src/main/java/example/Listener.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/java/src/main/java/example/Listener.java b/assembly/src/release/examples/openwire/java/src/main/java/example/Listener.java new file mode 100644 index 0000000..eab887d --- /dev/null +++ b/assembly/src/release/examples/openwire/java/src/main/java/example/Listener.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package example; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTopic; + +import javax.jms.*; + +class Listener { + + public static void main(String []args) throws JMSException { + + String user = env("ACTIVEMQ_USER", "admin"); + String password = env("ACTIVEMQ_PASSWORD", "password"); + String host = env("ACTIVEMQ_HOST", "localhost"); + int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616")); + String destination = arg(args, 0, "event"); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port); + + Connection connection = factory.createConnection(user, password); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = new ActiveMQTopic(destination); + + MessageConsumer consumer = session.createConsumer(dest); + long start = System.currentTimeMillis(); + long count = 1; + System.out.println("Waiting for messages..."); + while(true) { + Message msg = consumer.receive(); + if( msg instanceof TextMessage ) { + String body = ((TextMessage) msg).getText(); + if( "SHUTDOWN".equals(body)) { + long diff = System.currentTimeMillis() - start; + System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0))); + break; + } else { + if( count != msg.getIntProperty("id") ) { + System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id")); + } + count = msg.getIntProperty("id"); + + if( count == 0 ) { + start = System.currentTimeMillis(); + } + if( count % 1000 == 0 ) { + System.out.println(String.format("Received %d messages.", count)); + } + count ++; + } + + } else { + System.out.println("Unexpected message type: "+msg.getClass()); + } + } + connection.close(); + } + + private static String env(String key, String defaultValue) { + String rc = System.getenv(key); + if( rc== null ) + return defaultValue; + return rc; + } + + private static String arg(String []args, int index, 
String defaultValue) { + if( index < args.length ) + return args[index]; + else + return defaultValue; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/java/src/main/java/example/Publisher.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/java/src/main/java/example/Publisher.java b/assembly/src/release/examples/openwire/java/src/main/java/example/Publisher.java new file mode 100644 index 0000000..4f2b4f4 --- /dev/null +++ b/assembly/src/release/examples/openwire/java/src/main/java/example/Publisher.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package example; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTopic; + +import javax.jms.*; + +class Publisher { + + public static void main(String []args) throws JMSException { + + String user = env("ACTIVEMQ_USER", "admin"); + String password = env("ACTIVEMQ_PASSWORD", "password"); + String host = env("ACTIVEMQ_HOST", "localhost"); + int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616")); + String destination = arg(args, 0, "event"); + + int messages = 10000; + int size = 256; + + String DATA = "abcdefghijklmnopqrstuvwxyz"; + String body = ""; + for( int i=0; i < size; i ++) { + body += DATA.charAt(i%DATA.length()); + } + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port); + + Connection connection = factory.createConnection(user, password); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = new ActiveMQTopic(destination); + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + for( int i=1; i <= messages; i ++) { + TextMessage msg = session.createTextMessage(body); + msg.setIntProperty("id", i); + producer.send(msg); + if( (i % 1000) == 0) { + System.out.println(String.format("Sent %d messages", i)); + } + } + + producer.send(session.createTextMessage("SHUTDOWN")); + connection.close(); + + } + + private static String env(String key, String defaultValue) { + String rc = System.getenv(key); + if( rc== null ) + return defaultValue; + return rc; + } + + private static String arg(String []args, int index, String defaultValue) { + if( index < args.length ) + return args[index]; + else + return defaultValue; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/build.xml 
---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/build.xml b/assembly/src/release/examples/openwire/swissarmy/build.xml new file mode 100755 index 0000000..f2aba5d --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/build.xml @@ -0,0 +1,328 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This script requires Ant 1.6 or higher + + NOTE: All options should be specified as system properties + on the command line, e.g.: + + ant consumer -Durl=tcp://hostname:1234 -Dtopic=true + + OR + + ant producer -Durl=tcp://hostname:61616 -Dmax=1000 + + Usage: + ant <ant-task> <options> + + -------------------------------------------------------- + ant consumer <options> - Creates a consumer which waits until a specific number + of messages have been received + + Consumer Options: + url - Used to specify acustom URL for the + broker, e.g., tcp://hostname:1234 + topic - A boolean to determine whether to use + topics or queues; the default is false + subject - Used to specify a custom destination + name, e.g. MyDestination + durable - A boolean to specify that you want to + create a durable topic? + max - The maximum number of messages to wait + for before shutting down + clientId - A string to use as the client id + transacted - A boolean to specify that you want to use + transactions? 
+ sleepTime - The time to sleep between message consumptions + verbose - Used to print out more info; the default is + true + ack-mode - The type of message acknowledgement to use; + see the Javadocs for javax.jms.Session for + more information + receive-time-out - An integer to specify the time to wait for + message consumption + parallelThreads - The number of parallel threads + batch - Batch size for transactions and client acknowledgment (default 10) + user - Connection username (if connecting to secured broker) + password - Connection password (if connecting to secured broker) + + -------------------------------------------------------- + ant producer <options> - Creates a producer publishing a number of messages + + Producer Options: + url - Used to specify acustom URL for the broker, + e.g., tcp://hostname:1234 + topic - A boolean to determine whether to use topics + or queues + subject - Used to specify a custom destination name, + e.g. MyDestination + durable - A boolean to specify that you want to change the DeliveryMode of + the produced messages ('PERSISTENT':'true' or 'NON-PERSISTENT':'false') + max - The maximum number of messages to wait for + before shutting down + sleepTime - The time to sleep between message consumptions + transacted - A boolean to specify that you want to use + transactions? 
+ verbose - Used to print out more info; the default is true + messageSize - The size of the message in 1-byte characters + parallelThreads - The number of parallel threads + batch - Batch size for transactions and client acknowledgment (default 10) + user - Connection username (if connecting to secured broker) + password - Connection password (if connecting to secured broker) + + -------------------------------------------------------- + + ant -help - Display ant help screen + ant help - Display this message + ant clean - Delete the built directory + ant embedBroker - Runs an embedded broker inside Java code + ant war - Creates a WAR deployment unit of the ActiveMQ Broker + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Running consumer against server at $$url = ${url} for subject $$subject = ${subject} + + + + + + + + + + + + + + + + + + + + + + + + + Running producer against server at $$url = ${url} for subject $$subject = ${subject} + + + + + + + + + + + + + + + + + + + + + + + + Running requester against server at $$url = ${url} for subject $$subject = ${subject} + + + + + + + + + + + + + + + + + + + + + + Running an embedded broker example + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Running a Stomp example + + + + + + + + Running a Log4j JMS Appender example + + + + + + + http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/readme.md ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/readme.md b/assembly/src/release/examples/openwire/swissarmy/readme.md new file mode 100644 index 0000000..0f3d709 --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/readme.md @@ -0,0 +1,32 @@ +# Overview + +Before running the examples you should start ActiveMQ on your machine. 
Follow the +installation instructions to use a binary distribution of ActiveMQ. To run the broker +in a command shell, type: + + bin/activemq + +This starts up the ActiveMQ broker. + +## Running the examples from a binary distro + +You can use [Ant](http://ant.apache.org) to compile and run the examples. To run a +message producer run: + + ant producer + +To run a consumer, in another shell run: + + ant consumer + +You should then see messages being produced and consumed. You can also pass additional +commands into these goals using variables that are available in the build.xml. Below +is an example: + + ant consumer -Durl=tcp://localhost:61616 -Dtopic=true -Ddurable=true -Dmax=5000 + ant producer -Durl=tcp://localhost:61616 -Dtopic=true -DtimeToLive=30000 -Dmax=5000 + +For a summary of all the available goals and options try: + + ant help + http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/CommandLineSupport.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/CommandLineSupport.java b/assembly/src/release/examples/openwire/swissarmy/src/CommandLineSupport.java new file mode 100644 index 0000000..cbde8a4 --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/CommandLineSupport.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; + +import org.apache.activemq.util.IntrospectionSupport; + +/** + * Helper utility that can be used to set the properties on any object using + * command line arguments. + * + * @author Hiram Chirino + */ +public final class CommandLineSupport { + + private CommandLineSupport() { + } + + /** + * Sets the properties of an object given the command line args. + * + * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent + * + * then it will try to call the following setters on the target object. + * + * target.setAckMode("AUTO"); + * target.setURL(new URI("tcp://localhost:61616") ); + * target.setPersistent(true); + * + * Notice the the proper conversion for the argument is determined by examining the + * setter arguement type. + * + * @param target the object that will have it's properties set + * @param args the commline options + * @return any arguments that are not valid options for the target + */ + public static String[] setOptions(Object target, String[] args) { + ArrayList rc = new ArrayList(); + + for (int i = 0; i < args.length; i++) { + if (args[i] == null) { + continue; + } + + if (args[i].startsWith("--")) { + + // --options without a specified value are considered boolean + // flags that are enabled. 
+ String value = "true"; + String name = args[i].substring(2); + + // if --option=value case + int p = name.indexOf("="); + if (p > 0) { + value = name.substring(p + 1); + name = name.substring(0, p); + } + + // name not set, then it's an unrecognized option + if (name.length() == 0) { + rc.add(args[i]); + continue; + } + + String propName = convertOptionToPropertyName(name); + if (!IntrospectionSupport.setProperty(target, propName, value)) { + rc.add(args[i]); + continue; + } + } + + } + + String r[] = new String[rc.size()]; + rc.toArray(r); + return r; + } + + /** + * converts strings like: test-enabled to testEnabled + * + * @param name + * @return + */ + private static String convertOptionToPropertyName(String name) { + String rc = ""; + + // Look for '-' and strip and then convert the subsequent char to + // uppercase + int p = name.indexOf("-"); + while (p > 0) { + // strip + rc += name.substring(0, p); + name = name.substring(p + 1); + + // can I convert the next char to upper? + if (name.length() > 0) { + rc += name.substring(0, 1).toUpperCase(); + name = name.substring(1); + } + + p = name.indexOf("-"); + } + return rc + name; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/ConsumerTool.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/ConsumerTool.java b/assembly/src/release/examples/openwire/swissarmy/src/ConsumerTool.java new file mode 100755 index 0000000..e8f39f7 --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/ConsumerTool.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Iterator; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * A simple tool for consuming messages + * + * + */ +public class ConsumerTool extends Thread implements MessageListener, ExceptionListener { + + private boolean running; + + private Session session; + private Destination destination; + private MessageProducer replyProducer; + + private boolean pauseBeforeShutdown = false; + private boolean verbose = true; + private int maxiumMessages; + private static int parallelThreads = 1; + private String subject = "TOOL.DEFAULT"; + private boolean topic; + private String user = ActiveMQConnection.DEFAULT_USER; + private String password = ActiveMQConnection.DEFAULT_PASSWORD; + private String url = ActiveMQConnection.DEFAULT_BROKER_URL; + private boolean transacted; + private boolean durable; + private String clientId; + private int ackMode = 
Session.AUTO_ACKNOWLEDGE; + private String consumerName = "James"; + private long sleepTime; + private long receiveTimeOut; + private long batch = 10; // Default batch size for CLIENT_ACKNOWLEDGEMENT or SESSION_TRANSACTED + private long messagesReceived = 0; + + public static void main(String[] args) { + ArrayList threads = new ArrayList(); + ConsumerTool consumerTool = new ConsumerTool(); + String[] unknown = CommandLineSupport.setOptions(consumerTool, args); + if (unknown.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknown)); + System.exit(-1); + } + consumerTool.showParameters(); + for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) { + consumerTool = new ConsumerTool(); + CommandLineSupport.setOptions(consumerTool, args); + consumerTool.start(); + threads.add(consumerTool); + } + + while (true) { + Iterator itr = threads.iterator(); + int running = 0; + while (itr.hasNext()) { + ConsumerTool thread = itr.next(); + if (thread.isAlive()) { + running++; + } + } + + if (running <= 0) { + System.out.println("All threads completed their work"); + break; + } + + try { + Thread.sleep(1000); + } catch (Exception e) { + } + } + Iterator itr = threads.iterator(); + while (itr.hasNext()) { + ConsumerTool thread = itr.next(); + } + } + + public void showParameters() { + System.out.println("Connecting to URL: " + url + " (" + user + ":" + password + ")"); + System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using a " + (durable ? 
"durable" : "non-durable") + " subscription"); + System.out.println("Running " + parallelThreads + " parallel threads"); + } + + public void run() { + try { + running = true; + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + Connection connection = connectionFactory.createConnection(); + if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { + connection.setClientID(clientId); + } + connection.setExceptionListener(this); + connection.start(); + + session = connection.createSession(transacted, ackMode); + if (topic) { + destination = session.createTopic(subject); + } else { + destination = session.createQueue(subject); + } + + replyProducer = session.createProducer(null); + replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + MessageConsumer consumer = null; + if (durable && topic) { + consumer = session.createDurableSubscriber((Topic) destination, consumerName); + } else { + consumer = session.createConsumer(destination); + } + + if (maxiumMessages > 0) { + consumeMessagesAndClose(connection, session, consumer); + } else { + if (receiveTimeOut == 0) { + consumer.setMessageListener(this); + } else { + consumeMessagesAndClose(connection, session, consumer, receiveTimeOut); + } + } + + } catch (Exception e) { + System.out.println("[" + this.getName() + "] Caught: " + e); + e.printStackTrace(); + } + } + + public void onMessage(Message message) { + + messagesReceived++; + + try { + + if (message instanceof TextMessage) { + TextMessage txtMsg = (TextMessage) message; + if (verbose) { + + String msg = txtMsg.getText(); + int length = msg.length(); + if (length > 50) { + msg = msg.substring(0, 50) + "..."; + } + System.out.println("[" + this.getName() + "] Received: '" + msg + "' (length " + length + ")"); + } + } else { + if (verbose) { + System.out.println("[" + this.getName() + "] Received: '" + message + "'"); + } + } + + if (message.getJMSReplyTo() != null) { + 
replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID())); + } + + if (transacted) { + if ((messagesReceived % batch) == 0) { + System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived); + session.commit(); + } + } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) { + if ((messagesReceived % batch) == 0) { + System.out.println("Acknowledging last " + batch + " messages; messages so far = " + messagesReceived); + message.acknowledge(); + } + } + + } catch (JMSException e) { + System.out.println("[" + this.getName() + "] Caught: " + e); + e.printStackTrace(); + } finally { + if (sleepTime > 0) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + } + } + + public synchronized void onException(JMSException ex) { + System.out.println("[" + this.getName() + "] JMS Exception occured. Shutting down client."); + running = false; + } + + synchronized boolean isRunning() { + return running; + } + + protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, + IOException { + System.out.println("[" + this.getName() + "] We are about to wait until we consume: " + maxiumMessages + + " message(s) then we will shutdown"); + + for (int i = 0; i < maxiumMessages && isRunning();) { + Message message = consumer.receive(1000); + if (message != null) { + i++; + onMessage(message); + } + } + System.out.println("[" + this.getName() + "] Closing connection"); + consumer.close(); + session.close(); + connection.close(); + if (pauseBeforeShutdown) { + System.out.println("[" + this.getName() + "] Press return to shut down"); + System.in.read(); + } + } + + protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) + throws JMSException, IOException { + System.out.println("[" + this.getName() + "] We will consume messages while they 
continue to be delivered within: " + timeout + + " ms, and then we will shutdown"); + + Message message; + while ((message = consumer.receive(timeout)) != null) { + onMessage(message); + } + + System.out.println("[" + this.getName() + "] Closing connection"); + consumer.close(); + session.close(); + connection.close(); + if (pauseBeforeShutdown) { + System.out.println("[" + this.getName() + "] Press return to shut down"); + System.in.read(); + } + } + + public void setAckMode(String ackMode) { + if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) { + this.ackMode = Session.CLIENT_ACKNOWLEDGE; + } + if ("AUTO_ACKNOWLEDGE".equals(ackMode)) { + this.ackMode = Session.AUTO_ACKNOWLEDGE; + } + if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) { + this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; + } + if ("SESSION_TRANSACTED".equals(ackMode)) { + this.ackMode = Session.SESSION_TRANSACTED; + } + } + + public void setClientId(String clientID) { + this.clientId = clientID; + } + + public void setConsumerName(String consumerName) { + this.consumerName = consumerName; + } + + public void setDurable(boolean durable) { + this.durable = durable; + } + + public void setMaxiumMessages(int maxiumMessages) { + this.maxiumMessages = maxiumMessages; + } + + public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) { + this.pauseBeforeShutdown = pauseBeforeShutdown; + } + + public void setPassword(String pwd) { + this.password = pwd; + } + + public void setReceiveTimeOut(long receiveTimeOut) { + this.receiveTimeOut = receiveTimeOut; + } + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + public void setSubject(String subject) { + this.subject = subject; + } + + public void setParallelThreads(int parallelThreads) { + if (parallelThreads < 1) { + parallelThreads = 1; + } + this.parallelThreads = parallelThreads; + } + + public void setTopic(boolean topic) { + this.topic = topic; + } + + public void setQueue(boolean queue) { + this.topic = !queue; + } + + public void 
setTransacted(boolean transacted) { + this.transacted = transacted; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setUser(String user) { + this.user = user; + } + + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } + + public void setBatch(long batch) { + this.batch = batch; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/EmbeddedBroker.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/EmbeddedBroker.java b/assembly/src/release/examples/openwire/swissarmy/src/EmbeddedBroker.java new file mode 100644 index 0000000..d45ceff --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/EmbeddedBroker.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import org.apache.activemq.broker.BrokerService; + +/** + * This example demonstrates how to run an embedded broker inside your Java code + * + * + */ +public final class EmbeddedBroker { + + private EmbeddedBroker() { + } + + public static void main(String[] args) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + + // now lets wait forever to avoid the JVM terminating immediately + Object lock = new Object(); + synchronized (lock) { + lock.wait(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/Log4jJMSAppenderExample.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/Log4jJMSAppenderExample.java b/assembly/src/release/examples/openwire/swissarmy/src/Log4jJMSAppenderExample.java new file mode 100644 index 0000000..657b9b2 --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/Log4jJMSAppenderExample.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; + + +/** + * A simple example of log4j jms appender in conjuction with ActiveMQ + */ +public class Log4jJMSAppenderExample implements MessageListener { + + public Log4jJMSAppenderExample() throws Exception { + // create a logTopic topic consumer + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + MessageConsumer consumer = sess.createConsumer(sess.createTopic("logTopic")); + consumer.setMessageListener(this); + // log a message + Logger log = Logger.getLogger(Log4jJMSAppenderExample.class); + log.info("Test log"); + // clean up + Thread.sleep(1000); + consumer.close(); + sess.close(); + conn.close(); + System.exit(1); + } + + public static void main(String[] args) throws Exception { + new Log4jJMSAppenderExample(); + } + + public void onMessage(Message message) { + try { + // receive log event in your consumer + LoggingEvent event = (LoggingEvent)((ActiveMQObjectMessage)message).getObject(); + System.out.println("Received log [" + event.getLevel() + "]: "+ event.getMessage()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/ProducerAndConsumerTool.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/ProducerAndConsumerTool.java 
b/assembly/src/release/examples/openwire/swissarmy/src/ProducerAndConsumerTool.java new file mode 100644 index 0000000..a1b96ff --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/ProducerAndConsumerTool.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.util.Arrays; +import java.util.HashSet; + +import javax.jms.MessageListener; + +/** + * A simple tool for producing and consuming messages + * + * + */ +public class ProducerAndConsumerTool extends ConsumerTool implements MessageListener { + + public static void main(String[] args) { + + ConsumerTool consumerTool = new ConsumerTool(); + String[] unknown = CommandLineSupport.setOptions(consumerTool, args); + HashSet set1 = new HashSet(Arrays.asList(unknown)); + + ProducerTool producerTool = new ProducerTool(); + unknown = CommandLineSupport.setOptions(producerTool, args); + HashSet set2 = new HashSet(Arrays.asList(unknown)); + + set1.retainAll(set2); + if (set1.size() > 0) { + System.out.println("Unknown options: " + set1); + System.exit(-1); + } + + consumerTool.run(); + producerTool.run(); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/ProducerTool.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/ProducerTool.java b/assembly/src/release/examples/openwire/swissarmy/src/ProducerTool.java new file mode 100755 index 0000000..3693fd7 --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/ProducerTool.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.util.IndentPrinter; + +/** + * A simple tool for publishing messages + * + * + */ +public class ProducerTool extends Thread { + + private Destination destination; + private int messageCount = 10; + private long sleepTime; + private boolean verbose = true; + private int messageSize = 255; + private static int parallelThreads = 1; + private long timeToLive; + private String user = ActiveMQConnection.DEFAULT_USER; + private String password = ActiveMQConnection.DEFAULT_PASSWORD; + private String url = ActiveMQConnection.DEFAULT_BROKER_URL; + private String subject = "TOOL.DEFAULT"; + private boolean topic; + private boolean transacted; + private boolean persistent; + private long batch = 10; + private static Object lockResults = new Object(); + + public static void main(String[] args) { + ArrayList threads = new ArrayList(); + ProducerTool producerTool = new ProducerTool(); + String[] unknown = CommandLineSupport.setOptions(producerTool, args); + if (unknown.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknown)); + System.exit(-1); + } + producerTool.showParameters(); + for (int threadCount = 1; 
threadCount <= parallelThreads; threadCount++) { + producerTool = new ProducerTool(); + CommandLineSupport.setOptions(producerTool, args); + producerTool.start(); + threads.add(producerTool); + } + + while (true) { + Iterator itr = threads.iterator(); + int running = 0; + while (itr.hasNext()) { + ProducerTool thread = itr.next(); + if (thread.isAlive()) { + running++; + } + } + if (running <= 0) { + System.out.println("All threads completed their work"); + break; + } + try { + Thread.sleep(1000); + } catch (Exception e) { + } + } + } + + public void showParameters() { + System.out.println("Connecting to URL: " + url + " (" + user + ":" + password + ")"); + System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); + System.out.println("Sleeping between publish " + sleepTime + " ms"); + System.out.println("Running " + parallelThreads + " parallel threads"); + + if (timeToLive != 0) { + System.out.println("Messages time to live " + timeToLive + " ms"); + } + } + + public void run() { + Connection connection = null; + try { + // Create the connection. + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + connection = connectionFactory.createConnection(); + connection.start(); + + // Create the session + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + if (topic) { + destination = session.createTopic(subject); + } else { + destination = session.createQueue(subject); + } + + // Create the producer. 
+ MessageProducer producer = session.createProducer(destination); + if (persistent) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } else { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + if (timeToLive != 0) { + producer.setTimeToLive(timeToLive); + } + + // Start sending messages + sendLoop(session, producer); + + System.out.println("[" + this.getName() + "] Done."); + + synchronized (lockResults) { + ActiveMQConnection c = (ActiveMQConnection) connection; + System.out.println("[" + this.getName() + "] Results:\n"); + c.getConnectionStats().dump(new IndentPrinter()); + } + + } catch (Exception e) { + System.out.println("[" + this.getName() + "] Caught: " + e); + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void sendLoop(Session session, MessageProducer producer) throws Exception { + + for (int i = 0; i < messageCount || messageCount == 0; i++) { + + TextMessage message = session.createTextMessage(createMessageText(i)); + + if (verbose) { + String msg = message.getText(); + if (msg.length() > 50) { + msg = msg.substring(0, 50) + "..."; + } + System.out.println("[" + this.getName() + "] Sending message: '" + msg + "'"); + } + + producer.send(message); + + if (transacted && (i % batch == 0)) { + System.out.println("[" + this.getName() + "] Committing " + messageCount + " messages"); + session.commit(); + } + Thread.sleep(sleepTime); + } + } + + private String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } + + public void setPersistent(boolean durable) { + this.persistent = durable; + } + + public void setMessageCount(int messageCount) { + this.messageCount = 
messageCount; + } + + public void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + + public void setPassword(String pwd) { + this.password = pwd; + } + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + public void setSubject(String subject) { + this.subject = subject; + } + + public void setTimeToLive(long timeToLive) { + this.timeToLive = timeToLive; + } + + public void setParallelThreads(int parallelThreads) { + if (parallelThreads < 1) { + parallelThreads = 1; + } + this.parallelThreads = parallelThreads; + } + + public void setTopic(boolean topic) { + this.topic = topic; + } + + public void setQueue(boolean queue) { + this.topic = !queue; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setUser(String user) { + this.user = user; + } + + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } + + public void setBatch(long batch) { + this.batch = batch; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/2ecf41d0/assembly/src/release/examples/openwire/swissarmy/src/RequesterTool.java ---------------------------------------------------------------------- diff --git a/assembly/src/release/examples/openwire/swissarmy/src/RequesterTool.java b/assembly/src/release/examples/openwire/swissarmy/src/RequesterTool.java new file mode 100644 index 0000000..cd1a650 --- /dev/null +++ b/assembly/src/release/examples/openwire/swissarmy/src/RequesterTool.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Arrays; +import java.util.Date; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.util.IndentPrinter; + +/** + * A simple tool for publishing messages + * + * + */ +public class RequesterTool { + + private int messageCount = 10; + private long sleepTime; + private boolean verbose = true; + private int messageSize = 255; + private long timeToLive; + private String subject = "TOOL.DEFAULT"; + private String replySubject; + private boolean topic; + private String user = ActiveMQConnection.DEFAULT_USER; + private String password = ActiveMQConnection.DEFAULT_PASSWORD; + private String url = ActiveMQConnection.DEFAULT_BROKER_URL; + private boolean transacted; + private boolean persistent; + private String clientId; + + private Destination destination; + private Destination replyDest; + private MessageProducer producer; + private MessageConsumer consumer; + private Session session; + + public static void main(String[] args) { + RequesterTool requesterTool = new RequesterTool(); + String[] unknown = CommandLineSupport.setOptions(requesterTool, args); + if (unknown.length > 0) { + System.out.println("Unknown options: " + Arrays.toString(unknown)); + System.exit(-1); + } + requesterTool.run(); + } + + public void 
run() { + + Connection connection = null; + try { + + System.out.println("Connecting to URL: " + url); + System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages"); + System.out.println("Sleeping between publish " + sleepTime + " ms"); + + // Create the connection + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + connection = connectionFactory.createConnection(); + if (persistent && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { + connection.setClientID(clientId); + } + connection.start(); + + // Create the Session + session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // And the Destinations.. + if (topic) { + destination = session.createTopic(subject); + if (replySubject == null || replySubject.equals("")) { + replyDest = session.createTemporaryTopic(); + } else { + replyDest = session.createTopic(replySubject); + } + } else { + destination = session.createQueue(subject); + if (replySubject == null || replySubject.equals("")) { + replyDest = session.createTemporaryQueue(); + } else { + replyDest = session.createQueue(replySubject); + } + } + System.out.println("Reply Destination: " + replyDest); + + // Create the producer + producer = session.createProducer(destination); + if (persistent) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } else { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + if (timeToLive != 0) { + System.out.println("Messages time to live " + timeToLive + " ms"); + producer.setTimeToLive(timeToLive); + } + + // Create the reply consumer + consumer = session.createConsumer(replyDest); + + // Start sending reqests. + requestLoop(); + + System.out.println("Done."); + + // Use the ActiveMQConnection interface to dump the connection + // stats. 
+ ActiveMQConnection c = (ActiveMQConnection)connection; + c.getConnectionStats().dump(new IndentPrinter()); + + } catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void requestLoop() throws Exception { + + for (int i = 0; i < messageCount || messageCount == 0; i++) { + + TextMessage message = session.createTextMessage(createMessageText(i)); + message.setJMSReplyTo(replyDest); + + if (verbose) { + String msg = message.getText(); + if (msg.length() > 50) { + msg = msg.substring(0, 50) + "..."; + } + System.out.println("Sending message: " + msg); + } + + producer.send(message); + if (transacted) { + session.commit(); + } + + System.out.println("Waiting for reponse message..."); + Message message2 = consumer.receive(); + if (message2 instanceof TextMessage) { + System.out.println("Reponse message: " + ((TextMessage)message2).getText()); + } else { + System.out.println("Reponse message: " + message2); + } + if (transacted) { + session.commit(); + } + + Thread.sleep(sleepTime); + + } + } + + /** + * @param i + * @return + */ + private String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public void setPersistent(boolean durable) { + this.persistent = durable; + } + + public void setMessageCount(int messageCount) { + this.messageCount = messageCount; + } + + public void setMessageSize(int messageSize) { + this.messageSize = messageSize; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setSleepTime(long 
sleepTime) { + this.sleepTime = sleepTime; + } + + public void setSubject(String subject) { + this.subject = subject; + } + + public void setTimeToLive(long timeToLive) { + this.timeToLive = timeToLive; + } + + public void setTopic(boolean topic) { + this.topic = topic; + } + + public void setQueue(boolean queue) { + this.topic = !queue; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setUser(String user) { + this.user = user; + } + + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } + + public void setReplySubject(String replySubject) { + this.replySubject = replySubject; + } +}