activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1840 Added FQQN Import/Export Live Broker
Date Wed, 02 May 2018 16:10:07 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 13fac8608 -> ab9f5128b


ARTEMIS-1840 Added FQQN Import/Export Live Broker


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64ce26e7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64ce26e7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64ce26e7

Branch: refs/heads/master
Commit: 64ce26e7cc06c0e5779430a8126128d11da561bd
Parents: 812776f
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Wed May 2 11:35:17 2018 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed May 2 12:09:50 2018 -0400

----------------------------------------------------------------------
 .../artemis/cli/commands/messages/Consumer.java |  57 ++-
 .../cli/commands/messages/ConsumerThread.java   |  73 ++--
 .../cli/commands/messages/DestAbstract.java     |  96 +++++
 .../artemis/cli/commands/messages/Producer.java | 121 +++++--
 .../cli/commands/messages/ProducerThread.java   |  11 +
 .../factory/serialize/MessageSerializer.java    |  37 ++
 .../factory/serialize/XMLMessageSerializer.java | 118 ++++++
 .../cli/test/MessageSerializerTest.java         | 362 +++++++++++++++++++
 8 files changed, 823 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index ee15a66..856e82b 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -20,11 +20,16 @@ package org.apache.activemq.artemis.cli.commands.messages;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageListener;
 import javax.jms.Session;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
 
 @Command(name = "consumer", description = "It will consume messages from an instance")
 public class Consumer extends DestAbstract {
@@ -41,6 +46,9 @@ public class Consumer extends DestAbstract {
    @Option(name = "--filter", description = "filter to be used with the consumer")
    String filter;
 
+   @Option(name = "--data", description = "serialize the messages to the specified file as they are consumed")
+   String file;
+
    @Override
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);
@@ -49,7 +57,34 @@ public class Consumer extends DestAbstract {
 
       ConnectionFactory factory = createConnectionFactory();
 
+      SerialiserMessageListener listener = null;
+      MessageSerializer messageSerializer = null;
+      if (file != null) {
+         try {
+            String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER : serializer;
+            if (className.equals(DEFAULT_MESSAGE_SERIALIZER) && !protocol.equalsIgnoreCase("CORE")) {
+               System.err.println("Default Serializer does not support: " + protocol + " protocol");
+               return null;
+            }
+            messageSerializer = (MessageSerializer) Class.forName(className).getConstructor().newInstance();
+         } catch (Exception e) {
+            System.err.println("Error. Unable to instantiate serializer class: " + serializer);
+            return null;
+         }
+
+         try {
+            OutputStream out = new FileOutputStream(file);
+            listener = new SerialiserMessageListener(messageSerializer, out);
+         } catch (Exception e) {
+            System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
+            return null;
+         }
+      }
+
+      if (messageSerializer != null) messageSerializer.start();
+
       try (Connection connection = factory.createConnection()) {
+         // We read messages in a single thread when persisting to file.
          ConsumerThread[] threadsArray = new ConsumerThread[threads];
          for (int i = 0; i < threads; i++) {
             Session session;
@@ -58,10 +93,13 @@ public class Consumer extends DestAbstract {
             } else {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             }
-            Destination dest = lookupDestination(session);
+
+            // Do validation on FQQN
+            Destination dest = isFQQN() ? session.createQueue(getFQQNFromDestination(destination)) : lookupDestination(session);
             threadsArray[i] = new ConsumerThread(session, dest, i);
 
-            threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false);
+            threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
+               .setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener);
          }
 
          for (ConsumerThread thread : threadsArray) {
@@ -77,9 +115,24 @@ public class Consumer extends DestAbstract {
             received += thread.getReceived();
          }
 
+         if (messageSerializer != null) messageSerializer.stop();
+
          return received;
       }
    }
 
+   private class SerialiserMessageListener implements MessageListener {
 
+      private MessageSerializer messageSerializer;
+
+      SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception {
+         this.messageSerializer = messageSerializer;
+         this.messageSerializer.setOutput(outputStream);
+      }
+
+      @Override
+      public void onMessage(Message message) {
+         messageSerializer.write(message);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
index ab3640b..9fbff81 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
@@ -21,6 +21,7 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
@@ -50,6 +51,7 @@ public class ConsumerThread extends Thread {
    boolean running = false;
    CountDownLatch finished;
    boolean bytesAsText;
+   MessageListener listener;
 
    public ConsumerThread(Session session, Destination destination, int threadNr) {
       super("Consumer " + destination.toString() + ", thread=" + threadNr);
@@ -66,6 +68,43 @@ public class ConsumerThread extends Thread {
       }
    }
 
+   private void handle(Message msg, boolean browse) throws JMSException {
+      if (listener != null) {
+         listener.onMessage(msg);
+      } else {
+         if (browse) {
+            if (verbose) {
+               System.out.println("..." + msg);
+            }
+            if (bytesAsText && (msg instanceof BytesMessage)) {
+               long length = ((BytesMessage) msg).getBodyLength();
+               byte[] bytes = new byte[(int) length];
+               ((BytesMessage) msg).readBytes(bytes);
+               System.out.println("Message:" + msg);
+            }
+         } else {
+            if (verbose) {
+               if (bytesAsText && (msg instanceof BytesMessage)) {
+                  long length = ((BytesMessage) msg).getBodyLength();
+                  byte[] bytes = new byte[(int) length];
+                  ((BytesMessage) msg).readBytes(bytes);
+                  System.out.println("Received a message with " + bytes.length);
+               }
+
+               if (msg instanceof TextMessage) {
+                  String text = ((TextMessage) msg).getText();
+                  System.out.println("Received text sized at " + text.length());
+               }
+
+               if (msg instanceof ObjectMessage) {
+                  Object obj = ((ObjectMessage) msg).getObject();
+                  System.out.println("Received object " + obj.toString().length());
+               }
+            }
+         }
+      }
+   }
+
    public void browse() {
       running = true;
       QueueBrowser consumer = null;
@@ -83,16 +122,7 @@ public class ConsumerThread extends Thread {
             Message msg = enumBrowse.nextElement();
             if (msg != null) {
                System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
-
-               if (verbose) {
-                  System.out.println("..." + msg);
-               }
-               if (bytesAsText && (msg instanceof BytesMessage)) {
-                  long length = ((BytesMessage) msg).getBodyLength();
-                  byte[] bytes = new byte[(int) length];
-                  ((BytesMessage) msg).readBytes(bytes);
-                  System.out.println("Message:" + msg);
-               }
+               handle(msg, true);
                received++;
 
                if (received >= messageCount) {
@@ -158,24 +188,7 @@ public class ConsumerThread extends Thread {
                      System.out.println("Received " + count);
                   }
                }
-               if (verbose) {
-                  if (bytesAsText && (msg instanceof BytesMessage)) {
-                     long length = ((BytesMessage) msg).getBodyLength();
-                     byte[] bytes = new byte[(int) length];
-                     ((BytesMessage) msg).readBytes(bytes);
-                     System.out.println("Received a message with " + bytes.length);
-                  }
-
-                  if (msg instanceof TextMessage) {
-                     String text = ((TextMessage) msg).getText();
-                     System.out.println("Received text sized at " + text.length());
-                  }
-
-                  if (msg instanceof ObjectMessage) {
-                     Object obj = ((ObjectMessage) msg).getObject();
-                     System.out.println("Received object " + obj.toString().length());
-                  }
-               }
+               handle(msg, false);
                received++;
             } else {
                if (breakOnNull) {
@@ -334,4 +347,8 @@ public class ConsumerThread extends Thread {
       this.browse = browse;
       return this;
    }
+
+   public void setListener(MessageListener listener) {
+      this.listener = listener;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
index 2f4a34c..63b5f17 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
@@ -19,12 +19,30 @@ package org.apache.activemq.artemis.cli.commands.messages;
 
 import javax.jms.Destination;
 import javax.jms.Session;
+import java.nio.ByteBuffer;
 
 import io.airlift.airline.Option;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientRequestor;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
+import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 public class DestAbstract extends ConnectionAbstract {
 
+   public static final String DEFAULT_MESSAGE_SERIALIZER = "org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer";
+
+   private static final String FQQN_PREFIX = "fqqn://";
+
+   private static final String FQQN_SEPERATOR = "::";
+
    @Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// (Default: queue://TEST)")
    String destination = "queue://TEST";
 
@@ -40,6 +58,25 @@ public class DestAbstract extends ConnectionAbstract {
    @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
    int threads = 1;
 
+   @Option(name = "--serializer", description = "Override the default serializer with a custom implementation")
+   String serializer;
+
+   protected boolean isFQQN() throws ActiveMQException {
+      boolean fqqn = destination.contains("::");
+      if (fqqn) {
+         if (!destination.startsWith("fqqn://")) {
+            throw new ActiveMQException("FQQN destinations must start with the fqqn:// prefix");
+         }
+
+         if (protocol.equalsIgnoreCase("AMQP")) {
+            throw new ActiveMQException("Sending to FQQN destinations is not support via AMQP protocol");
+         }
+         return true;
+      } else {
+         return false;
+      }
+   }
+
    protected Destination lookupDestination(Session session) throws Exception {
       if (protocol.equals("AMQP")) {
          return session.createQueue(destination);
@@ -48,4 +85,63 @@ public class DestAbstract extends ConnectionAbstract {
       }
    }
 
+   protected MessageSerializer getMessageSerializer() {
+      if (serializer == null) return new XMLMessageSerializer();
+      try {
+         return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance();
+      } catch (Exception e) {
+         System.out.println("Error: unable to instantiate serializer class: " + serializer);
+         System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER);
+      }
+      return new XMLMessageSerializer();
+   }
+
+   // FIXME We currently do not support producing to FQQN.  This is a work around.
+   private ClientSession getManagementSession() throws Exception {
+      ServerLocator serverLocator = ActiveMQClient.createServerLocator(brokerURL);
+      ClientSessionFactory sf = serverLocator.createSessionFactory();
+
+      ClientSession managementSession;
+      if (user != null || password != null) {
+         managementSession = sf.createSession(user, password, false, true, true, false, 0);
+      } else {
+         managementSession = sf.createSession(false, true, true);
+      }
+      return managementSession;
+   }
+
+   public byte[] getQueueIdFromName(String queueName) throws Exception {
+      ClientMessage message = getQueueAttribute(queueName, "ID");
+      Number idObject = (Number) ManagementHelper.getResult(message);
+      ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+      byteBuffer.putLong(idObject.longValue());
+      return byteBuffer.array();
+   }
+
+   protected ClientMessage getQueueAttribute(String queueName, String attribute) throws Exception {
+      ClientSession managementSession = getManagementSession();
+      managementSession.start();
+
+      try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
+         ClientMessage managementMessage = managementSession.createMessage(false);
+         ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queueName, attribute);
+         managementSession.start();
+         ClientMessage reply = requestor.request(managementMessage);
+         return reply;
+      } finally {
+         managementSession.stop();
+      }
+   }
+
+   protected String getQueueFromFQQN(String fqqn) {
+      return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) + FQQN_SEPERATOR.length());
+   }
+
+   protected String getAddressFromFQQN(String fqqn) {
+      return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(), fqqn.indexOf(FQQN_SEPERATOR));
+   }
+
+   protected String getFQQNFromDestination(String destination) {
+      return destination.substring(destination.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length());
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
index 3cb5eff..0936578 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
@@ -19,12 +19,21 @@ package org.apache.activemq.artemis.cli.commands.messages;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import java.io.FileInputStream;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 
 @Command(name = "producer", description = "It will send messages to an instance")
 public class Producer extends DestAbstract {
@@ -49,6 +58,9 @@ public class Producer extends DestAbstract {
    @Option(name = "--group", description = "Message Group to be used")
    String msgGroupID = null;
 
+   @Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.")
+   String fileName = null;
+
    @Override
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);
@@ -56,35 +68,100 @@ public class Producer extends DestAbstract {
       ConnectionFactory factory = createConnectionFactory();
 
       try (Connection connection = factory.createConnection()) {
-         ProducerThread[] threadsArray = new ProducerThread[threads];
-         for (int i = 0; i < threads; i++) {
-            Session session;
-            if (txBatchSize > 0) {
-               session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            } else {
-               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            }
-            Destination dest = lookupDestination(session);
-            threadsArray[i] = new ProducerThread(session, dest, i);
 
-            threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
-               setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize).
-               setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
-               setMessageCount(messageCount);
+         byte[] queueId = null;
+         boolean isFQQN = isFQQN();
+         if (isFQQN) {
+            queueId = getQueueIdFromName(getQueueFromFQQN(destination));
          }
 
-         for (ProducerThread thread : threadsArray) {
-            thread.start();
-         }
+         // If we are reading from file, we process messages sequentially to guarantee ordering.  i.e. no thread creation.
+         if (fileName != null) {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Destination dest = lookupDestination(session, isFQQN);
+
+            MessageProducer producer = session.createProducer(dest);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            int messageCount = 0;
+            try {
+               MessageSerializer serializer = getMessageSerializer();
+               serializer.setInput(new FileInputStream(fileName), session);
+               serializer.start();
+
+               Message message = serializer.read();
+
+               while (message != null) {
+                  if (queueId != null) ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
+                  producer.send(message);
+                  message = serializer.read();
+                  messageCount++;
+               }
+
+               session.commit();
+               serializer.stop();
+            } catch (Exception e) {
+               System.err.println("Error occurred during import.  Rolling back.");
+               session.rollback();
+               e.printStackTrace();
+               return 0;
+            }
+            System.out.println("Sent " + messageCount + " Messages.");
+            return messageCount;
+         } else {
+            ProducerThread[] threadsArray = new ProducerThread[threads];
+            for (int i = 0; i < threads; i++) {
+               Session session;
+               if (txBatchSize > 0) {
+                  session = connection.createSession(true, Session.SESSION_TRANSACTED);
+               } else {
+                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               }
+               Destination dest = lookupDestination(session, isFQQN);
+               threadsArray[i] = new ProducerThread(session, dest, i);
+
+               threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
+                  setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize).
+                  setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
+                  setMessageCount(messageCount).setQueueId(queueId);
+            }
+
+            for (ProducerThread thread : threadsArray) {
+               thread.start();
+            }
 
-         int messagesProduced = 0;
-         for (ProducerThread thread : threadsArray) {
-            thread.join();
-            messagesProduced += thread.getSentCount();
+            int messagesProduced = 0;
+            for (ProducerThread thread : threadsArray) {
+               thread.join();
+               messagesProduced += thread.getSentCount();
+            }
+            return messagesProduced;
          }
+      }
+   }
 
-         return messagesProduced;
+   public Destination lookupDestination(Session session, boolean isFQQN) throws Exception {
+      Destination dest;
+      if (!isFQQN) {
+         dest = lookupDestination(session);
+      } else {
+         String address = getAddressFromFQQN(destination);
+         if (isFQQNAnycast(getQueueFromFQQN(destination))) {
+            String queue = getQueueFromFQQN(destination);
+            if (!queue.equals(address)) {
+               throw new ActiveMQException("FQQN support is limited to Anycast queues where the queue name equals the address.");
+            }
+            dest = session.createQueue(address);
+         } else {
+            dest = session.createTopic(address);
+         }
       }
+      return dest;
    }
 
+   protected boolean isFQQNAnycast(String queueName) throws Exception {
+      ClientMessage message = getQueueAttribute(queueName, "RoutingType");
+      String routingType = (String) ManagementHelper.getResult(message);
+      return routingType.equalsIgnoreCase("anycast");
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
index 6e9fc5c..58a57ef 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java
@@ -30,6 +30,7 @@ import java.io.InputStreamReader;
 import java.net.URL;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 
 public class ProducerThread extends Thread {
@@ -48,6 +49,7 @@ public class ProducerThread extends Thread {
    long msgTTL = 0L;
    String msgGroupID = null;
    int transactionBatchSize;
+   byte[] queueId = null;
 
    int transactions = 0;
    final AtomicInteger sentCount = new AtomicInteger(0);
@@ -121,6 +123,11 @@ public class ProducerThread extends Thread {
 
    private void sendMessage(MessageProducer producer, String threadName) throws Exception {
       Message message = createMessage(sentCount.get(), threadName);
+
+      if (queueId != null) {
+         ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
+      }
+
       producer.send(message);
       if (verbose) {
          System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
@@ -370,4 +377,8 @@ public class ProducerThread extends Thread {
       this.objectSize = objectSize;
       return this;
    }
+
+   public void setQueueId(byte[] queueId) {
+      this.queueId = queueId;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java
new file mode 100644
index 0000000..9207908
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.factory.serialize;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface MessageSerializer {
+
+   Message read() throws Exception;
+
+   void write(Message message);
+
+   void setOutput(OutputStream out) throws Exception;
+
+   void setInput(InputStream in, Session session) throws Exception;
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java
new file mode 100644
index 0000000..e353b94
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.factory.serialize;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Proxy;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageExporter;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageImporter;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
+
+public class XMLMessageSerializer implements MessageSerializer {
+
+   private XMLMessageExporter writer;
+
+   private XMLMessageImporter reader;
+
+   private ClientSession clientSession;
+
+   private OutputStream out;
+
+   @Override
+   public synchronized Message read() throws Exception {
+      reader.getRawXMLReader().nextTag();
+
+      // End of document.
+      if (reader.getRawXMLReader().getLocalName().equals("messages")) return null;
+
+      XMLMessageImporter.MessageInfo messageInfo = reader.readMessage(true);
+      if (messageInfo == null) return null;
+
+      // This is a large message
+      ActiveMQMessage jmsMessage = new ActiveMQMessage((ClientMessage) messageInfo.message, clientSession);
+      if (messageInfo.tempFile != null) {
+         jmsMessage.setInputStream(new FileInputStream(messageInfo.tempFile));
+      }
+      return jmsMessage;
+   }
+
+   @Override
+   public synchronized void write(Message message) {
+      try {
+         ICoreMessage core = ((ActiveMQMessage) message).getCoreMessage();
+         writer.printSingleMessageAsXML(core, null, true);
+      } catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   public void setOutput(OutputStream outputStream) throws Exception {
+      this.out = outputStream;
+      XMLOutputFactory factory = XMLOutputFactory.newInstance();
+      XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(outputStream, "UTF-8");
+      XmlDataExporter.PrettyPrintHandler handler = new XmlDataExporter.PrettyPrintHandler(rawXmlWriter);
+      XMLStreamWriter xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
+      this.writer = new XMLMessageExporter(xmlWriter);
+   }
+
+   @Override
+   public void setInput(InputStream inputStream, Session session) throws Exception {
+      XMLStreamReader streamReader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+      this.clientSession = ((ActiveMQSession) session).getCoreSession();
+      this.reader = new XMLMessageImporter(streamReader, clientSession);
+   }
+
+   @Override
+   public synchronized void start() throws Exception {
+      if (writer != null) {
+         writer.getRawXMLWriter().writeStartDocument(XmlDataConstants.XML_VERSION);
+         writer.getRawXMLWriter().writeStartElement(XmlDataConstants.MESSAGES_PARENT);
+      }
+
+      if (reader != null) {
+         // <messages>
+         reader.getRawXMLReader().nextTag();
+      }
+   }
+
+   @Override
+   public synchronized void stop() throws Exception {
+      if (writer != null) {
+         writer.getRawXMLWriter().writeEndElement();
+         writer.getRawXMLWriter().writeEndDocument();
+         writer.getRawXMLWriter().flush();
+         writer.getRawXMLWriter().close();
+         out.flush();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64ce26e7/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
new file mode 100644
index 0000000..df79104
--- /dev/null
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.cli.Artemis;
+import org.apache.activemq.artemis.cli.commands.Run;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to validate that the CLI doesn't throw improper exceptions when invoked.
+ */
+public class MessageSerializerTest extends CliTestBase {
+
+   private Connection connection;
+
+   @Before
+   @Override
+   public void setup() throws Exception {
+      setupAuth();
+      super.setup();
+      startServer();
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connection = cf.createConnection("admin", "admin");
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         connection.close();
+      } finally {
+         stopServer();
+         super.tearDown();
+      }
+   }
+
+   private void setupAuth() throws Exception {
+      setupAuth(temporaryFolder.getRoot());
+   }
+
+   private void setupAuth(File folder) throws Exception {
+      System.setProperty("java.security.auth.login.config", folder.getAbsolutePath() + "/etc/login.config");
+   }
+
+   private void startServer() throws Exception {
+      File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
+      setupAuth(rootDirectory);
+      Run.setEmbedded(true);
+      Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login");
+      System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
+      Artemis.internalExecute("run");
+   }
+
+   private void stopServer() throws Exception {
+      Artemis.internalExecute("stop");
+      assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
+      assertEquals(0, LibaioContext.getTotalMaxIO());
+   }
+
+   private File createMessageFile() throws IOException {
+      return temporaryFolder.newFile("messages.xml");
+   }
+
+   @Test
+   public void testTextMessageImportExport() throws Exception {
+      String address = "test";
+      int noMessages = 10;
+      File file = createMessageFile();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      List<Message> sent = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         sent.add(session.createTextMessage(RandomUtil.randomString()));
+      }
+
+      sendMessages(session, address, sent);
+      exportMessages(address, noMessages, file);
+
+      // Ensure there's nothing left to consume
+      MessageConsumer consumer = session.createConsumer(getDestination(address));
+      assertNull(consumer.receive(1000));
+      consumer.close();
+
+      importMessages(address, file);
+
+      List<Message> received = consumeMessages(session, address, noMessages, false);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((TextMessage) sent.get(i)).getText(), ((TextMessage) received.get(i)).getText());
+      }
+   }
+
+   @Test
+   public void testObjectMessageImportExport() throws Exception {
+      String address = "test";
+      int noMessages = 10;
+      File file = createMessageFile();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      // Send initial messages.
+      List<Message> sent = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         sent.add(session.createObjectMessage(UUID.randomUUID()));
+      }
+
+      sendMessages(session, address, sent);
+      exportMessages(address, noMessages, file);
+
+      // Ensure there's nothing left to consume
+      MessageConsumer consumer = session.createConsumer(getDestination(address));
+      assertNull(consumer.receive(1000));
+      consumer.close();
+
+      importMessages(address, file);
+      List<Message> received = consumeMessages(session, address, noMessages, false);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
+      }
+   }
+
+   @Test
+   public void testMapMessageImportExport() throws Exception {
+      String address = "test";
+      int noMessages = 10;
+      String key = "testKey";
+      File file = createMessageFile();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      List<Message> sent = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         MapMessage m = session.createMapMessage();
+         m.setString(key, RandomUtil.randomString());
+         sent.add(m);
+      }
+
+      sendMessages(session, address, sent);
+      exportMessages(address, noMessages, file);
+
+      // Ensure there's nothing left to consume
+      MessageConsumer consumer = session.createConsumer(getDestination(address));
+      assertNull(consumer.receive(1000));
+      consumer.close();
+
+      importMessages(address, file);
+      List<Message> received = consumeMessages(session, address, noMessages, false);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
+      }
+   }
+
+   private void sendMessages(Session session, String address, List<Message> messages) throws Exception {
+      MessageProducer producer = session.createProducer(getDestination(address));
+      for (Message m : messages) {
+         producer.send(m);
+      }
+   }
+
+   private void sendMessages(Session session, Destination destination, List<Message> messages) throws Exception {
+      MessageProducer producer = session.createProducer(destination);
+      for (Message m : messages) {
+         producer.send(m);
+      }
+   }
+
+   private List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
+      Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      List<Message> messages = new ArrayList<>();
+      for (int i = 0; i < noMessages; i++) {
+         Message m = consumer.receive(1000);
+         assertNotNull(m);
+         messages.add(m);
+      }
+      return messages;
+   }
+
+   private void exportMessages(String address, int noMessages, File output) throws Exception {
+      Artemis.main("consumer",
+                   "--user", "admin",
+                   "--password", "admin",
+                   "--destination", address,
+                   "--message-count", "" + noMessages,
+                   "--data", output.getAbsolutePath());
+   }
+
+   private void importMessages(String address, File input) throws Exception {
+      Artemis.main("producer",
+                   "--user", "admin",
+                   "--password", "admin",
+                   "--destination", address,
+                   "--data", input.getAbsolutePath());
+   }
+
+   private void createQueue(String routingTypeOption, String address, String queueName) throws Exception {
+      Artemis.main("queue", "create",
+                   "--user", "admin",
+                   "--password", "admin",
+                   "--address", address,
+                   "--name", queueName,
+                   routingTypeOption,
+                   "--durable",
+                   "--preserve-on-no-consumers",
+                   "--auto-create-address");
+   }
+
+   @Test
+   public void testSendDirectToQueue() throws Exception {
+
+      String address = "test";
+      String queue1Name = "queue1";
+      String queue2Name = "queue2";
+
+      createQueue("--multicast", address, queue1Name);
+      createQueue("--multicast", address, queue2Name);
+
+      try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
+
+         // send messages to queue
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         connection.start();
+
+         Destination queue1 = session.createQueue(address + "::" + queue1Name);
+         Destination queue2 = session.createQueue(address + "::" + queue2Name);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+
+         Artemis.main("producer",
+                      "--user", "admin",
+                      "--password", "admin",
+                      "--destination", "fqqn://" + address + "::" + queue1Name,
+                      "--message-count", "5");
+
+         assertNull(consumer2.receive(1000));
+         assertNotNull(consumer1.receive(1000));
+      }
+   }
+
+   @Test
+   public void exportFromFQQN() throws Exception {
+      String addr = "address";
+      String queue = "queue";
+      String fqqn = addr + "::" + queue;
+      String destination = "fqqn://" + fqqn;
+
+      File file = createMessageFile();
+      int noMessages = 10;
+
+      createQueue("--multicast", addr, queue);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      Topic topic = session.createTopic(addr);
+
+      List<Message> messages = new ArrayList<>(noMessages);
+      for (int i = 0; i < noMessages; i++) {
+         messages.add(session.createTextMessage(RandomUtil.randomString()));
+      }
+
+      sendMessages(session, topic, messages);
+
+      exportMessages(destination, noMessages, file);
+      importMessages(destination, file);
+
+      List<Message> recieved = consumeMessages(session, fqqn, noMessages, true);
+      for (int i = 0; i < noMessages; i++) {
+         assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
+      }
+   }
+
+   //read individual lines from byteStream
+   private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
+      byte[] bytes;
+
+      if (errorOutput) {
+         bytes = context.getStdErrBytes();
+      } else {
+         bytes = context.getStdoutBytes();
+      }
+      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
+      ArrayList<String> lines = new ArrayList<>();
+
+      String currentLine = bufferedReader.readLine();
+      while (currentLine != null) {
+         lines.add(currentLine);
+         currentLine = bufferedReader.readLine();
+      }
+
+      return lines;
+   }
+
+   private void sendMessages(Session session, String queueName, int messageCount) throws JMSException {
+      MessageProducer producer = session.createProducer(getDestination(queueName));
+
+      TextMessage message = session.createTextMessage(getTestMessageBody());
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(message);
+      }
+   }
+
+   private String getTestMessageBody() {
+      return "Sample Message";
+   }
+
+   private Destination getDestination(String queueName) {
+      return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
+   }
+}


Mime
View raw message