activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5558 - add producer/consumer commands
Date Thu, 05 Feb 2015 12:58:43 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk b0a1bd833 -> 9f0ab46e2


https://issues.apache.org/jira/browse/AMQ-5558 - add producer/consumer commands


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9f0ab46e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9f0ab46e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9f0ab46e

Branch: refs/heads/trunk
Commit: 9f0ab46e293e6d31369f06f6669cd3d63db906fa
Parents: b0a1bd8
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Thu Feb 5 13:58:22 2015 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Thu Feb 5 13:58:31 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/util/ConsumerThread.java    | 154 +++++++++++++
 .../apache/activemq/util/ProducerThread.java    | 231 +++++++++++++++++++
 .../resources/org/apache/activemq/util/demo.txt |  15 ++
 .../console/command/AbstractCommand.java        |  31 +++
 .../console/command/ConsumerCommand.java        | 157 +++++++++++++
 .../console/command/ProducerCommand.java        | 206 +++++++++++++++++
 .../org.apache.activemq.console.command.Command |   2 +
 .../activemq/console/command/consumer.txt       |  11 +
 .../activemq/console/command/producer.txt       |  16 ++
 .../org/apache/activemq/bugs/AMQ3120Test.java   |   2 +-
 .../org/apache/activemq/bugs/AMQ4323Test.java   |   2 +-
 .../activemq/usecases/MemoryLimitTest.java      |   6 +-
 .../apache/activemq/util/ConsumerThread.java    |  80 -------
 .../apache/activemq/util/ProducerThread.java    |  82 -------
 14 files changed, 828 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
new file mode 100644
index 0000000..402b2a5
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+
+public class ConsumerThread extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
+
+    int messageCount = 1000;
+    int receiveTimeOut = 3000;
+    Destination destination;
+    Session session;
+    boolean breakOnNull = true;
+    int sleep;
+    int transactionBatchSize;
+
+    int received = 0;
+    int transactions = 0;
+    boolean running = false;
+    CountDownLatch finished;
+
+    public ConsumerThread(Session session, Destination destination) {
+        this.destination = destination;
+        this.session = session;
+    }
+
+    @Override
+    public void run() {
+        running = true;
+        MessageConsumer consumer = null;
+        String threadName = Thread.currentThread().getName();
+        LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
+        try {
+            consumer = session.createConsumer(destination);
+            while (running && received < messageCount) {
+                Message msg = consumer.receive(receiveTimeOut);
+                if (msg != null) {
+                    LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
+                    received++;
+                } else {
+                    if (breakOnNull) {
+                        break;
+                    }
+                }
+
+                if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) {
+                    LOG.info(threadName + " Committing transaction: " + transactions++);
+                    session.commit();
+                }
+
+                if (sleep > 0) {
+                    Thread.sleep(sleep);
+                }
+
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (finished != null) {
+                finished.countDown();
+            }
+            if (consumer != null) {
+                LOG.info(threadName + " Consumed: " + this.getReceived() + " messages");
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        LOG.info(threadName + " Consumer thread finished");
+    }
+
+    public int getReceived() {
+        return received;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public void setBreakOnNull(boolean breakOnNull) {
+        this.breakOnNull = breakOnNull;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public boolean isBreakOnNull() {
+        return breakOnNull;
+    }
+
+    public int getReceiveTimeOut() {
+        return receiveTimeOut;
+    }
+
+    public void setReceiveTimeOut(int receiveTimeOut) {
+        this.receiveTimeOut = receiveTimeOut;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
+
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public CountDownLatch getFinished() {
+        return finished;
+    }
+
+    public void setFinished(CountDownLatch finished) {
+        this.finished = finished;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
new file mode 100644
index 0000000..ad44259
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+
+public class ProducerThread extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
+
+    int messageCount = 1000;
+    Destination destination;
+    protected Session session;
+    int sleep = 0;
+    boolean persistent = true;
+    int messageSize = 0;
+    int textMessageSize;
+    long msgTTL = 0L;
+    String msgGroupID=null;
+    int transactionBatchSize;
+
+    int transactions = 0;
+    int sentCount = 0;
+    byte[] payload = null;
+    boolean running = false;
+    CountDownLatch finished;
+
+
+    public ProducerThread(Session session, Destination destination) {
+        this.destination = destination;
+        this.session = session;
+    }
+
+    public void run() {
+        MessageProducer producer = null;
+        String threadName = Thread.currentThread().getName();
+        try {
+            producer = session.createProducer(destination);
+            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            producer.setTimeToLive(msgTTL);
+            initPayLoad();
+            running = true;
+
+            LOG.info(threadName +  " Started to calculate elapsed time ...\n");
+            long tStart = System.currentTimeMillis();
+
+            for (sentCount = 0; sentCount < messageCount && running; sentCount++) {
+                Message message = createMessage(sentCount);
+                producer.send(message);
+                LOG.info(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
+
+                if (transactionBatchSize > 0 && sentCount > 0 && sentCount % transactionBatchSize == 0) {
+                    LOG.info(threadName + " Committing transaction: " + transactions++);
+                    session.commit();
+                }
+
+                if (sleep > 0) {
+                    Thread.sleep(sleep);
+                }
+            }
+
+            LOG.info(threadName + " Produced: " + this.getSentCount() + " messages");
+            long tEnd = System.currentTimeMillis();
+            long elapsed = (tEnd - tStart) / 1000;
+            LOG.info(threadName + " Elapsed time in second : " + elapsed + " s");
+            LOG.info(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (finished != null) {
+                finished.countDown();
+            }
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void initPayLoad() {
+        if (messageSize > 0) {
+            payload = new byte[messageSize];
+            for (int i = 0; i < payload.length; i++) {
+                payload[i] = '.';
+            }
+        }
+    }
+
+    protected Message createMessage(int i) throws Exception {
+        Message message = null;
+        if (payload != null) {
+            message = session.createBytesMessage();
+            ((BytesMessage)message).writeBytes(payload);
+        } else {
+            if (textMessageSize > 0) {
+                InputStreamReader reader = null;
+                try {
+                    InputStream is = getClass().getResourceAsStream("demo.txt");
+                    reader = new InputStreamReader(is);
+                    char[] chars = new char[textMessageSize];
+                    reader.read(chars);
+                    message = session.createTextMessage(String.valueOf(chars));
+                } catch (Exception e) {
+                    LOG.warn(Thread.currentThread().getName() + " Failed to load " + textMessageSize + " bytes of demo text. Using default text message instead");
+                    message = session.createTextMessage("test message: " + i);
+                } finally {
+                    if (reader != null) {
+                        reader.close();
+                    }
+                }
+            } else {
+                message = session.createTextMessage("test message: " + i);
+            }
+        }
+        if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
+            message.setStringProperty("JMSXGroupID", msgGroupID);
+        }
+        return message;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public int getSentCount() {
+        return sentCount;
+    }
+
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
+
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+    public long getMsgTTL() {
+        return msgTTL;
+    }
+
+    public void setMsgTTL(long msgTTL) {
+        this.msgTTL = msgTTL;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public String getMsgGroupID() {
+        return msgGroupID;
+    }
+
+    public void setMsgGroupID(String msgGroupID) {
+        this.msgGroupID = msgGroupID;
+    }
+
+    public int getTextMessageSize() {
+        return textMessageSize;
+    }
+
+    public void setTextMessageSize(int textMessageSize) {
+        this.textMessageSize = textMessageSize;
+    }
+
+    public int getMessageSize() {
+        return messageSize;
+    }
+
+    public void setMessageSize(int messageSize) {
+        this.messageSize = messageSize;
+    }
+
+    public CountDownLatch getFinished() {
+        return finished;
+    }
+
+    public void setFinished(CountDownLatch finished) {
+        this.finished = finished;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt b/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt
new file mode 100644
index 0000000..4a6002e
--- /dev/null
+++ b/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt
@@ -0,0 +1,15 @@
+Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
index de63347..057f9f3 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
@@ -16,10 +16,15 @@
  */
 package org.apache.activemq.console.command;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.List;
 
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.util.IntrospectionSupport;
 
 public abstract class AbstractCommand implements Command {
     public static final String COMMAND_OPTION_DELIMETER = ",";
@@ -110,6 +115,14 @@ public abstract class AbstractCommand implements Command {
             }
             System.setProperty(key, value);
         } else {
+            if (token.startsWith("--")) {
+                String prop = token.substring(2);
+                if (tokens.isEmpty() || tokens.get(0).startsWith("-")) {
+                    context.print("Property '" + prop + "' is not specified!");
+                } else if (IntrospectionSupport.setProperty(this, prop, tokens.remove(0))) {
+                    return;
+                }
+            }
             // Token is unrecognized
             context.printInfo("Unrecognized option: " + token);
             isPrintHelp = true;
@@ -128,4 +141,22 @@ public abstract class AbstractCommand implements Command {
      * Print the help messages for the specific task
      */
     protected abstract void printHelp();
+
+    /**
+     * Prints the help text stored in the resource named
+     * {@code getName() + ".txt"}, resolved relative to this command's class,
+     * one line at a time through the command context.
+     * <p>
+     * This method stays best-effort: a missing resource or a read failure is
+     * reported through the context instead of being thrown.
+     */
+    protected void printHelpFromFile() {
+        BufferedReader reader = null;
+        try {
+            String resource = getName() + ".txt";
+            InputStream is = getClass().getResourceAsStream(resource);
+            if (is == null) {
+                // getResourceAsStream returns null (it does not throw) for a missing resource.
+                context.printInfo("Help resource '" + resource + "' was not found");
+                return;
+            }
+            reader = new BufferedReader(new InputStreamReader(is));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                context.print(line);
+            }
+        } catch (Exception e) {
+            context.printInfo("Failed to print help: " + e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException ignored) {
+                    // Nothing useful can be done if closing a read-only resource fails.
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
new file mode 100644
index 0000000..962b6ad
--- /dev/null
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class ConsumerCommand extends AbstractCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class);
+
+    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+    String user = ActiveMQConnectionFactory.DEFAULT_USER;
+    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
+    String destination = "queue://TEST";
+    int messageCount = 1000;
+    int sleep;
+    int transactionBatchSize;
+    int parallelThreads = 1;
+
+    @Override
+    protected void runTask(List<String> tokens) throws Exception {
+        LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
+        LOG.info("Consuming " + destination);
+        LOG.info("Sleeping between receives " + sleep + " ms");
+        LOG.info("Running " + parallelThreads + " parallel threads");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection conn = factory.createConnection(user, password);
+        conn.start();
+
+        Session sess;
+        if (transactionBatchSize != 0) {
+            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+
+        CountDownLatch active = new CountDownLatch(parallelThreads);
+
+        for (int i = 1; i <= parallelThreads; i++) {
+            ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
+            consumer.setName("consumer-" + i);
+            consumer.setBreakOnNull(false);
+            consumer.setMessageCount(messageCount);
+            consumer.setSleep(sleep);
+            consumer.setTransactionBatchSize(transactionBatchSize);
+            consumer.setFinished(active);
+            consumer.start();
+        }
+
+        active.await();
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public int getParallelThreads() {
+        return parallelThreads;
+    }
+
+    public void setParallelThreads(int parallelThreads) {
+        this.parallelThreads = parallelThreads;
+    }
+
+    @Override
+    protected void printHelp() {
+        printHelpFromFile();
+    }
+
+    @Override
+    public String getName() {
+        return "consumer";
+    }
+
+    @Override
+    public String getOneLineDescription() {
+        return "Receives messages from the broker";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
new file mode 100644
index 0000000..ba696bb
--- /dev/null
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ProducerThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import java.io.*;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class ProducerCommand extends AbstractCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
+
+    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+    String user = ActiveMQConnectionFactory.DEFAULT_USER;
+    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
+    String destination = "queue://TEST";
+    int messageCount = 1000;
+    int sleep = 0;
+    boolean persistent = true;
+    int messageSize = 0;
+    int textMessageSize;
+    long msgTTL = 0L;
+    String msgGroupID=null;
+    int transactionBatchSize;
+    private int parallelThreads = 1;
+
+    @Override
+    protected void runTask(List<String> tokens) throws Exception {
+        LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
+        LOG.info("Producing messages to " + destination);
+        LOG.info("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+        LOG.info("Sleeping between sends " + sleep + " ms");
+        LOG.info("Running " + parallelThreads + " parallel threads");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection conn = factory.createConnection(user, password);
+        conn.start();
+
+        Session sess;
+        if (transactionBatchSize != 0) {
+            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        CountDownLatch active = new CountDownLatch(parallelThreads);
+
+        for (int i = 1; i <= parallelThreads; i++) {
+            ProducerThread producer = new ProducerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
+            producer.setName("producer-" + i);
+            producer.setMessageCount(messageCount);
+            producer.setSleep(sleep);
+            producer.setMsgTTL(msgTTL);
+            producer.setPersistent(persistent);
+            producer.setTransactionBatchSize(transactionBatchSize);
+            producer.setMessageSize(messageSize);
+            producer.setMsgGroupID(msgGroupID);
+            producer.setTextMessageSize(textMessageSize);
+            producer.setFinished(active);
+            producer.start();
+        }
+
+        active.await();
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public int getMessageSize() {
+        return messageSize;
+    }
+
+    public void setMessageSize(int messageSize) {
+        this.messageSize = messageSize;
+    }
+
+    public int getTextMessageSize() {
+        return textMessageSize;
+    }
+
+    public void setTextMessageSize(int textMessageSize) {
+        this.textMessageSize = textMessageSize;
+    }
+
+    public long getMsgTTL() {
+        return msgTTL;
+    }
+
+    public void setMsgTTL(long msgTTL) {
+        this.msgTTL = msgTTL;
+    }
+
+    public String getMsgGroupID() {
+        return msgGroupID;
+    }
+
+    public void setMsgGroupID(String msgGroupID) {
+        this.msgGroupID = msgGroupID;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getParallelThreads() {
+        return parallelThreads;
+    }
+
+    public void setParallelThreads(int parallelThreads) {
+        this.parallelThreads = parallelThreads;
+    }
+
+    @Override
+    protected void printHelp() {
+        printHelpFromFile();
+    }
+
+    @Override
+    public String getName() {
+        return "producer";
+    }
+
+    @Override
+    public String getOneLineDescription() {
+        return "Sends messages to the broker";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
index 64749c6..bb7ddb6 100644
--- a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
+++ b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
@@ -26,3 +26,5 @@ org.apache.activemq.console.command.EncryptCommand
 org.apache.activemq.console.command.DecryptCommand
 org.apache.activemq.console.command.StoreExportCommand
 org.apache.activemq.console.command.PurgeCommand
+org.apache.activemq.console.command.ProducerCommand
+org.apache.activemq.console.command.ConsumerCommand

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
new file mode 100644
index 0000000..a834ca1
--- /dev/null
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
@@ -0,0 +1,11 @@
+Usage: consumer [OPTIONS]
+Description: Demo consumer that can be used to receive messages from the broker
+Options :
+    [--brokerUrl                                   URL] - connection factory url; default is the value of ActiveMQConnectionFactory.DEFAULT_BROKER_URL
+    [--user                                         ..] - connection user name
+    [--password                                     ..] - connection password
+    [--destination               queue://..|topic://..] - destination to consume from; default queue://TEST
+    [--messageCount                                  N] - number of messages to receive; default 1000
+    [--sleep                                         N] - millisecond sleep period between sends or receives; default 0
+    [--transactionBatchSize                          N] - use receive transaction batches of size N; default 0, no jms transactions
+    [--parallelThreads                               N] - number of threads to run in parallel; default 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
new file mode 100644
index 0000000..3cf5550
--- /dev/null
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
@@ -0,0 +1,16 @@
+Usage: producer [OPTIONS]
+Description: Demo producer that can be used to send messages to the broker
+Options :
+    [--brokerUrl                                   URL] - connection factory url; default is the value of ActiveMQConnectionFactory.DEFAULT_BROKER_URL
+    [--user                                         ..] - connection user name
+    [--password                                     ..] - connection password
+    [--destination               queue://..|topic://..] - destination to send to; default queue://TEST
+    [--persistent                           true|false] - use persistent or non persistent messages; default true
+    [--messageCount                                  N] - number of messages to send; default 1000
+    [--sleep                                         N] - millisecond sleep period between sends or receives; default 0
+    [--transactionBatchSize                          N] - use send transaction batches of size N; default 0, no jms transactions
+    [--parallelThreads                               N] - number of threads to run in parallel; default 1
+    [--msgTTL                                        N] - message TTL in milliseconds
+    [--messageSize                                   N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
+    [--textMessageSize                               N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
+    [--msgGroupID                                   ..] - JMS message group identifier
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
index bfff0fd..6494efe 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
@@ -123,7 +123,7 @@ public class AMQ3120Test {
         ProducerThread producer = new ProducerThread(producerSess, destination) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer.setSleep(650);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
index 8e6a96f..e965731 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
@@ -127,7 +127,7 @@ public class AMQ4323Test {
         ProducerThread producer = new ProducerThread(producerSess, destination) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer.setMessageCount(messageCount);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index 49026bd..c481172 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -105,7 +105,7 @@ public class MemoryLimitTest extends TestSupport {
         final ProducerThread producer = new ProducerThread(sess, queue) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                BytesMessage bytesMessage = sess.createBytesMessage();
+                BytesMessage bytesMessage = session.createBytesMessage();
                 bytesMessage.writeBytes(payload);
                 return bytesMessage;
             }
@@ -168,7 +168,7 @@ public class MemoryLimitTest extends TestSupport {
         final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer.setMessageCount(1000);
@@ -176,7 +176,7 @@ public class MemoryLimitTest extends TestSupport {
         final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer2.setMessageCount(1000);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
deleted file mode 100644
index 6b4bad2..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-
-public class ConsumerThread extends Thread {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
-
-    int messageCount = 1000;
-    int received = 0;
-    Destination dest;
-    Session sess;
-    boolean breakOnNull = true;
-
-    public ConsumerThread(Session sess, Destination dest) {
-        this.dest = dest;
-        this.sess = sess;
-    }
-
-    @Override
-    public void run() {
-      MessageConsumer consumer = null;
-
-        try {
-            consumer = sess.createConsumer(dest);
-            while (received < messageCount) {
-                Message msg = consumer.receive(3000);
-                if (msg != null) {
-                    LOG.info("Received " + received + ": " + ((TextMessage)msg).getText());
-                    received++;
-                } else {
-                    if (breakOnNull) {
-                        break;
-                    }
-                }
-            }
-        } catch (JMSException e) {
-            e.printStackTrace();
-        } finally {
-            if (consumer != null) {
-                try {
-                    consumer.close();
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    public int getReceived() {
-        return received;
-    }
-
-    public void setMessageCount(int messageCount) {
-        this.messageCount = messageCount;
-    }
-
-    public void setBreakOnNull(boolean breakOnNull) {
-        this.breakOnNull = breakOnNull;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java
deleted file mode 100644
index c7cf90d..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-
-public class ProducerThread extends Thread {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
-
-    int messageCount = 1000;
-    Destination dest;
-    protected Session sess;
-    int sleep = 0;
-    int sentCount = 0;
-
-    public ProducerThread(Session sess, Destination dest) {
-        this.dest = dest;
-        this.sess = sess;
-    }
-
-    public void run() {
-        MessageProducer producer = null;
-        try {
-            producer = sess.createProducer(dest);
-            for (sentCount = 0; sentCount < messageCount; sentCount++) {
-                producer.send(createMessage(sentCount));
-                LOG.info("Sent 'test message: " + sentCount + "'");
-                if (sleep > 0) {
-                    Thread.sleep(sleep);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            if (producer != null) {
-                try {
-                    producer.close();
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    protected Message createMessage(int i) throws Exception {
-        return sess.createTextMessage("test message: " + i);
-    }
-
-    public void setMessageCount(int messageCount) {
-        this.messageCount = messageCount;
-    }
-
-    public void setSleep(int sleep) {
-        this.sleep = sleep;
-    }
-
-    public int getMessageCount() {
-        return messageCount;
-    }
-
-    public int getSentCount() {
-        return sentCount;
-    }
-}


Mime
View raw message