activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5558 - some more options for producer/consumer tools
Date Mon, 23 Mar 2015 13:46:40 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 85d9d4e94 -> df3ff9c65


https://issues.apache.org/jira/browse/AMQ-5558 - some more options for producer/consumer tools


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/df3ff9c6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/df3ff9c6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/df3ff9c6

Branch: refs/heads/master
Commit: df3ff9c65e0567dc6bc18de769c47e65623ccb71
Parents: 85d9d4e
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Mon Mar 23 14:46:14 2015 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Mon Mar 23 14:46:35 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/util/ConsumerThread.java    | 15 ++++
 .../apache/activemq/util/ProducerThread.java    | 84 ++++++++++++++------
 .../console/command/ConsumerCommand.java        | 11 ++-
 .../console/command/ProducerCommand.java        | 22 ++++-
 .../activemq/console/command/consumer.txt       |  3 +-
 .../activemq/console/command/producer.txt       |  2 +
 6 files changed, 109 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
index 402b2a5..86bcadb 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
@@ -38,6 +38,7 @@ public class ConsumerThread extends Thread {
     int transactions = 0;
     boolean running = false;
     CountDownLatch finished;
+    boolean bytesAsText;
 
     public ConsumerThread(Session session, Destination destination) {
         this.destination = destination;
@@ -56,6 +57,12 @@ public class ConsumerThread extends Thread {
                 Message msg = consumer.receive(receiveTimeOut);
                 if (msg != null) {
                     LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage)
msg).getText() : msg.getJMSMessageID()));
+                    if (bytesAsText && (msg instanceof BytesMessage)) {
+                        long length = ((BytesMessage) msg).getBodyLength();
+                        byte[] bytes = new byte[(int) length];
+                        ((BytesMessage) msg).readBytes(bytes);
+                        LOG.info("BytesMessage as text string: " + new String(bytes));
+                    }
                     received++;
                 } else {
                     if (breakOnNull) {
@@ -151,4 +158,12 @@ public class ConsumerThread extends Thread {
     public void setFinished(CountDownLatch finished) {
         this.finished = finished;
     }
+
+    public boolean isBytesAsText() {
+        return bytesAsText;
+    }
+
+    public void setBytesAsText(boolean bytesAsText) {
+        this.bytesAsText = bytesAsText;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
index ad44259..638f60b 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
@@ -20,10 +20,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
-import java.io.File;
-import java.io.FileReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.*;
 import java.net.URL;
 import java.util.concurrent.CountDownLatch;
 
@@ -44,6 +41,9 @@ public class ProducerThread extends Thread {
 
     int transactions = 0;
     int sentCount = 0;
+    String message;
+    String messageText = null;
+    String url = null;
     byte[] payload = null;
     boolean running = false;
     CountDownLatch finished;
@@ -114,35 +114,55 @@ public class ProducerThread extends Thread {
     }
 
     protected Message createMessage(int i) throws Exception {
-        Message message = null;
+        Message answer;
         if (payload != null) {
-            message = session.createBytesMessage();
-            ((BytesMessage)message).writeBytes(payload);
+            answer = session.createBytesMessage();
+            ((BytesMessage) answer).writeBytes(payload);
         } else {
             if (textMessageSize > 0) {
-                InputStreamReader reader = null;
-                try {
-                    InputStream is = getClass().getResourceAsStream("demo.txt");
-                    reader = new InputStreamReader(is);
-                    char[] chars = new char[textMessageSize];
-                    reader.read(chars);
-                    message = session.createTextMessage(String.valueOf(chars));
-                } catch (Exception e) {
-                    LOG.warn(Thread.currentThread().getName() + " Failed to load " + textMessageSize
+ " bytes of demo text. Using default text message instead");
-                    message = session.createTextMessage("test message: " + i);
-                } finally {
-                    if (reader != null) {
-                        reader.close();
-                    }
+                if (messageText == null) {
+                    messageText = readInputStream(getClass().getResourceAsStream("demo.txt"),
textMessageSize, i);
                 }
+            } else if (url != null) {
+                messageText = readInputStream(new URL(url).openStream(), -1, i);
+            } else if (message != null) {
+                messageText = message;
             } else {
-                message = session.createTextMessage("test message: " + i);
+                messageText = createDefaultMessage(i);
             }
+            answer = session.createTextMessage(messageText);
         }
         if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
-            message.setStringProperty("JMSXGroupID", msgGroupID);
+            answer.setStringProperty("JMSXGroupID", msgGroupID);
         }
-        return message;
+        return answer;
+    }
+
+    private String readInputStream(InputStream is, int size, int messageNumber) throws IOException
{
+        InputStreamReader reader = new InputStreamReader(is);
+        try {
+            char[] buffer;
+            if (size > 0) {
+                buffer = new char[size];
+            } else {
+                buffer = new char[1024];
+            }
+            int count;
+            StringBuilder builder = new StringBuilder();
+            while ((count = reader.read(buffer)) != -1) {
+                builder.append(buffer, 0, count);
+                if (size > 0) break;
+            }
+            return builder.toString();
+        } catch (IOException ioe) {
+            return createDefaultMessage(messageNumber);
+        } finally {
+            reader.close();
+        }
+    }
+
+    private String createDefaultMessage(int messageNumber) {
+        return "test message: " + messageNumber;
     }
 
     public void setMessageCount(int messageCount) {
@@ -228,4 +248,20 @@ public class ProducerThread extends Thread {
     public void setFinished(CountDownLatch finished) {
         this.finished = finished;
     }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
index 3998610..58f37b8 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
@@ -39,6 +39,7 @@ public class ConsumerCommand extends AbstractCommand {
     int sleep;
     int transactionBatchSize;
     int parallelThreads = 1;
+    boolean bytesAsText;
 
     @Override
     protected void runTask(List<String> tokens) throws Exception {
@@ -71,6 +72,7 @@ public class ConsumerCommand extends AbstractCommand {
                 consumer.setSleep(sleep);
                 consumer.setTransactionBatchSize(transactionBatchSize);
                 consumer.setFinished(active);
+                consumer.setBytesAsText(bytesAsText);
                 consumer.start();
             }
 
@@ -146,6 +148,14 @@ public class ConsumerCommand extends AbstractCommand {
         this.parallelThreads = parallelThreads;
     }
 
+    public boolean isBytesAsText() {
+        return bytesAsText;
+    }
+
+    public void setBytesAsText(boolean bytesAsText) {
+        this.bytesAsText = bytesAsText;
+    }
+
     @Override
     protected void printHelp() {
         printHelpFromFile();
@@ -160,5 +170,4 @@ public class ConsumerCommand extends AbstractCommand {
     public String getOneLineDescription() {
         return "Receives messages from the broker";
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
index 5f3ac92..e4c53fe 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
@@ -18,14 +18,12 @@ package org.apache.activemq.console.command;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.ProducerThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Session;
-import java.io.*;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -39,6 +37,8 @@ public class ProducerCommand extends AbstractCommand {
     int messageCount = 1000;
     int sleep = 0;
     boolean persistent = true;
+    String message = null;
+    String url = null;
     int messageSize = 0;
     int textMessageSize;
     long msgTTL = 0L;
@@ -77,6 +77,8 @@ public class ProducerCommand extends AbstractCommand {
                 producer.setMsgTTL(msgTTL);
                 producer.setPersistent(persistent);
                 producer.setTransactionBatchSize(transactionBatchSize);
+                producer.setMessage(message);
+                producer.setUrl(url);
                 producer.setMessageSize(messageSize);
                 producer.setMsgGroupID(msgGroupID);
                 producer.setTextMessageSize(textMessageSize);
@@ -196,6 +198,22 @@ public class ProducerCommand extends AbstractCommand {
         this.parallelThreads = parallelThreads;
     }
 
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
     @Override
     protected void printHelp() {
         printHelpFromFile();

http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
index bc6f4df..986a771 100644
--- a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
@@ -8,4 +8,5 @@ Options :
     [--messageCount                                  N] - number of messages to send; default
1000
     [--sleep                                         N] - millisecond sleep period between
sends or receives; default 0
     [--transactionBatchSize                          N] - use send transaction batches of
size N; default 0, no jms transactions
-    [--parallelThreads                               N] - number of threads to run in parallel;
default 1
\ No newline at end of file
+    [--parallelThreads                               N] - number of threads to run in parallel;
default 1
+    [--bytesAsText                          true|false] - try to treat a BytesMessage as
a text string
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/df3ff9c6/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
index 0e183fb..65f437a 100644
--- a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
@@ -13,4 +13,6 @@ Options :
     [--msgTTL                                        N] - message TTL in milliseconds
     [--messageSize                                   N] - size in bytes of a BytesMessage;
default 0, a simple TextMessage is used
     [--textMessageSize                               N] - size in bytes of a TextMessage,
a Lorem ipsum demo TextMessage is used
+    [--message                                      ..] - a text string to use as the message
body
+    [--url                                         URL] - a url pointing to a document to
use as the message body
     [--msgGroupID                                   ..] - JMS message group identifier
\ No newline at end of file


Mime
View raw message