activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [25/52] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 Rename HornetQ* classes to ActiveMQ*
Date Tue, 18 Nov 2014 23:38:18 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java
new file mode 100644
index 0000000..19c74b7
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+
+/**
+ * A wrapper for a message consumer
+ *
+ * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAMessageConsumer implements MessageConsumer
+{
+   /** Whether trace is enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /** The wrapped message consumer */
+   protected MessageConsumer consumer;
+
+   /** The session for this consumer */
+   protected ActiveMQRASession session;
+
+   /**
+    * Create a new wrapper
+    * @param consumer the consumer
+    * @param session the session
+    */
+   public ActiveMQRAMessageConsumer(final MessageConsumer consumer, final ActiveMQRASession session)
+   {
+      this.consumer = consumer;
+      this.session = session;
+
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("new ActiveMQMessageConsumer " + this +
+                                            " consumer=" +
+                                            consumer +
+                                            " session=" +
+                                            session);
+      }
+   }
+
+   /**
+    * Close
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void close() throws JMSException
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("close " + this);
+      }
+      try
+      {
+         closeConsumer();
+      }
+      finally
+      {
+         session.removeConsumer(this);
+      }
+   }
+
+   /**
+    * Check state
+    * @exception JMSException Thrown if an error occurs
+    */
+   void checkState() throws JMSException
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("checkState()");
+      }
+      session.checkState();
+   }
+
+   /**
+    * Get message listener
+    * @return The listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public MessageListener getMessageListener() throws JMSException
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMessageListener()");
+      }
+
+      checkState();
+      session.checkStrict();
+      return consumer.getMessageListener();
+   }
+
+   /**
+    * Set message listener
+    * @param listener The listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setMessageListener(final MessageListener listener) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         checkState();
+         session.checkStrict();
+         if (listener == null)
+         {
+            consumer.setMessageListener(null);
+         }
+         else
+         {
+            consumer.setMessageListener(wrapMessageListener(listener));
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Get message selector
+    * @return The selector
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getMessageSelector() throws JMSException
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMessageSelector()");
+      }
+
+      checkState();
+      return consumer.getMessageSelector();
+   }
+
+   /**
+    * Receive
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message receive() throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageConsumer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("receive " + this);
+         }
+
+         checkState();
+         Message message = consumer.receive();
+
+         if (ActiveMQRAMessageConsumer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("received " + this + " result=" + message);
+         }
+
+         if (message == null)
+         {
+            return null;
+         }
+         else
+         {
+            return wrapMessage(message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Receive
+    * @param timeout The timeout value
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message receive(final long timeout) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageConsumer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("receive " + this + " timeout=" + timeout);
+         }
+
+         checkState();
+         Message message = consumer.receive(timeout);
+
+         if (ActiveMQRAMessageConsumer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("received " + this + " result=" + message);
+         }
+
+         if (message == null)
+         {
+            return null;
+         }
+         else
+         {
+            return wrapMessage(message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Receive
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message receiveNoWait() throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageConsumer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("receiveNoWait " + this);
+         }
+
+         checkState();
+         Message message = consumer.receiveNoWait();
+
+         if (ActiveMQRAMessageConsumer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("received " + this + " result=" + message);
+         }
+
+         if (message == null)
+         {
+            return null;
+         }
+         else
+         {
+            return wrapMessage(message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Close consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   void closeConsumer() throws JMSException
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("closeConsumer()");
+      }
+
+      consumer.close();
+   }
+
+   /**
+    * Wrap message
+    * @param message The message to be wrapped
+    * @return The wrapped message
+    */
+   Message wrapMessage(final Message message)
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("wrapMessage(" + message + ")");
+      }
+
+      if (message instanceof BytesMessage)
+      {
+         return new ActiveMQRABytesMessage((BytesMessage)message, session);
+      }
+      else if (message instanceof MapMessage)
+      {
+         return new ActiveMQRAMapMessage((MapMessage)message, session);
+      }
+      else if (message instanceof ObjectMessage)
+      {
+         return new ActiveMQRAObjectMessage((ObjectMessage)message, session);
+      }
+      else if (message instanceof StreamMessage)
+      {
+         return new ActiveMQRAStreamMessage((StreamMessage)message, session);
+      }
+      else if (message instanceof TextMessage)
+      {
+         return new ActiveMQRATextMessage((TextMessage)message, session);
+      }
+      return new ActiveMQRAMessage(message, session);
+   }
+
+   /**
+    * Wrap message listener
+    * @param listener The listener to be wrapped
+    * @return The wrapped listener
+    */
+   MessageListener wrapMessageListener(final MessageListener listener)
+   {
+      if (ActiveMQRAMessageConsumer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMessageSelector()");
+      }
+
+      return new ActiveMQRAMessageListener(listener, this);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java
new file mode 100644
index 0000000..42f0a42
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+
+/**
+ * A wrapper for a message listener
+ * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAMessageListener implements MessageListener
+{
+   /** Whether trace is enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /** The message listener */
+   private final MessageListener listener;
+
+   /** The consumer */
+   private final ActiveMQRAMessageConsumer consumer;
+
+   /**
+    * Create a new wrapper
+    * @param listener the listener
+    * @param consumer the consumer
+    */
+   public ActiveMQRAMessageListener(final MessageListener listener, final ActiveMQRAMessageConsumer consumer)
+   {
+      if (ActiveMQRAMessageListener.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + listener + ", " + consumer + ")");
+      }
+
+      this.listener = listener;
+      this.consumer = consumer;
+   }
+
+   /**
+    * On message
+    * @param message The message
+    */
+   public void onMessage(Message message)
+   {
+      if (ActiveMQRAMessageListener.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("onMessage(" + message + ")");
+      }
+
+      message = consumer.wrapMessage(message);
+      listener.onMessage(message);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java
new file mode 100644
index 0000000..4ba4271
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java
@@ -0,0 +1,473 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+
+/**
+ * ActiveMQMessageProducer.
+ *
+ * @author <a href="adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAMessageProducer implements MessageProducer
+{
+   /** Whether trace is enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /** The wrapped message producer */
+   protected MessageProducer producer;
+
+   /** The session for this consumer */
+   protected ActiveMQRASession session;
+
+   /**
+    * Create a new wrapper
+    * @param producer the producer
+    * @param session the session
+    */
+   public ActiveMQRAMessageProducer(final MessageProducer producer, final ActiveMQRASession session)
+   {
+      this.producer = producer;
+      this.session = session;
+
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("new ActiveMQMessageProducer " + this +
+                                            " producer=" +
+                                            producer +
+                                            " session=" +
+                                            session);
+      }
+   }
+
+   /**
+    * Close
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void close() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("close " + this);
+      }
+      try
+      {
+         closeProducer();
+      }
+      finally
+      {
+         session.removeProducer(this);
+      }
+   }
+
+   /**
+    * Send message
+    * @param destination The destination
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void send(final Destination destination,
+                    final Message message,
+                    final int deliveryMode,
+                    final int priority,
+                    final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this +
+                                               " destination=" +
+                                               destination +
+                                               " message=" +
+                                               message +
+                                               " deliveryMode=" +
+                                               deliveryMode +
+                                               " priority=" +
+                                               priority +
+                                               " ttl=" +
+                                               timeToLive);
+         }
+
+         checkState();
+
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Send message
+    * @param destination The destination
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void send(final Destination destination, final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
+         }
+
+         checkState();
+
+         producer.send(destination, message);
+
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Send message
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void send(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this +
+                                               " message=" +
+                                               message +
+                                               " deliveryMode=" +
+                                               deliveryMode +
+                                               " priority=" +
+                                               priority +
+                                               " ttl=" +
+                                               timeToLive);
+         }
+
+         checkState();
+
+         producer.send(message, deliveryMode, priority, timeToLive);
+
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Send message
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void send(final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message);
+         }
+
+         checkState();
+
+         producer.send(message);
+
+         if (ActiveMQRAMessageProducer.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Get the delivery mode
+    * @return The mode
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int getDeliveryMode() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDeliveryMode()");
+      }
+
+      return producer.getDeliveryMode();
+   }
+
+   /**
+    * Get the destination
+    * @return The destination
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Destination getDestination() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDestination()");
+      }
+
+      return producer.getDestination();
+   }
+
+   /**
+    * Disable message id
+    * @return True if disable
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getDisableMessageID() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDisableMessageID()");
+      }
+
+      return producer.getDisableMessageID();
+   }
+
+   /**
+    * Disable message timestamp
+    * @return True if disable
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getDisableMessageTimestamp() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDisableMessageTimestamp()");
+      }
+
+      return producer.getDisableMessageTimestamp();
+   }
+
+   /**
+    * Get the priority
+    * @return The priority
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int getPriority() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getPriority()");
+      }
+
+      return producer.getPriority();
+   }
+
+   /**
+    * Get the time to live
+    * @return The ttl
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long getTimeToLive() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTimeToLive()");
+      }
+
+      return producer.getTimeToLive();
+   }
+
+   /**
+    * Set the delivery mode
+    * @param deliveryMode The mode
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setDeliveryMode(final int deliveryMode) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setDeliveryMode(" + deliveryMode + ")");
+      }
+
+      producer.setDeliveryMode(deliveryMode);
+   }
+
+   /**
+    * Set disable message id
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setDisableMessageID(final boolean value) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setDisableMessageID(" + value + ")");
+      }
+
+      producer.setDisableMessageID(value);
+   }
+
+   /**
+    * Set disable message timestamp
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setDisableMessageTimestamp(final boolean value) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setDisableMessageTimestamp(" + value + ")");
+      }
+
+      producer.setDisableMessageTimestamp(value);
+   }
+
+   /**
+    * Set the priority
+    * @param defaultPriority The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setPriority(final int defaultPriority) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setPriority(" + defaultPriority + ")");
+      }
+
+      producer.setPriority(defaultPriority);
+   }
+
+   /**
+    * Set the ttl
+    * @param timeToLive The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setTimeToLive(final long timeToLive) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setTimeToLive(" + timeToLive + ")");
+      }
+
+      producer.setTimeToLive(timeToLive);
+   }
+
+   @Override
+   public void setDeliveryDelay(long deliveryDelay) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setDeliveryDelay(" + deliveryDelay + ")");
+      }
+      producer.setDeliveryDelay(deliveryDelay);
+   }
+
+   @Override
+   public long getDeliveryDelay() throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDeliveryDelay()");
+      }
+      return producer.getDeliveryDelay();
+   }
+
+   @Override
+   public void send(Message message, CompletionListener completionListener) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("send(" + message + ", " + completionListener + ")");
+      }
+      producer.send(message, completionListener);
+   }
+
+   @Override
+   public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("send(" + message + ", " + deliveryMode + ", " + priority + ", " + timeToLive +
+                  ", " + completionListener + ")");
+      }
+      producer.send(message, deliveryMode, priority, timeToLive, completionListener);
+   }
+
+   @Override
+   public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("send(" + destination + ", " + message + ", " + completionListener + ")");
+      }
+      producer.send(destination, message, completionListener);
+   }
+
+   @Override
+   public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException
+   {
+      if (ActiveMQRAMessageProducer.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("send(" + destination + ", " + message + ", " + deliveryMode + ", " + priority +
+                  ", " + timeToLive + ", " + completionListener + ")");
+      }
+      producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
+   }
+
+    /**
+    * Check state
+    * @exception JMSException Thrown if an error occurs
+    */
+   void checkState() throws JMSException
+   {
+      session.checkState();
+   }
+
+   /**
+    * Close producer
+    * @exception JMSException Thrown if an error occurs
+    */
+   void closeProducer() throws JMSException
+   {
+      producer.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java
new file mode 100644
index 0000000..d30c44d
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.resource.ResourceException;
+import javax.resource.spi.ManagedConnectionMetaData;
+
+
+/**
+ * Managed connection meta data
+ *
+ * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAMetaData implements ManagedConnectionMetaData
+{
+   /** Trace enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /** The managed connection */
+   private final ActiveMQRAManagedConnection mc;
+
+   /**
+    * Constructor
+    * @param mc The managed connection
+    */
+   public ActiveMQRAMetaData(final ActiveMQRAManagedConnection mc)
+   {
+      if (ActiveMQRAMetaData.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + mc + ")");
+      }
+
+      this.mc = mc;
+   }
+
+   /**
+    * Get the EIS product name
+    * @return The name
+    * @exception ResourceException Thrown if operation fails
+    */
+   public String getEISProductName() throws ResourceException
+   {
+      if (ActiveMQRAMetaData.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getEISProductName()");
+      }
+
+      return "ActiveMQ";
+   }
+
+   /**
+    * Get the EIS product version
+    * @return The version
+    * @exception ResourceException Thrown if operation fails
+    */
+   public String getEISProductVersion() throws ResourceException
+   {
+      if (ActiveMQRAMetaData.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getEISProductVersion()");
+      }
+
+      return "2.0";
+   }
+
+   /**
+    * Get the user name
+    * @return The user name
+    * @exception ResourceException Thrown if operation fails
+    */
+   public String getUserName() throws ResourceException
+   {
+      if (ActiveMQRAMetaData.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getUserName()");
+      }
+
+      return mc.getUserName();
+   }
+
+   /**
+     * Get the maximum number of connections -- RETURNS 0
+     * @return The number
+     * @exception ResourceException Thrown if operation fails
+     */
+   public int getMaxConnections() throws ResourceException
+   {
+      if (ActiveMQRAMetaData.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMaxConnections()");
+      }
+
+      return 0;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java
new file mode 100644
index 0000000..3fc843e
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+
+/**
+ * A wrapper for a message
+ *
+ * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAObjectMessage extends ActiveMQRAMessage implements ObjectMessage
+{
+   /** Whether trace is enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param message the message
+    * @param session the session
+    */
+   public ActiveMQRAObjectMessage(final ObjectMessage message, final ActiveMQRASession session)
+   {
+      super(message, session);
+
+      if (ActiveMQRAObjectMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + message + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the object
+    * @return The object
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Serializable getObject() throws JMSException
+   {
+      if (ActiveMQRAObjectMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getObject()");
+      }
+
+      return ((ObjectMessage)message).getObject();
+   }
+
+   /**
+    * Set the object
+    * @param object The object
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setObject(final Serializable object) throws JMSException
+   {
+      if (ActiveMQRAObjectMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setObject(" + object + ")");
+      }
+
+      ((ObjectMessage)message).setObject(object);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAProperties.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAProperties.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAProperties.java
new file mode 100644
index 0000000..c796271
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAProperties.java
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.io.Serializable;
+import java.util.Hashtable;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.utils.DefaultSensitiveStringCodec;
+import org.apache.activemq.utils.PasswordMaskingUtil;
+import org.apache.activemq.utils.SensitiveDataCodec;
+
+
+/**
+ * The RA default properties - these are set in the ra.xml file
+ *
+ * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ActiveMQRAProperties extends ConnectionFactoryProperties implements Serializable
+{
+   /**
+    * Serial version UID
+    */
+   static final long serialVersionUID = -2772367477755473248L;
+   /**
+    * Trace enabled
+    */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * The user name
+    */
+   private String userName;
+
+   /**
+    * The password
+    */
+   private String password = null;
+
+   /**
+    * Use Local TX instead of XA
+    */
+   private Boolean localTx = false;
+
+
+   /**
+    * Class used to locate the Transaction Manager.
+    * Using JBoss5 as the default locator
+    */
+   private String transactionManagerLocatorClass = "org.apache.activemq.integration.jboss.tm.JBoss5TransactionManagerLocator;org.apache.activemq.integration.jboss.tm.JBoss4TransactionManagerLocator";
+
+   /**
+    * Method used to locate the TM
+    */
+   private String transactionManagerLocatorMethod = "getTm;getTM";
+
+   private static final int DEFAULT_SETUP_ATTEMPTS = -1;
+
+   private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
+
+   private int setupAttempts = DEFAULT_SETUP_ATTEMPTS;
+
+   private long setupInterval = DEFAULT_SETUP_INTERVAL;
+
+   private Hashtable<?, ?> jndiParams;
+
+   private boolean useJNDI;
+
+   private boolean useMaskedPassword = false;
+
+   private String passwordCodec;
+
+   private boolean initialized = false;
+
+   private transient SensitiveDataCodec<String> codecInstance;
+
+   /**
+    * Class used to get a JChannel
+    */
+   private String jgroupsChannelLocatorClass;
+
+   /**
+    * Name used to locate a JChannel
+    */
+   private String jgroupsChannelRefName;
+
+   /**
+    * Constructor
+    */
+   public ActiveMQRAProperties()
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor()");
+      }
+   }
+
+   /**
+    * Get the user name
+    *
+    * @return The value
+    */
+   public String getUserName()
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getUserName()");
+      }
+
+      return userName;
+   }
+
+   /**
+    * Set the user name
+    *
+    * @param userName The value
+    */
+   public void setUserName(final String userName)
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setUserName(" + userName + ")");
+      }
+
+      this.userName = userName;
+   }
+
+   /**
+    * Get the password
+    *
+    * @return The value
+    */
+   public String getPassword()
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getPassword()");
+      }
+
+      return password;
+   }
+
+   /**
+    * Set the password
+    * Based on UseMaskedPassword property, the password can be
+    * plain text or encoded string. However we cannot decide
+    * which is the case at this moment, because we don't know
+    * when the UseMaskedPassword and PasswordCodec are loaded. So for the moment
+    * we just save the password.
+    *
+    * @param password The value
+    */
+   public void setPassword(final String password)
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setPassword(****)");
+      }
+
+      this.password = password;
+   }
+
+   /**
+    * @return the useJNDI
+    */
+   public boolean isUseJNDI()
+   {
+      return useJNDI;
+   }
+
+   /**
+    * @param value the useJNDI to set
+    */
+   public void setUseJNDI(final Boolean value)
+   {
+      useJNDI = value;
+   }
+
+   /**
+    * @return return the jndi params to use
+    */
+   public Hashtable<?, ?> getParsedJndiParams()
+   {
+      return jndiParams;
+   }
+
+
+   public void setParsedJndiParams(Hashtable<?, ?> params)
+   {
+      jndiParams = params;
+   }
+
+   /**
+    * Get the use XA flag
+    *
+    * @return The value
+    */
+   public Boolean getUseLocalTx()
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getUseLocalTx()");
+      }
+
+      return localTx;
+   }
+
+   /**
+    * Set the use XA flag
+    *
+    * @param localTx The value
+    */
+   public void setUseLocalTx(final Boolean localTx)
+   {
+      if (ActiveMQRAProperties.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setUseLocalTx(" + localTx + ")");
+      }
+
+      this.localTx = localTx;
+   }
+
+
+   public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
+   {
+      this.transactionManagerLocatorClass = transactionManagerLocatorClass;
+   }
+
+   public String getTransactionManagerLocatorClass()
+   {
+      return transactionManagerLocatorClass;
+   }
+
+   public String getTransactionManagerLocatorMethod()
+   {
+      return transactionManagerLocatorMethod;
+   }
+
+   public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
+   {
+      this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
+   }
+
+   public int getSetupAttempts()
+   {
+      return setupAttempts;
+   }
+
+   public void setSetupAttempts(Integer setupAttempts)
+   {
+      this.setupAttempts = setupAttempts;
+   }
+
+   public long getSetupInterval()
+   {
+      return setupInterval;
+   }
+
+   public void setSetupInterval(Long setupInterval)
+   {
+      this.setupInterval = setupInterval;
+   }
+
+   public boolean isUseMaskedPassword()
+   {
+      return useMaskedPassword;
+   }
+
+   public void setUseMaskedPassword(boolean useMaskedPassword)
+   {
+      this.useMaskedPassword = useMaskedPassword;
+   }
+
+   public String getPasswordCodec()
+   {
+      return passwordCodec;
+   }
+
+   public void setPasswordCodec(String codecs)
+   {
+      passwordCodec = codecs;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ActiveMQRAProperties[localTx=" + localTx +
+         ", userName=" + userName + ", password=****]";
+   }
+
+   public synchronized void init() throws ActiveMQException
+   {
+      if (initialized)
+         return;
+
+      if (useMaskedPassword)
+      {
+         codecInstance = new DefaultSensitiveStringCodec();
+
+         if (passwordCodec != null)
+         {
+            codecInstance = PasswordMaskingUtil.getCodec(passwordCodec);
+         }
+
+         try
+         {
+            if (password != null)
+            {
+               password = codecInstance.decode(password);
+            }
+         }
+         catch (Exception e)
+         {
+            throw ActiveMQRABundle.BUNDLE.errorDecodingPassword(e);
+         }
+
+      }
+      initialized = true;
+   }
+
+   public SensitiveDataCodec<String> getCodecInstance()
+   {
+      return codecInstance;
+   }
+
+   public String getJgroupsChannelLocatorClass()
+   {
+      return jgroupsChannelLocatorClass;
+   }
+
+   public void setJgroupsChannelLocatorClass(String jgroupsChannelLocatorClass)
+   {
+      this.jgroupsChannelLocatorClass = jgroupsChannelLocatorClass;
+   }
+
+   public String getJgroupsChannelRefName()
+   {
+      return jgroupsChannelRefName;
+   }
+
+   public void setJgroupsChannelRefName(String jgroupsChannelRefName)
+   {
+      this.jgroupsChannelRefName = jgroupsChannelRefName;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java
new file mode 100644
index 0000000..43c023e
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+
+/**
+ * A wrapper for a queue receiver
+ *
+ * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAQueueReceiver extends ActiveMQRAMessageConsumer implements QueueReceiver
+{
+   /** Whether trace is enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param consumer the queue receiver
+    * @param session the session
+    */
+   public ActiveMQRAQueueReceiver(final QueueReceiver consumer, final ActiveMQRASession session)
+   {
+      super(consumer, session);
+
+      if (ActiveMQRAQueueReceiver.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get queue
+    * @return The queue
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Queue getQueue() throws JMSException
+   {
+      if (ActiveMQRAQueueReceiver.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getQueue()");
+      }
+
+      checkState();
+      return ((QueueReceiver)consumer).getQueue();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java
new file mode 100644
index 0000000..ea72d21
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+
+/**
+ * ActiveMQQueueSender.
+ *
+ * @author <a href="adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.pedersen@jboss.org">Jesper Pedersen</a>
+ */
+public class ActiveMQRAQueueSender extends ActiveMQRAMessageProducer implements QueueSender
+{
+   /** Whether trace is enabled */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param producer the producer
+    * @param session the session
+    */
+   public ActiveMQRAQueueSender(final QueueSender producer, final ActiveMQRASession session)
+   {
+      super(producer, session);
+
+      if (ActiveMQRAQueueSender.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get queue
+    * @return The queue
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Queue getQueue() throws JMSException
+   {
+      if (ActiveMQRAQueueSender.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getQueue()");
+      }
+
+      return ((QueueSender)producer).getQueue();
+   }
+
+   /**
+    * Send message
+    * @param destination The destination
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void send(final Queue destination,
+                    final Message message,
+                    final int deliveryMode,
+                    final int priority,
+                    final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAQueueSender.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this +
+                                           " destination=" +
+                                           destination +
+                                           " message=" +
+                                           message +
+                                           " deliveryMode=" +
+                                           deliveryMode +
+                                           " priority=" +
+                                           priority +
+                                           " ttl=" +
+                                           timeToLive);
+         }
+
+         checkState();
+         producer.send(destination, message, deliveryMode, priority, timeToLive);
+
+         if (ActiveMQRAQueueSender.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Send message
+    * @param destination The destination
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void send(final Queue destination, final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (ActiveMQRAQueueSender.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
+         }
+
+         checkState();
+         producer.send(destination, message);
+
+         if (ActiveMQRAQueueSender.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java
new file mode 100644
index 0000000..430e855
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+
+/**
+ * A ActiveMQRAService ensures that ActiveMQ Resource Adapter will be stopped *before* the ActiveMQ server.
+ * https://jira.jboss.org/browse/HORNETQ-339
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class ActiveMQRAService
+{
+   // Constants -----------------------------------------------------
+   // Attributes ----------------------------------------------------
+
+   private final MBeanServer mBeanServer;
+
+   private final String resourceAdapterObjectName;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ActiveMQRAService(final MBeanServer mBeanServer, final String resourceAdapterObjectName)
+   {
+      this.mBeanServer = mBeanServer;
+      this.resourceAdapterObjectName = resourceAdapterObjectName;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void stop()
+   {
+      try
+      {
+         ObjectName objectName = new ObjectName(resourceAdapterObjectName);
+         Set<ObjectInstance> mbeanSet = mBeanServer.queryMBeans(objectName, null);
+
+         for (ObjectInstance mbean : mbeanSet)
+         {
+            String stateString = (String)mBeanServer.getAttribute(mbean.getObjectName(), "StateString");
+
+            if ("Started".equalsIgnoreCase(stateString) || "Starting".equalsIgnoreCase(stateString))
+            {
+               mBeanServer.invoke(mbean.getObjectName(), "stop", new Object[0], new String[0]);
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         ActiveMQRALogger.LOGGER.errorStoppingRA(e);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}


Mime
View raw message