activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [4/6] activemq-artemis git commit: ARTEMIS-737 - added JUnit rules for Artemis servers and clients
Date Mon, 26 Sep 2016 21:58:49 GMT
ARTEMIS-737 - added JUnit rules for Artemis servers and clients


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/db095926
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/db095926
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/db095926

Branch: refs/heads/master
Commit: db095926ed1ddb9f59c32650af42e4b19312e76a
Parents: 27ed7ec
Author: Quinn Stevenson <quinn@pronoia-solutions.com>
Authored: Wed Sep 21 15:45:05 2016 -0600
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Sep 26 17:57:52 2016 -0400

----------------------------------------------------------------------
 artemis-junit/pom.xml                           |  70 ++
 .../junit/AbstractActiveMQClientResource.java   | 157 ++++
 .../artemis/junit/ActiveMQConsumerResource.java | 132 +++
 .../junit/ActiveMQDynamicProducerResource.java  | 166 ++++
 .../artemis/junit/ActiveMQProducerResource.java | 273 ++++++
 .../artemis/junit/EmbeddedActiveMQResource.java | 878 +++++++++++++++++++
 .../artemis/junit/EmbeddedJMSResource.java      | 749 ++++++++++++++++
 .../junit/ActiveMQConsumerResourceTest.java     |  87 ++
 .../ActiveMQDynamicProducerResourceTest.java    |  97 ++
 ...ucerResourceWithoutAddressExceptionTest.java |  75 ++
 ...namicProducerResourceWithoutAddressTest.java | 105 +++
 .../junit/ActiveMQProducerResourceTest.java     |  85 ++
 ...ActiveMQResourceCustomConfigurationTest.java |  57 ++
 ...edActiveMQResourceFileConfigurationTest.java |  46 +
 .../junit/EmbeddedActiveMQResourceTest.java     |  88 ++
 ...MSResourceMultipleFileConfigurationTest.java |  79 ++
 .../junit/EmbeddedJMSResourceQueueTest.java     | 102 +++
 ...dJMSResourceSingleFileConfigurationTest.java |  79 ++
 .../junit/EmbeddedJMSResourceTopicTest.java     | 127 +++
 .../MultipleEmbeddedActiveMQResourcesTest.java  |  67 ++
 .../junit/MultipleEmbeddedJMSResourcesTest.java |  54 ++
 .../resources/embedded-artemis-jms-only.xml     |  29 +
 .../resources/embedded-artemis-jms-server.xml   |  40 +
 .../embedded-artemis-minimal-server.xml         |  31 +
 .../test/resources/embedded-artemis-server.xml  |  41 +
 .../src/test/resources/logging.properties       |  61 ++
 pom.xml                                         |   1 +
 27 files changed, 3776 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-junit/pom.xml b/artemis-junit/pom.xml
new file mode 100644
index 0000000..89aa47f
--- /dev/null
+++ b/artemis-junit/pom.xml
@@ -0,0 +1,70 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.activemq</groupId>
+        <artifactId>artemis-pom</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>artemis-junit</artifactId>
+    <packaging>jar</packaging>
+    <name>ActiveMQ Artemis JUnit Rules</name>
+
+    <properties>
+        <activemq.basedir>${project.basedir}/..</activemq.basedir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <!--
+        -->
+        <dependency>
+            <groupId>org.jboss.logmanager</groupId>
+            <artifactId>jboss-logmanager</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>artemis-jms-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>artemis-jms-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java
new file mode 100644
index 0000000..74b9db8
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractActiveMQClientResource extends ExternalResource {
+
+   Logger log = LoggerFactory.getLogger(this.getClass());
+
+   boolean autoCreateQueue = true;
+
+   ServerLocator serverLocator;
+   ClientSessionFactory sessionFactory;
+   ClientSession session;
+
+   public AbstractActiveMQClientResource(String url) {
+      if (url == null) {
+         throw new IllegalArgumentException(String.format("Error creating {} - url cannot be null", this.getClass().getSimpleName()));
+      }
+
+      try {
+         this.serverLocator = ActiveMQClient.createServerLocator(url);
+      }
+      catch (Exception ex) {
+         throw new RuntimeException(String.format("Error creating {} - createServerLocator( {} ) failed", this.getClass().getSimpleName(), url.toString()), ex);
+      }
+   }
+
+   public AbstractActiveMQClientResource(ServerLocator serverLocator) {
+      if (serverLocator == null) {
+         throw new IllegalArgumentException(String.format("Error creating {} - ServerLocator cannot be null", this.getClass().getSimpleName()));
+      }
+
+      this.serverLocator = serverLocator;
+   }
+
+   /**
+    * Adds properties to a ClientMessage
+    *
+    * @param message
+    * @param properties
+    */
+   public static void addMessageProperties(ClientMessage message, Map<String, Object> properties) {
+      if (properties != null && properties.size() > 0) {
+         for (Map.Entry<String, Object> property : properties.entrySet()) {
+            message.putObjectProperty(property.getKey(), property.getValue());
+         }
+      }
+   }
+
+   @Override
+   protected void before() throws Throwable {
+      super.before();
+      start();
+   }
+
+   @Override
+   protected void after() {
+      stop();
+      super.after();
+   }
+
+   void start() {
+      log.info("Starting {}", this.getClass().getSimpleName());
+      try {
+         sessionFactory = serverLocator.createSessionFactory();
+         session = sessionFactory.createSession();
+      }
+      catch (RuntimeException runtimeEx) {
+         throw runtimeEx;
+      }
+      catch (Exception ex) {
+         throw new ActiveMQClientResourceException(String.format("%s initialisation failure", this.getClass().getSimpleName()), ex);
+      }
+
+      createClient();
+
+      try {
+         session.start();
+      }
+      catch (ActiveMQException amqEx) {
+         throw new ActiveMQClientResourceException(String.format("%s startup failure", this.getClass().getSimpleName()), amqEx);
+      }
+   }
+
+   void stop() {
+      stopClient();
+      if (session != null) {
+         try {
+            session.close();
+         }
+         catch (ActiveMQException amqEx) {
+            log.warn("ActiveMQException encountered closing InternalClient ClientSession - ignoring", amqEx);
+         }
+         finally {
+            session = null;
+         }
+      }
+      if (sessionFactory != null) {
+         sessionFactory.close();
+         sessionFactory = null;
+      }
+      if (serverLocator != null) {
+         serverLocator.close();
+         serverLocator = null;
+      }
+
+   }
+
+   protected abstract void createClient();
+
+   protected abstract void stopClient();
+
+   public boolean isAutoCreateQueue() {
+      return autoCreateQueue;
+   }
+
+   public void setAutoCreateQueue(boolean autoCreateQueue) {
+      this.autoCreateQueue = autoCreateQueue;
+   }
+
+   public static class ActiveMQClientResourceException extends RuntimeException {
+
+      public ActiveMQClientResourceException(String message) {
+         super(message);
+      }
+
+      public ActiveMQClientResourceException(String message, Exception cause) {
+         super(message, cause);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
new file mode 100644
index 0000000..2e3088c
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
+public class ActiveMQConsumerResource extends AbstractActiveMQClientResource {
+
+   long defaultReceiveTimeout = 50;
+
+   SimpleString queueName;
+   ClientConsumer consumer;
+
+   public ActiveMQConsumerResource(String url, SimpleString queueName) {
+      super(url);
+      this.queueName = queueName;
+   }
+
+   public ActiveMQConsumerResource(ServerLocator serverLocator, SimpleString queueName) {
+      super(serverLocator);
+      this.queueName = queueName;
+   }
+
+   public long getDefaultReceiveTimeout() {
+      return defaultReceiveTimeout;
+   }
+
+   /**
+    * Sets the default timeout in milliseconds used when receiving messages.  Defaults to 50 milliseconds
+    *
+    * @param defaultReceiveTimeout receive timeout in milliseconds
+    */
+   public void setDefaultReceiveTimeout(long defaultReceiveTimeout) {
+      this.defaultReceiveTimeout = defaultReceiveTimeout;
+   }
+
+   @Override
+   protected void createClient() {
+      boolean browseOnly = false;
+      try {
+         if (!session.queueQuery(queueName).isExists() && autoCreateQueue) {
+            log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), queueName.toString(), queueName.toString());
+            session.createQueue(queueName, queueName);
+         }
+         consumer = session.createConsumer(queueName, browseOnly);
+      }
+      catch (ActiveMQException amqEx) {
+         throw new ActiveMQClientResourceException(String.format("Error creating consumer for queueName %s", queueName.toString()), amqEx);
+      }
+   }
+
+   @Override
+   protected void stopClient() {
+      if (consumer != null) {
+         try {
+            consumer.close();
+         }
+         catch (ActiveMQException amqEx) {
+            log.warn("Exception encountered closing consumer - ignoring", amqEx);
+         }
+         finally {
+            consumer = null;
+         }
+      }
+   }
+
+   public boolean isAutoCreateQueue() {
+      return autoCreateQueue;
+   }
+
+   /**
+    * Enable/Disable the automatic creation of non-existent queues.  The default is to automatically create non-existent queues
+    *
+    * @param autoCreateQueue if true, a missing queue is created when the consumer is created
+    */
+   public void setAutoCreateQueue(boolean autoCreateQueue) {
+      this.autoCreateQueue = autoCreateQueue;
+   }
+   /** Receives the next message using the default receive timeout. */
+   public ClientMessage receiveMessage() {
+      return receiveMessage(defaultReceiveTimeout);
+   }
+   /** Receives a message: timeout > 0 uses receive(timeout), 0 uses receiveImmediate(), negative uses the untimed receive(); requires a started resource. */
+   public ClientMessage receiveMessage(long timeout) {
+      if (consumer == null) {
+         throw new IllegalStateException("ClientConsumer is null");
+      }
+      if (timeout > 0) {
+         try {
+            return consumer.receive(timeout);
+         }
+         catch (ActiveMQException amqEx) {
+            throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed", timeout, queueName.toString()), amqEx);
+         }
+      }
+      else if (timeout == 0) {
+         try {
+            return consumer.receiveImmediate();
+         }
+         catch (ActiveMQException amqEx) {
+            throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed", queueName.toString()), amqEx);
+         }
+      }
+      else {
+         try {
+            return consumer.receive();
+         }
+         catch (ActiveMQException amqEx) {
+            throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed", queueName.toString()), amqEx);
+         }
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java
new file mode 100644
index 0000000..477dd39
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
+public class ActiveMQDynamicProducerResource extends ActiveMQProducerResource {
+
+   public ActiveMQDynamicProducerResource(String url) {
+      super(url);
+   }
+
+   public ActiveMQDynamicProducerResource(ServerLocator serverLocator) {
+      super(serverLocator);
+   }
+
+   public ActiveMQDynamicProducerResource(String url, SimpleString address) {
+      super(url, address);
+   }
+
+   public ActiveMQDynamicProducerResource(ServerLocator serverLocator, SimpleString address) {
+      super(serverLocator, address);
+   }
+
+   @Override
+   protected void createClient() {
+      try {
+         if (address != null && !session.addressQuery(address).isExists() && autoCreateQueue) {
+            log.warn("queue does not exist - creating queue: address = {}, name = {}", address.toString(), address.toString());
+            session.createQueue(address, address);
+         }
+         producer = session.createProducer((SimpleString) null);
+      }
+      catch (ActiveMQException amqEx) {
+         if (address == null) {
+            throw new ActiveMQClientResourceException(String.format("Error creating producer for address %s", address.toString()), amqEx);
+         }
+         else {
+            throw new ActiveMQClientResourceException("Error creating producer", amqEx);
+         }
+      }
+   }
+
+   /**
+    * Send a ClientMessage to the default address on the server
+    *
+    * @param message the message to send
+    */
+   @Override
+   public void sendMessage(ClientMessage message) {
+      sendMessage(address, message);
+   }
+
+   /**
+    * Send a ClientMessage to the specified address on the server
+    *
+    * @param targetAddress the target address
+    * @param message       the message to send
+    */
+   public void sendMessage(SimpleString targetAddress, ClientMessage message) {
+      if (targetAddress == null) {
+         throw new IllegalArgumentException(String.format("%s error - address cannot be null", this.getClass().getSimpleName()));
+      }
+      try {
+         if (autoCreateQueue && !session.addressQuery(targetAddress).isExists()) {
+            log.warn("queue does not exist - creating queue: address = {}, name = {}", address.toString(), address.toString());
+            session.createQueue(targetAddress, targetAddress);
+         }
+      }
+      catch (ActiveMQException amqEx) {
+         throw new ActiveMQClientResourceException(String.format("Queue creation failed for queue: address = %s, name = %s", address.toString(), address.toString()));
+      }
+
+      try {
+         producer.send(targetAddress, message);
+      }
+      catch (ActiveMQException amqEx) {
+         throw new ActiveMQClientResourceException(String.format("Failed to send message to %s", targetAddress.toString()), amqEx);
+      }
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and send to the specified address on the server
+    *
+    * @param targetAddress the target address
+    * @param body          the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString targetAddress, byte[] body) {
+      ClientMessage message = createMessage(body);
+      sendMessage(targetAddress, message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and send to the server
+    *
+    * @param targetAddress the target address
+    * @param body          the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString targetAddress, String body) {
+      ClientMessage message = createMessage(body);
+      sendMessage(targetAddress, message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified properties and send to the server
+    *
+    * @param targetAddress the target address
+    * @param properties    the properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString targetAddress, Map<String, Object> properties) {
+      ClientMessage message = createMessage(properties);
+      sendMessage(targetAddress, message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and and properties and send to the server
+    *
+    * @param targetAddress the target address
+    * @param properties    the properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString targetAddress, byte[] body, Map<String, Object> properties) {
+      ClientMessage message = createMessage(body);
+      sendMessage(targetAddress, message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and and properties and send to the server
+    *
+    * @param targetAddress the target address
+    * @param properties    the properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString targetAddress, String body, Map<String, Object> properties) {
+      ClientMessage message = createMessage(body);
+      sendMessage(targetAddress, message);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java
new file mode 100644
index 0000000..d9336d4
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
+public class ActiveMQProducerResource extends AbstractActiveMQClientResource {
+
+   boolean useDurableMessage = true;
+   SimpleString address = null;
+   ClientProducer producer;
+
+   protected ActiveMQProducerResource(String url) {
+      super(url);
+   }
+
+   protected ActiveMQProducerResource(ServerLocator serverLocator) {
+      super(serverLocator);
+   }
+
+   public ActiveMQProducerResource(String url, SimpleString address) {
+      super(url);
+      if (address == null) {
+         throw new IllegalArgumentException(String.format("%s construction error - address cannot be null", this.getClass().getSimpleName()));
+      }
+      this.address = address;
+   }
+
+   public ActiveMQProducerResource(ServerLocator serverLocator, SimpleString address) {
+      super(serverLocator);
+      if (address == null) {
+         throw new IllegalArgumentException(String.format("%s construction error - address cannot be null", this.getClass().getSimpleName()));
+      }
+      this.address = address;
+   }
+
+   public boolean isUseDurableMessage() {
+      return useDurableMessage;
+   }
+
+   /**
+    * Disables/Enables creating durable messages.  By default, durable messages are created
+    *
+    * @param useDurableMessage if true, durable messages will be created
+    */
+   public void setUseDurableMessage(boolean useDurableMessage) {
+      this.useDurableMessage = useDurableMessage;
+   }
+
+   @Override
+   protected void createClient() {
+      try {
+         if (!session.addressQuery(address).isExists() && autoCreateQueue) {
+            log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), address.toString(), address.toString());
+            session.createQueue(address, address);
+         }
+         producer = session.createProducer(address);
+      }
+      catch (ActiveMQException amqEx) {
+         throw new ActiveMQClientResourceException(String.format("Error creating producer for address %s", address.toString()), amqEx);
+      }
+   }
+
+   @Override
+   protected void stopClient() {
+      if (producer != null) {
+         try {
+            producer.close();
+         }
+         catch (ActiveMQException amqEx) {
+            log.warn("ActiveMQException encountered closing InternalClient ClientProducer - ignoring", amqEx);
+         }
+         finally {
+            producer = null;
+         }
+      }
+   }
+
+   /**
+    * Create a ClientMessage
+    * <p>
+    * If useDurableMessage is false, a non-durable message is created.  Otherwise, a durable message is created
+    *
+    * @return a new ClientMessage
+    */
+   public ClientMessage createMessage() {
+      if (session == null) {
+         throw new IllegalStateException("ClientSession is null");
+      }
+      return session.createMessage(isUseDurableMessage());
+   }
+
+   /**
+    * Create a ClientMessage with the specified body
+    * <p>
+    * If useDurableMessage is false, a non-durable message is created.  Otherwise, a durable message is created
+    *
+    * @param body the body for the new message
+    * @return a new ClientMessage with the specified body
+    */
+   public ClientMessage createMessage(byte[] body) {
+      ClientMessage message = createMessage();
+
+      if (body != null) {
+         message.writeBodyBufferBytes(body);
+      }
+
+      return message;
+   }
+
+   /**
+    * Create a ClientMessage with the specified body
+    * <p>
+    * If useDurableMessage is false, a non-durable message is created.  Otherwise, a durable message is created
+    *
+    * @param body the body for the new message
+    * @return a new ClientMessage with the specified body
+    */
+   public ClientMessage createMessage(String body) {
+      ClientMessage message = createMessage();
+
+      if (body != null) {
+         message.writeBodyBufferString(body);
+      }
+
+      return message;
+   }
+
+   /**
+    * Create a ClientMessage with the specified message properties
+    * <p>
+    * If useDurableMessage is false, a non-durable message is created.  Otherwise, a durable message is created
+    *
+    * @param properties message properties for the new message
+    * @return a new ClientMessage with the specified message properties
+    */
+   public ClientMessage createMessage(Map<String, Object> properties) {
+      ClientMessage message = createMessage();
+
+      addMessageProperties(message, properties);
+
+      return message;
+   }
+
+   /**
+    * Create a ClientMessage with the specified body and message properties
+    * <p>
+    * If useDurableMessage is false, a non-durable message is created.  Otherwise, a durable message is created
+    *
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return a new ClientMessage with the specified body and message properties
+    */
+   public ClientMessage createMessage(byte[] body, Map<String, Object> properties) {
+      ClientMessage message = createMessage(body);
+
+      addMessageProperties(message, properties);
+
+      return message;
+   }
+
+   /**
+    * Create a ClientMessage with the specified body and message properties
+    * <p>
+    * If useDurableMessage is false, a non-durable message is created.  Otherwise, a durable message is created
+    *
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return a new ClientMessage with the specified body and message properties
+    */
+   public ClientMessage createMessage(String body, Map<String, Object> properties) {
+      ClientMessage message = createMessage(body);
+
+      addMessageProperties(message, properties);
+
+      return message;
+   }
+
+   /**
+    * Send a ClientMessage to the server
+    *
+    * @param message the message to send
+    */
+   public void sendMessage(ClientMessage message) {
+      try {
+         producer.send(message);
+      }
+      catch (ActiveMQException amqEx) {
+         throw new ActiveMQClientResourceException(String.format("Failed to send message to %s", producer.getAddress().toString()), amqEx);
+      }
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and send to the server
+    *
+    * @param body the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(byte[] body) {
+      ClientMessage message = createMessage(body);
+      sendMessage(message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and send to the server
+    *
+    * @param body the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(String body) {
+      ClientMessage message = createMessage(body);
+      sendMessage(message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified properties and send to the server
+    *
+    * @param properties the properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(Map<String, Object> properties) {
+      ClientMessage message = createMessage(properties);
+      sendMessage(message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and and properties and send to the server
+    *
+    * @param properties the properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(byte[] body, Map<String, Object> properties) {
+      ClientMessage message = createMessage(body);
+      sendMessage(message);
+      return message;
+   }
+
+   /**
+    * Create a new ClientMessage with the specified body and and properties and send to the server
+    *
+    * @param properties the properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(String body, Map<String, Object> properties) {
+      ClientMessage message = createMessage(body);
+      sendMessage(message);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java
new file mode 100644
index 0000000..90e567f
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.FileDeploymentManager;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JUnit Rule that embeds an ActiveMQ Artemis server into a test.
+ */
+public class EmbeddedActiveMQResource extends ExternalResource {
+
+   // Name given to the default server configuration; the serverId constructor appends "-" + serverId
+   static final String SERVER_NAME = "embedded-server";
+
+   // Logger named after the runtime class of this resource
+   Logger log = LoggerFactory.getLogger(this.getClass());
+
+   // Durability flag passed to the internal client when creating messages
+   boolean useDurableMessage = true;
+   // Durability flag passed to the server when creating queues
+   boolean useDurableQueue = true;
+   // Timeout in milliseconds used by the receive/browse overloads that take no explicit timeout
+   long defaultReceiveTimeout = 50;
+
+   // Configuration used to build the server; replaced by the server's own configuration in start()
+   Configuration configuration;
+
+   // The embedded server, created by init()
+   EmbeddedActiveMQ server;
+
+   // Created on first use by getInternalClient() and discarded by stop()
+   InternalClient internalClient;
+
+   /**
+    * Create an EmbeddedActiveMQResource with a default configuration: persistence and security are
+    * disabled and a single InVM acceptor is registered
+    */
+   public EmbeddedActiveMQResource() {
+      configuration = new ConfigurationImpl()
+         .setName(SERVER_NAME)
+         .setPersistenceEnabled(false)
+         .setSecurityEnabled(false)
+         .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      init();
+   }
+
+   /**
+    * Create an EmbeddedActiveMQResource with a default configuration whose InVM acceptor uses the
+    * given server id; the server name is SERVER_NAME followed by "-" and the id
+    *
+    * @param serverId server id
+    */
+   public EmbeddedActiveMQResource(int serverId) {
+      Map<String, Object> acceptorParams = new HashMap<>();
+      acceptorParams.put(TransportConstants.SERVER_ID_PROP_NAME, serverId);
+      TransportConfiguration acceptor = new TransportConfiguration(InVMAcceptorFactory.class.getName(), acceptorParams);
+      configuration = new ConfigurationImpl()
+         .setName(SERVER_NAME + "-" + serverId)
+         .setPersistenceEnabled(false)
+         .setSecurityEnabled(false)
+         .addAcceptorConfiguration(acceptor);
+      init();
+   }
+
+   /**
+    * Create an EmbeddedActiveMQResource around a caller-supplied configuration
+    * <p/>
+    * The configuration is stored as given; no validation is performed here.
+    *
+    * @param configuration ActiveMQServer configuration
+    */
+   public EmbeddedActiveMQResource(Configuration configuration) {
+      this.configuration = configuration;
+      this.init();
+   }
+
+   /**
+    * Create an EmbeddedActiveMQResource whose configuration is read from the named file
+    *
+    * @param filename ActiveMQServer configuration file name; must not be null
+    * @throws IllegalArgumentException          if filename is null
+    * @throws EmbeddedActiveMQResourceException if reading the configuration fails
+    */
+   public EmbeddedActiveMQResource(String filename) {
+      if (filename == null) {
+         throw new IllegalArgumentException("ActiveMQServer configuration file name cannot be null");
+      }
+      FileDeploymentManager manager = new FileDeploymentManager(filename);
+      FileConfiguration fileConfiguration = new FileConfiguration();
+      manager.addDeployable(fileConfiguration);
+      try {
+         manager.readConfiguration();
+      }
+      catch (Exception readFailure) {
+         String reason = String.format("Failed to read configuration file %s", filename);
+         throw new EmbeddedActiveMQResourceException(reason, readFailure);
+      }
+      this.configuration = fileConfiguration;
+      init();
+   }
+
+   /**
+    * Copy every entry of a property map onto a ClientMessage as an object property
+    *
+    * @param message    the message to receive the properties
+    * @param properties the properties to add; a null or empty map leaves the message untouched
+    */
+   public static void addMessageProperties(ClientMessage message, Map<String, Object> properties) {
+      if (properties == null || properties.isEmpty()) {
+         return;
+      }
+      for (Map.Entry<String, Object> entry : properties.entrySet()) {
+         message.putObjectProperty(entry.getKey(), entry.getValue());
+      }
+   }
+
+   // Creates the EmbeddedActiveMQ instance from the current configuration, unless one already exists
+   private void init() {
+      if (server != null) {
+         return;
+      }
+      server = new EmbeddedActiveMQ().setConfiguration(configuration);
+   }
+
+   /**
+    * Start the embedded ActiveMQ Artemis server.
+    * <p/>
+    * JUnit normally triggers this through the before() method; it is public so that tests can start the
+    * server by hand in advanced scenarios.  Once the server is running, the configuration held by this
+    * resource is replaced with the one reported by the running server.
+    *
+    * @throws RuntimeException if the server fails to start
+    */
+   public void start() {
+      try {
+         server.start();
+      }
+      catch (Exception startFailure) {
+         String reason = String.format("Exception encountered starting %s: %s", server.getClass().getName(), this.getServerName());
+         throw new RuntimeException(reason, startFailure);
+      }
+
+      ActiveMQServer running = server.getActiveMQServer();
+      configuration = running.getConfiguration();
+   }
+
+   /**
+    * Stop the internal client (if one was created) and then the embedded ActiveMQ Artemis server
+    * <p/>
+    * JUnit normally triggers this through the after() method; it is public so that tests can stop the
+    * server by hand in advanced scenarios.  A failure while stopping the server is logged, not rethrown.
+    */
+   public void stop() {
+      if (internalClient != null) {
+         internalClient.stop();
+         internalClient = null;
+      }
+
+      if (server == null) {
+         return;
+      }
+
+      try {
+         server.stop();
+      }
+      catch (Exception stopFailure) {
+         String warning = String.format("Exception encountered stopping %s: %s", server.getClass().getSimpleName(), this.getServerName());
+         log.warn(warning, stopFailure);
+      }
+   }
+
+   /**
+    * JUnit setup hook: logs the server name, starts the embedded ActiveMQ Artemis server and then
+    * delegates to the superclass
+    */
+   @Override
+   protected void before() throws Throwable {
+      String resourceType = this.getClass().getSimpleName();
+      String serverName = getServerName();
+      log.info("Starting {}: {}", resourceType, serverName);
+
+      start();
+
+      super.before();
+   }
+
+   /**
+    * JUnit teardown hook: logs the server name, delegates to the superclass and then stops the embedded
+    * ActiveMQ Artemis server
+    */
+   @Override
+   protected void after() {
+      String resourceType = this.getClass().getSimpleName();
+      String serverName = getServerName();
+      log.info("Stopping {}: {}", resourceType, serverName);
+
+      super.after();
+
+      stop();
+   }
+
+   /**
+    * @return true if messages created by this resource are durable
+    */
+   public boolean isUseDurableMessage() {
+      return this.useDurableMessage;
+   }
+
+   /**
+    * Choose whether messages created by this resource are durable.  Durable messages are the default
+    *
+    * @param useDurableMessage if true, durable messages will be created
+    */
+   public void setUseDurableMessage(final boolean useDurableMessage) {
+      this.useDurableMessage = useDurableMessage;
+   }
+
+   /**
+    * @return true if queues created by this resource are durable
+    */
+   public boolean isUseDurableQueue() {
+      return this.useDurableQueue;
+   }
+
+   /**
+    * Disables/Enables creating durable queues.  By default, durable queues are created
+    *
+    * @param useDurableQueue if true, durable queues will be created
+    */
+   public void setUseDurableQueue(boolean useDurableQueue) {
+      this.useDurableQueue = useDurableQueue;
+   }
+
+   /**
+    * @return the timeout in milliseconds used when a receive or browse call does not specify one
+    */
+   public long getDefaultReceiveTimeout() {
+      return this.defaultReceiveTimeout;
+   }
+
+   /**
+    * Change the timeout, in milliseconds, applied when receiving messages without an explicit timeout.
+    * The initial value is 50 milliseconds
+    *
+    * @param defaultReceiveTimeout receive timeout in milliseconds
+    */
+   public void setDefaultReceiveTimeout(final long defaultReceiveTimeout) {
+      this.defaultReceiveTimeout = defaultReceiveTimeout;
+   }
+
+   /**
+    * Expose the underlying EmbeddedActiveMQ instance.
+    * <p/>
+    * Useful when a test needs to configure or inspect the embedded server directly.
+    *
+    * @return the embedded ActiveMQ broker
+    */
+   public EmbeddedActiveMQ getServer() {
+      return this.server;
+   }
+
+   /**
+    * Get the name of the EmbeddedActiveMQ server
+    *
+    * @return name of the embedded server
+    */
+   public String getServerName() {
+      String name = "unknown";
+      ActiveMQServer activeMQServer = server.getActiveMQServer();
+      if (activeMQServer != null) {
+         name = activeMQServer.getConfiguration().getName();
+      }
+      else if (configuration != null) {
+         name = configuration.getName();
+      }
+
+      return name;
+   }
+
+   /**
+    * Get the VM URL for the embedded EmbeddedActiveMQ server
+    * <p/>
+    * The URL is "vm://" followed by the server id found in the acceptor configurations.  If several
+    * acceptors carry a server id the last one wins; if none does, "vm://0" is returned.
+    *
+    * @return the VM URL for the embedded server
+    */
+   public String getVmURL() {
+      String url = "vm://0";
+      for (TransportConfiguration acceptor : configuration.getAcceptorConfigurations()) {
+         Map<String, Object> acceptorParams = acceptor.getParams();
+         if (acceptorParams == null || !acceptorParams.containsKey(TransportConstants.SERVER_ID_PROP_NAME)) {
+            continue;
+         }
+         url = "vm://" + acceptorParams.get(TransportConstants.SERVER_ID_PROP_NAME);
+      }
+
+      return url;
+   }
+
+   public long getMessageCount(String queueName) {
+      return getMessageCount(SimpleString.toSimpleString(queueName));
+   }
+
+   /**
+    * Look up a queue and report how many messages it holds.
+    *
+    * @param queueName the name of the queue
+    * @return the number of messages in the queue; -1 (with a warning logged) if queue is not found
+    */
+   public long getMessageCount(SimpleString queueName) {
+      Queue located = locateQueue(queueName);
+      if (located != null) {
+         return located.getMessageCount();
+      }
+
+      log.warn("getMessageCount(queueName) - queue {} not found; returning -1", queueName.toString());
+      return -1;
+   }
+
+   public Queue locateQueue(String queueName) {
+      return locateQueue(SimpleString.toSimpleString(queueName));
+   }
+
+   public Queue locateQueue(SimpleString queueName) {
+      return server.getActiveMQServer().locateQueue(queueName);
+   }
+
+   public List<Queue> getBoundQueues(String address) {
+      return getBoundQueues(SimpleString.toSimpleString(address));
+   }
+
+   /**
+    * List the queues bound to an address
+    *
+    * @param address the address to query; must not be null
+    * @return the queues bound to the address; empty if the binding query reports the address does not exist
+    * @throws IllegalArgumentException          if address is null
+    * @throws EmbeddedActiveMQResourceException if the binding query fails (the failure is kept as the cause)
+    */
+   public List<Queue> getBoundQueues(SimpleString address) {
+      if (address == null) {
+         throw new IllegalArgumentException("getBoundQueues( address ) - address cannot be null");
+      }
+      List<Queue> boundQueues = new java.util.LinkedList<>();
+
+      BindingQueryResult bindingQueryResult;
+      try {
+         bindingQueryResult = server.getActiveMQServer().bindingQuery(address);
+      }
+      catch (Exception e) {
+         // Keep the underlying failure as the cause so it is not lost to the caller
+         throw new EmbeddedActiveMQResourceException(String.format("getBoundQueues( %s ) - bindingQuery( %s ) failed", address.toString(), address.toString()), e);
+      }
+      if (bindingQueryResult.isExists()) {
+         for (SimpleString queueName : bindingQueryResult.getQueueNames()) {
+            boundQueues.add(server.getActiveMQServer().locateQueue(queueName));
+         }
+      }
+      return boundQueues;
+   }
+
+   public Queue createQueue(String name) {
+      return createQueue(SimpleString.toSimpleString(name), SimpleString.toSimpleString(name));
+   }
+
+   public Queue createQueue(String address, String name) {
+      return createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(name));
+   }
+
+   /**
+    * Create a non-temporary queue with no filter on the embedded server
+    * <p>
+    * The queue is durable when useDurableQueue is true.
+    *
+    * @param address the address for the queue
+    * @param name    the queue name
+    * @return the queue returned by the server
+    * @throws EmbeddedActiveMQResourceException if the server fails to create the queue
+    */
+   public Queue createQueue(SimpleString address, SimpleString name) {
+      SimpleString filter = null;
+      boolean temporary = false;
+      try {
+         return server.getActiveMQServer().createQueue(address, name, filter, isUseDurableQueue(), temporary);
+      }
+      catch (Exception ex) {
+         // Format the arguments directly: calling toString() on a null argument here would hide the real failure
+         throw new EmbeddedActiveMQResourceException(String.format("Failed to create queue: address = %s, name = %s", address, name), ex);
+      }
+   }
+
+   public void createSharedQueue(String name, String user) {
+      createSharedQueue(SimpleString.toSimpleString(name), SimpleString.toSimpleString(name), SimpleString.toSimpleString(user));
+   }
+
+   public void createSharedQueue(String address, String name, String user) {
+      createSharedQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(name), SimpleString.toSimpleString(user));
+   }
+
+   /**
+    * Create a shared queue with no filter on the embedded server
+    * <p>
+    * The queue is durable when useDurableQueue is true.
+    *
+    * @param address the address for the queue
+    * @param name    the queue name
+    * @param user    the user passed to the server
+    * @throws EmbeddedActiveMQResourceException if the server fails to create the shared queue
+    */
+   public void createSharedQueue(SimpleString address, SimpleString name, SimpleString user) {
+      SimpleString filter = null;
+      try {
+         server.getActiveMQServer().createSharedQueue(address, name, filter, user, isUseDurableQueue());
+      }
+      catch (Exception ex) {
+         // Format the arguments directly: calling toString() on a null argument here would hide the real failure
+         throw new EmbeddedActiveMQResourceException(String.format("Failed to create shared queue: address = %s, name = %s, user = %s", address, name, user), ex);
+      }
+   }
+
+   /**
+    * Create an empty ClientMessage through the internal client
+    * <p>
+    * The message is durable unless useDurableMessage has been set to false.
+    *
+    * @return a new ClientMessage
+    */
+   public ClientMessage createMessage() {
+      return getInternalClient().createMessage(isUseDurableMessage());
+   }
+
+   /**
+    * Create a ClientMessage and write the given bytes into its body
+    * <p>
+    * The message is durable unless useDurableMessage has been set to false.
+    *
+    * @param body the body for the new message; when null, nothing is written to the body
+    * @return a new ClientMessage with the specified body
+    */
+   public ClientMessage createMessage(byte[] body) {
+      ClientMessage created = getInternalClient().createMessage(isUseDurableMessage());
+      if (body == null) {
+         return created;
+      }
+
+      created.writeBodyBufferBytes(body);
+      return created;
+   }
+
+   /**
+    * Create a ClientMessage and write the given string into its body
+    * <p>
+    * The message is durable unless useDurableMessage has been set to false.
+    *
+    * @param body the body for the new message; when null, nothing is written to the body
+    * @return a new ClientMessage with the specified body
+    */
+   public ClientMessage createMessage(String body) {
+      ClientMessage created = getInternalClient().createMessage(isUseDurableMessage());
+      if (body == null) {
+         return created;
+      }
+
+      created.writeBodyBufferString(body);
+      return created;
+   }
+
+   /**
+    * Create a ClientMessage with no body and the given message properties
+    * <p>
+    * The message is durable unless useDurableMessage has been set to false.
+    *
+    * @param properties message properties for the new message
+    * @return a new ClientMessage with the specified message properties
+    */
+   public ClientMessage createMessageWithProperties(Map<String, Object> properties) {
+      ClientMessage created = getInternalClient().createMessage(isUseDurableMessage());
+      addMessageProperties(created, properties);
+      return created;
+   }
+
+   /**
+    * Create a ClientMessage with a byte-array body and the given message properties
+    * <p>
+    * The message is durable unless useDurableMessage has been set to false.
+    *
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return a new ClientMessage with the specified body and message properties
+    */
+   public ClientMessage createMessageWithProperties(byte[] body, Map<String, Object> properties) {
+      final ClientMessage created = createMessage(body);
+      addMessageProperties(created, properties);
+      return created;
+   }
+
+   /**
+    * Create a ClientMessage with a string body and the given message properties
+    * <p>
+    * The message is durable unless useDurableMessage has been set to false.
+    *
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return a new ClientMessage with the specified body and message properties
+    */
+   public ClientMessage createMessageWithProperties(String body, Map<String, Object> properties) {
+      final ClientMessage created = createMessage(body);
+      addMessageProperties(created, properties);
+      return created;
+   }
+
+   /**
+    * Send a message to an address
+    *
+    * @param address the target queueName for the message
+    * @param message the message to send
+    */
+   public void sendMessage(String address, ClientMessage message) {
+      sendMessage(SimpleString.toSimpleString(address), message);
+   }
+
+   /**
+    * Create a new message with the specified body, and send the message to an address
+    *
+    * @param address the target queueName for the message
+    * @param body    the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(String address, byte[] body) {
+      return sendMessage(SimpleString.toSimpleString(address), body);
+   }
+
+   /**
+    * Create a new message with the specified body, and send the message to an address
+    *
+    * @param address the target queueName for the message
+    * @param body    the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(String address, String body) {
+      return sendMessage(SimpleString.toSimpleString(address), body);
+   }
+
+   /**
+    * Create a new message with the specified properties, and send the message to an address
+    *
+    * @param address    the target queueName for the message
+    * @param properties message properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessageWithProperties(String address, Map<String, Object> properties) {
+      return sendMessageWithProperties(SimpleString.toSimpleString(address), properties);
+   }
+
+   /**
+    * Create a new message with the specified body and properties, and send the message to an address
+    *
+    * @param address    the target queueName for the message
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessageWithProperties(String address, byte[] body, Map<String, Object> properties) {
+      return sendMessageWithProperties(SimpleString.toSimpleString(address), body, properties);
+   }
+
+   /**
+    * Create a new message with the specified body and properties, and send the message to an address
+    *
+    * @param address    the target queueName for the message
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessageWithProperties(String address, String body, Map<String, Object> properties) {
+      return sendMessageWithProperties(SimpleString.toSimpleString(address), body, properties);
+   }
+
+   /**
+    * Send a message to an queueName
+    *
+    * @param address the target queueName for the message
+    * @param message the message to send
+    */
+   public void sendMessage(SimpleString address, ClientMessage message) {
+      if (address == null) {
+         throw new IllegalArgumentException("sendMessage failure - queueName is required");
+      }
+      else if (message == null) {
+         throw new IllegalArgumentException("sendMessage failure - a ClientMessage is required");
+      }
+
+      getInternalClient();
+      internalClient.sendMessage(address, message);
+   }
+
+   /**
+    * Build a message from a byte-array body and send it to an address
+    *
+    * @param address the target address for the message
+    * @param body    the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString address, byte[] body) {
+      final ClientMessage outgoing = createMessage(body);
+      sendMessage(address, outgoing);
+      return outgoing;
+   }
+
+   /**
+    * Build a message from a string body and send it to an address
+    *
+    * @param address the target address for the message
+    * @param body    the body for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessage(SimpleString address, String body) {
+      final ClientMessage outgoing = createMessage(body);
+      sendMessage(address, outgoing);
+      return outgoing;
+   }
+
+   /**
+    * Build a message carrying only the given properties and send it to an address
+    *
+    * @param address    the target address for the message
+    * @param properties message properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessageWithProperties(SimpleString address, Map<String, Object> properties) {
+      final ClientMessage outgoing = createMessageWithProperties(properties);
+      sendMessage(address, outgoing);
+      return outgoing;
+   }
+
+   /**
+    * Build a message from a byte-array body and properties, and send it to an address
+    *
+    * @param address    the target address for the message
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessageWithProperties(SimpleString address, byte[] body, Map<String, Object> properties) {
+      final ClientMessage outgoing = createMessageWithProperties(body, properties);
+      sendMessage(address, outgoing);
+      return outgoing;
+   }
+
+   /**
+    * Build a message from a string body and properties, and send it to an address
+    *
+    * @param address    the target address for the message
+    * @param body       the body for the new message
+    * @param properties message properties for the new message
+    * @return the message that was sent
+    */
+   public ClientMessage sendMessageWithProperties(SimpleString address, String body, Map<String, Object> properties) {
+      final ClientMessage outgoing = createMessageWithProperties(body, properties);
+      sendMessage(address, outgoing);
+      return outgoing;
+   }
+
+   /**
+    * Receive a message from the specified queue using the default receive timeout
+    *
+    * @param queueName name of the source queue
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage receiveMessage(String queueName) {
+      return receiveMessage(SimpleString.toSimpleString(queueName));
+   }
+
+   /**
+    * Receive a message from the specified queue using the specified receive timeout
+    *
+    * @param queueName name of the source queue
+    * @param timeout   receive timeout in milliseconds
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage receiveMessage(String queueName, long timeout) {
+      return receiveMessage(SimpleString.toSimpleString(queueName), timeout);
+   }
+
+   /**
+    * Receive one message from the named queue, waiting up to the default receive timeout
+    *
+    * @param queueName name of the source queue
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage receiveMessage(SimpleString queueName) {
+      // browseOnly = false
+      return getInternalClient().receiveMessage(queueName, defaultReceiveTimeout, false);
+   }
+
+   /**
+    * Receive one message from the named queue, waiting up to the given timeout
+    *
+    * @param queueName name of the source queue
+    * @param timeout   receive timeout in milliseconds
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage receiveMessage(SimpleString queueName, long timeout) {
+      // browseOnly = false
+      return getInternalClient().receiveMessage(queueName, timeout, false);
+   }
+
+   /**
+    * Browse a message (receive but do not consume) from the specified queue using the default receive timeout
+    *
+    * @param queueName name of the source queue
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage browseMessage(String queueName) {
+      return browseMessage(SimpleString.toSimpleString(queueName), defaultReceiveTimeout);
+   }
+
+   /**
+    * Browse a message (receive but do not consume) a message from the specified queue using the specified receive timeout
+    *
+    * @param queueName name of the source queue
+    * @param timeout   receive timeout in milliseconds
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage browseMessage(String queueName, long timeout) {
+      return browseMessage(SimpleString.toSimpleString(queueName), timeout);
+   }
+
+   /**
+    * Browse one message (receive without consuming) from the named queue, waiting up to the default
+    * receive timeout
+    *
+    * @param queueName name of the source queue
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage browseMessage(SimpleString queueName) {
+      // browseOnly = true
+      return getInternalClient().receiveMessage(queueName, defaultReceiveTimeout, true);
+   }
+
+   /**
+    * Browse one message (receive without consuming) from the named queue, waiting up to the given timeout
+    *
+    * @param queueName name of the source queue
+    * @param timeout   receive timeout in milliseconds
+    * @return the received ClientMessage, null if the receive timed-out
+    */
+   public ClientMessage browseMessage(SimpleString queueName, long timeout) {
+      // browseOnly = true
+      return getInternalClient().receiveMessage(queueName, timeout, true);
+   }
+
+   // Returns the internal client, creating and starting it on first use
+   private InternalClient getInternalClient() {
+      if (internalClient != null) {
+         return internalClient;
+      }
+
+      log.info("Creating Internal Client");
+      // The field is assigned before start() is called, so it stays set even if start() throws
+      internalClient = new InternalClient();
+      internalClient.start();
+      return internalClient;
+   }
+
+   /**
+    * Unchecked exception thrown by this resource when an operation against the embedded server or the
+    * internal client fails
+    */
+   public static class EmbeddedActiveMQResourceException extends RuntimeException {
+
+      /**
+       * @param message description of the failure
+       */
+      public EmbeddedActiveMQResourceException(final String message) {
+         super(message);
+      }
+
+      /**
+       * @param message description of the failure
+       * @param cause   the underlying exception
+       */
+      public EmbeddedActiveMQResourceException(final String message, final Exception cause) {
+         super(message, cause);
+      }
+   }
+
+   /**
+    * Client used by the enclosing resource to create, send and receive messages.  It owns one server
+    * locator, one session factory, one session and one producer, all created by start() and released
+    * by stop().
+    */
+   private class InternalClient {
+
+      ServerLocator serverLocator;
+      ClientSessionFactory sessionFactory;
+      ClientSession session;
+      ClientProducer producer;
+
+      InternalClient() {
+      }
+
+      // Connects to the enclosing resource's VM URL, then opens and starts a session with a producer
+      void start() {
+         log.info("Starting {}", this.getClass().getSimpleName());
+         try {
+            serverLocator = ActiveMQClient.createServerLocator(getVmURL());
+            sessionFactory = serverLocator.createSessionFactory();
+         }
+         catch (RuntimeException unchecked) {
+            // Runtime failures pass through unwrapped
+            throw unchecked;
+         }
+         catch (Exception connectFailure) {
+            throw new EmbeddedActiveMQResourceException("Internal Client creation failure", connectFailure);
+         }
+
+         try {
+            session = sessionFactory.createSession();
+            producer = session.createProducer((String) null);
+            session.start();
+         }
+         catch (ActiveMQException sessionFailure) {
+            throw new EmbeddedActiveMQResourceException("Internal Client creation failure", sessionFailure);
+         }
+      }
+
+      // Closes producer, session, session factory and locator in that order; close failures are logged only
+      void stop() {
+         if (producer != null) {
+            try {
+               producer.close();
+            }
+            catch (ActiveMQException closeFailure) {
+               log.warn("ActiveMQException encountered closing InternalClient ClientProducer - ignoring", closeFailure);
+            }
+            finally {
+               producer = null;
+            }
+         }
+
+         if (session != null) {
+            try {
+               session.close();
+            }
+            catch (ActiveMQException closeFailure) {
+               log.warn("ActiveMQException encountered closing InternalClient ClientSession - ignoring", closeFailure);
+            }
+            finally {
+               session = null;
+            }
+         }
+
+         if (sessionFactory != null) {
+            sessionFactory.close();
+            sessionFactory = null;
+         }
+
+         if (serverLocator != null) {
+            serverLocator.close();
+            serverLocator = null;
+         }
+      }
+
+      // Creates a message on the session with the requested durability
+      public ClientMessage createMessage(boolean durable) {
+         checkSession();
+         return session.createMessage(durable);
+      }
+
+      // Sends the message to the address through the producer
+      public void sendMessage(SimpleString address, ClientMessage message) {
+         checkSession();
+         if (producer == null) {
+            throw new IllegalStateException("ClientProducer is null - has the InternalClient been started?");
+         }
+
+         try {
+            producer.send(address, message);
+         }
+         catch (ActiveMQException sendFailure) {
+            throw new EmbeddedActiveMQResourceException(String.format("Failed to send message to %s", address.toString()), sendFailure);
+         }
+      }
+
+      // Creates a consumer on the address and receives one message.  A positive timeout waits that many
+      // milliseconds, zero uses receiveImmediate(), and a negative timeout uses receive() with no timeout.
+      // NOTE(review): the consumer created here is not closed by this method - confirm that is intended.
+      public ClientMessage receiveMessage(SimpleString address, long timeout, boolean browseOnly) {
+         checkSession();
+
+         final ClientConsumer consumer;
+         try {
+            consumer = session.createConsumer(address, browseOnly);
+         }
+         catch (ActiveMQException consumerFailure) {
+            throw new EmbeddedActiveMQResourceException(String.format("Failed to create consumer for %s", address.toString()), consumerFailure);
+         }
+
+         if (timeout > 0) {
+            try {
+               return consumer.receive(timeout);
+            }
+            catch (ActiveMQException receiveFailure) {
+               throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed", timeout, address.toString()), receiveFailure);
+            }
+         }
+
+         if (timeout == 0) {
+            try {
+               return consumer.receiveImmediate();
+            }
+            catch (ActiveMQException receiveFailure) {
+               throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed", address.toString()), receiveFailure);
+            }
+         }
+
+         try {
+            return consumer.receive();
+         }
+         catch (ActiveMQException receiveFailure) {
+            throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed", address.toString()), receiveFailure);
+         }
+      }
+
+      // Fails fast when no session is available
+      void checkSession() {
+         getInternalClient();
+         if (session == null) {
+            throw new IllegalStateException("ClientSession is null - has the InternalClient been started?");
+         }
+      }
+   }
+}


Mime
View raw message