activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [18/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:48 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java
new file mode 100644
index 0000000..4be89fe
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java
@@ -0,0 +1,1276 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAResource;
+
+import org.apache.activemq6.selector.filter.FilterException;
+import org.apache.activemq6.selector.SelectorParser;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQQueueExistsException;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientConsumer;
+import org.apache.activemq6.api.core.client.ClientProducer;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.ClientSession.AddressQuery;
+import org.apache.activemq6.api.core.client.ClientSession.QueueQuery;
+
+/**
+ * HornetQ implementation of a JMS Session.
+ * <br>
+ * Note that we *do not* support JMS ASF (Application Server Facilities) optional
+ * constructs such as ConnectionConsumer
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ *
+ *
+ */
+public class HornetQSession implements QueueSession, TopicSession
+{
+   /** Session type that none of the queue/topic restriction checks visible in this class reject. */
+   public static final int TYPE_GENERIC_SESSION = 0;
+
+   /** Session type for which topic-related operations throw {@link IllegalStateException}. */
+   public static final int TYPE_QUEUE_SESSION = 1;
+
+   /** Session type for which queue-related operations throw {@link IllegalStateException}. */
+   public static final int TYPE_TOPIC_SESSION = 2;
+
+   /**
+    * Filter used for the placeholder queue created behind a temporary topic, so that the placeholder
+    * subscription never receives messages. Declared {@code final}: it is a shared constant and must not be
+    * reassigned.
+    */
+   private static final SimpleString REJECTING_FILTER = new SimpleString("_HQX=-1");
+
+   /** The JMS connection this session belongs to. */
+   private final HornetQConnection connection;
+
+   /** The underlying core session that the operations of this class delegate to. */
+   private final ClientSession session;
+
+   /** Compared against the {@code TYPE_*} constants above to restrict queue/topic operations. */
+   private final int sessionType;
+
+   /** Acknowledge mode reported by {@link #getAcknowledgeMode()}. */
+   private final int ackMode;
+
+   /** Whether this session is transacted; guards {@link #commit()}, {@link #rollback()} and {@link #recover()}. */
+   private final boolean transacted;
+
+   /** Whether this is an XA session; local commit/rollback are refused when {@code true}. */
+   private final boolean xa;
+
+   /** Set to {@code true} once {@link #recover()} has completed successfully. */
+   private boolean recoverCalled;
+
+   /** Consumers created through this session; they are closed when the session is closed. */
+   private final Set<HornetQMessageConsumer> consumers = new HashSet<HornetQMessageConsumer>();
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Stores the collaborators and settings of the session; performs no other work.
+    *
+    * @param connection  the JMS connection that owns this session
+    * @param transacted  whether the session is transacted
+    * @param xa          whether the session is an XA session
+    * @param ackMode     the acknowledge mode reported by {@link #getAcknowledgeMode()}
+    * @param session     the underlying core session
+    * @param sessionType one of the {@code TYPE_*} constants
+    */
+   protected HornetQSession(final HornetQConnection connection,
+                            final boolean transacted,
+                            final boolean xa,
+                            final int ackMode,
+                            final ClientSession session,
+                            final int sessionType)
+   {
+      // Assignments follow the parameter order.
+      this.connection = connection;
+      this.transacted = transacted;
+      this.xa = xa;
+      this.ackMode = ackMode;
+      this.session = session;
+      this.sessionType = sessionType;
+   }
+
+   // Session implementation ----------------------------------------
+
+   /**
+    * Creates a new bytes message from the underlying core session.
+    *
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public BytesMessage createBytesMessage() throws JMSException
+   {
+      checkClosed();
+
+      final HornetQBytesMessage message = new HornetQBytesMessage(session);
+
+      return message;
+   }
+
+   /**
+    * Creates a new map message from the underlying core session.
+    *
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public MapMessage createMapMessage() throws JMSException
+   {
+      checkClosed();
+
+      final HornetQMapMessage message = new HornetQMapMessage(session);
+
+      return message;
+   }
+
+   /**
+    * Creates a new plain message from the underlying core session.
+    *
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public Message createMessage() throws JMSException
+   {
+      checkClosed();
+
+      final HornetQMessage message = new HornetQMessage(session);
+
+      return message;
+   }
+
+   /**
+    * Creates a new object message without a payload.
+    *
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public ObjectMessage createObjectMessage() throws JMSException
+   {
+      checkClosed();
+
+      final HornetQObjectMessage message = new HornetQObjectMessage(session);
+
+      return message;
+   }
+
+   /**
+    * Creates a new object message and sets {@code object} as its payload.
+    *
+    * @param object the payload passed to {@code setObject}
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails or the payload cannot be set
+    */
+   public ObjectMessage createObjectMessage(final Serializable object) throws JMSException
+   {
+      checkClosed();
+
+      final HornetQObjectMessage message = new HornetQObjectMessage(session);
+      message.setObject(object);
+
+      return message;
+   }
+
+   /**
+    * Creates a new stream message from the underlying core session.
+    *
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public StreamMessage createStreamMessage() throws JMSException
+   {
+      checkClosed();
+
+      final HornetQStreamMessage message = new HornetQStreamMessage(session);
+
+      return message;
+   }
+
+   /**
+    * Creates a new text message whose text is explicitly set to {@code null}.
+    *
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails or the text cannot be set
+    */
+   public TextMessage createTextMessage() throws JMSException
+   {
+      checkClosed();
+
+      final HornetQTextMessage message = new HornetQTextMessage(session);
+      message.setText(null);
+
+      return message;
+   }
+
+   /**
+    * Creates a new text message carrying {@code text}.
+    *
+    * @param text the text passed to {@code setText}
+    * @return the new message
+    * @throws JMSException if {@code checkClosed()} fails or the text cannot be set
+    */
+   public TextMessage createTextMessage(final String text) throws JMSException
+   {
+      checkClosed();
+
+      final HornetQTextMessage message = new HornetQTextMessage(session);
+      message.setText(text);
+
+      return message;
+   }
+
+   /**
+    * Returns the transacted flag this session was constructed with.
+    *
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public boolean getTransacted() throws JMSException
+   {
+      checkClosed();
+
+      return transacted;
+   }
+
+   /**
+    * Returns the acknowledge mode this session was constructed with.
+    *
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public int getAcknowledgeMode() throws JMSException
+   {
+      checkClosed();
+
+      return ackMode;
+   }
+
+   /**
+    * Returns the XA flag this session was constructed with. Unlike the neighbouring getters, this one does not
+    * call {@code checkClosed()}.
+    */
+   public boolean isXA()
+   {
+      return xa;
+   }
+
+   /**
+    * Commits the current local transaction on the underlying core session.
+    *
+    * @throws IllegalStateException          if the session is not transacted
+    * @throws TransactionInProgressException if the session is an XA session
+    * @throws JMSException                   if the core commit fails (converted from the core exception)
+    */
+   public void commit() throws JMSException
+   {
+      // The non-transacted check deliberately comes first, so it wins when both conditions hold.
+      if (!transacted)
+      {
+         throw new IllegalStateException("Cannot commit a non-transacted session");
+      }
+
+      if (xa)
+      {
+         throw new TransactionInProgressException("Cannot call commit on an XA session");
+      }
+
+      try
+      {
+         session.commit();
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Rolls back the current local transaction on the underlying core session.
+    *
+    * @throws IllegalStateException          if the session is not transacted
+    * @throws TransactionInProgressException if the session is an XA session
+    * @throws JMSException                   if the core rollback fails (converted from the core exception)
+    */
+   public void rollback() throws JMSException
+   {
+      // The non-transacted check deliberately comes first, so it wins when both conditions hold.
+      if (!transacted)
+      {
+         throw new IllegalStateException("Cannot rollback a non-transacted session");
+      }
+
+      if (xa)
+      {
+         throw new TransactionInProgressException("Cannot call rollback on an XA session");
+      }
+
+      try
+      {
+         session.rollback();
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Closes every consumer of this session, then the underlying core session, and finally removes the session
+    * from its connection. All of this happens while holding the connection's monitor.
+    * <p>
+    * The two assertions on the connection's thread-aware context run first, before anything is closed.
+    *
+    * @throws JMSException if a consumer fails to close or the core session fails to close; in either case the
+    *                      remaining steps are not executed
+    */
+   public void close() throws JMSException
+   {
+      connection.getThreadAwareContext().assertNotCompletionListenerThread();
+      connection.getThreadAwareContext().assertNotMessageListenerThread();
+
+      synchronized (connection)
+      {
+         // Iterate over a copy rather than the live set.
+         // NOTE(review): presumably closing a consumer removes it from 'consumers' - confirm.
+         final Set<HornetQMessageConsumer> snapshot = new HashSet<HornetQMessageConsumer>(consumers);
+
+         try
+         {
+            for (HornetQMessageConsumer consumer : snapshot)
+            {
+               consumer.close();
+            }
+
+            session.close();
+
+            connection.removeSession(this);
+         }
+         catch (HornetQException coreFailure)
+         {
+            throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+         }
+      }
+   }
+
+   /**
+    * Recovers a non-transacted session by calling {@code rollback(true)} on the core session, then records
+    * that recover was called.
+    *
+    * @throws IllegalStateException if the session is transacted
+    * @throws JMSException          if the core rollback fails; the recover flag is left untouched in that case
+    */
+   public void recover() throws JMSException
+   {
+      if (transacted)
+      {
+         throw new IllegalStateException("Cannot recover a transacted session");
+      }
+
+      try
+      {
+         session.rollback(true);
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+
+      // Only reached when the rollback succeeded.
+      recoverCalled = true;
+   }
+
+   /**
+    * Always returns {@code null}: session-level listeners belong to the optional JMS ASF constructs, which this
+    * class does not support (see the class comment).
+    *
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public MessageListener getMessageListener() throws JMSException
+   {
+      checkClosed();
+
+      return null;
+   }
+
+   /**
+    * Ignores {@code listener}; only the closed check is performed.
+    *
+    * @param listener not stored or used
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public void setMessageListener(final MessageListener listener) throws JMSException
+   {
+      checkClosed();
+   }
+
+   /**
+    * Intentionally empty; this implementation does nothing when run.
+    */
+   public void run()
+   {
+   }
+
+   /**
+    * Creates a producer for {@code destination}, or a producer with no fixed address when it is {@code null}.
+    * <p>
+    * For a non-null destination the address is queried first; if it exists it is registered with the connection
+    * as a known destination.
+    *
+    * @param destination a {@code HornetQDestination}, or {@code null}
+    * @return the new producer
+    * @throws InvalidDestinationException if the destination is of a foreign type or its address does not exist
+    * @throws JMSException                if a core operation fails (converted from the core exception)
+    */
+   public MessageProducer createProducer(final Destination destination) throws JMSException
+   {
+      if (destination != null && !(destination instanceof HornetQDestination))
+      {
+         throw new InvalidDestinationException("Not a HornetQ Destination:" + destination);
+      }
+
+      // Safe after the type check above; stays null for a null destination.
+      final HornetQDestination target = (HornetQDestination)destination;
+
+      try
+      {
+         SimpleString address = null;
+
+         if (target != null)
+         {
+            address = target.getSimpleAddress();
+
+            final ClientSession.AddressQuery addressInfo = session.addressQuery(address);
+
+            if (!addressInfo.isExists())
+            {
+               throw new InvalidDestinationException("Destination " + target.getName() + " does not exist");
+            }
+
+            connection.addKnownDestination(address);
+         }
+
+         final ClientProducer coreProducer = session.createProducer(address);
+
+         return new HornetQMessageProducer(connection, coreProducer, target, session);
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Creates a consumer on {@code destination} with no message selector and {@code noLocal} set to {@code false}.
+    */
+   public MessageConsumer createConsumer(final Destination destination) throws JMSException
+   {
+      return createConsumer(destination, null, false);
+   }
+
+   /**
+    * Creates a consumer on {@code destination} with the given selector and {@code noLocal} set to {@code false}.
+    */
+   public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException
+   {
+      return createConsumer(destination, messageSelector, false);
+   }
+
+   /**
+    * Validates {@code destination} and creates a non-durable consumer on it.
+    *
+    * @param destination     must be a non-null {@code HornetQDestination}
+    * @param messageSelector selector for the consumer; may be {@code null}
+    * @param noLocal         passed through to the internal consumer creation
+    * @return the new consumer
+    * @throws InvalidDestinationException if the destination is {@code null} or of a foreign type
+    * @throws JMSException                if the destination is temporary and was not registered as a temporary
+    *                                     queue on this session's connection, or if creation fails
+    */
+   public MessageConsumer createConsumer(final Destination destination,
+                                         final String messageSelector,
+                                         final boolean noLocal) throws JMSException
+   {
+      if (destination == null)
+      {
+         throw new InvalidDestinationException("Cannot create a consumer with a null destination");
+      }
+
+      if (!(destination instanceof HornetQDestination))
+      {
+         throw new InvalidDestinationException("Not a HornetQDestination:" + destination);
+      }
+
+      final HornetQDestination target = (HornetQDestination)destination;
+
+      // Temporary destinations may only be consumed through the connection that registered them.
+      final boolean foreignTemporary = target.isTemporary() &&
+                                       !connection.containsTemporaryQueue(target.getSimpleAddress());
+
+      if (foreignTemporary)
+      {
+         throw new JMSException("Can not create consumer for temporary destination " + destination +
+                                " from another JMS connection");
+      }
+
+      return createConsumer(target, null, messageSelector, noLocal, ConsumerDurability.NON_DURABLE);
+   }
+
+   /**
+    * Looks up an existing queue by name; nothing is created. The lookup is first attempted with the flag
+    * {@code false} and, if that yields nothing, repeated with {@code true}.
+    *
+    * @param queueName the name to look up
+    * @return the queue that was found
+    * @throws IllegalStateException if this is a topic session
+    * @throws JMSException          if neither lookup finds a queue, or a core operation fails
+    */
+   public Queue createQueue(final String queueName) throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_TOPIC_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a queue using a TopicSession");
+      }
+
+      try
+      {
+         HornetQQueue found = lookupQueue(queueName, false);
+
+         if (found == null)
+         {
+            found = lookupQueue(queueName, true);
+         }
+
+         if (found != null)
+         {
+            return found;
+         }
+
+         throw new JMSException("There is no queue with name " + queueName);
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Looks up an existing topic by name; nothing is created. The lookup is first attempted with the flag
+    * {@code false} and, if that yields nothing, repeated with {@code true}.
+    *
+    * @param topicName the name to look up
+    * @return the topic that was found
+    * @throws IllegalStateException if this is a queue session
+    * @throws JMSException          if neither lookup finds a topic, or a core operation fails
+    */
+   public Topic createTopic(final String topicName) throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a topic on a QueueSession");
+      }
+
+      try
+      {
+         HornetQTopic found = lookupTopic(topicName, false);
+
+         if (found == null)
+         {
+            found = lookupTopic(topicName, true);
+         }
+
+         if (found != null)
+         {
+            return found;
+         }
+
+         throw new JMSException("There is no topic with name " + topicName);
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Creates a durable subscriber with no message selector and {@code noLocal} set to {@code false}.
+    */
+   public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException
+   {
+      return createDurableSubscriber(topic, name, null, false);
+   }
+
+   /**
+    * Validates the topic and creates a durable subscriber on it.
+    *
+    * @param topic           must be a non-null {@code HornetQDestination} that is not a queue
+    * @param name            the durable subscription name
+    * @param messageSelector selector; an empty string is treated as no selector
+    * @param noLocal         passed through to the internal consumer creation
+    * @return the new subscriber
+    * @throws IllegalStateException       if this is a queue session
+    * @throws InvalidDestinationException if the topic is {@code null}, of a foreign type, or actually a queue
+    * @throws JMSException                if creation fails
+    */
+   public TopicSubscriber createDurableSubscriber(final Topic topic,
+                                                  final String name,
+                                                  String messageSelector,
+                                                  final boolean noLocal) throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession");
+      }
+
+      checkTopic(topic);
+
+      if (!(topic instanceof HornetQDestination))
+      {
+         throw new InvalidDestinationException("Not a HornetQTopic:" + topic);
+      }
+
+      final HornetQDestination target = (HornetQDestination)topic;
+
+      if (target.isQueue())
+      {
+         throw new InvalidDestinationException("Cannot create a subscriber on a queue");
+      }
+
+      // An empty selector means "no selector".
+      final String effectiveSelector = "".equals(messageSelector) ? null : messageSelector;
+
+      return createConsumer(target, name, effectiveSelector, noLocal, ConsumerDurability.DURABLE);
+   }
+
+   /**
+    * Rejects a {@code null} topic.
+    *
+    * @param topic the topic to check
+    * @throws InvalidDestinationException the bundle's null-topic exception, when {@code topic} is {@code null}
+    */
+   private void checkTopic(Topic topic) throws InvalidDestinationException
+   {
+      if (topic != null)
+      {
+         return;
+      }
+
+      throw HornetQJMSClientBundle.BUNDLE.nullTopic();
+   }
+
+   /**
+    * Creates a shared non-durable consumer with no message selector.
+    */
+   @Override
+   public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException
+   {
+      return createSharedConsumer(topic, sharedSubscriptionName, null);
+   }
+
+   /**
+    * Creates a shared non-durable consumer on a topic. A topic that is not a {@code HornetQTopic} is replaced by
+    * a new {@code HornetQTopic} carrying the same topic name.
+    * <p>
+    * Open points noted by the original author:
+    * <ul>
+    * <li>this needs to throw an exception if the subscription name is already in use by another topic, or if the
+    * message selector is different;</li>
+    * <li>validate multiple subscriptions on the same session;</li>
+    * <li>validate multiple subscriptions on different sessions;</li>
+    * <li>validate a failure in one connection while another connection is still fine;</li>
+    * <li>validate different filters in the different possible scenarios.</li>
+    * </ul>
+    *
+    * @param topic           the topic to subscribe to; must not be {@code null}
+    * @param name            the shared subscription name
+    * @param messageSelector the selector for the subscription; may be {@code null}
+    * @return the new shared consumer
+    * @throws JMSException if this is a queue session, the topic is {@code null}, or creation fails
+    */
+   @Override
+   public MessageConsumer createSharedConsumer(Topic topic, String name, String messageSelector) throws JMSException
+   {
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a shared consumer on a QueueSession");
+      }
+
+      checkTopic(topic);
+
+      final HornetQTopic localTopic = topic instanceof HornetQTopic ? (HornetQTopic)topic
+                                                                    : new HornetQTopic(topic.getTopicName());
+
+      return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.NON_DURABLE, true);
+   }
+
+   /**
+    * Creates a durable consumer with no message selector and {@code noLocal} set to {@code false}.
+    */
+   @Override
+   public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException
+   {
+      return createDurableConsumer(topic, name, null, false);
+   }
+
+   /**
+    * Creates a (non-shared) durable consumer on a topic. A topic that is not a {@code HornetQTopic} is replaced
+    * by a new {@code HornetQTopic} carrying the same topic name.
+    *
+    * @param topic           the topic to subscribe to; must not be {@code null}
+    * @param name            the durable subscription name
+    * @param messageSelector the selector for the subscription; may be {@code null}
+    * @param noLocal         passed through to the internal consumer creation
+    * @return the new durable consumer
+    * @throws JMSException if this is a queue session, the topic is {@code null}, or creation fails
+    */
+   @Override
+   public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException
+   {
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a durable consumer on a QueueSession");
+      }
+
+      checkTopic(topic);
+
+      final HornetQTopic localTopic = topic instanceof HornetQTopic ? (HornetQTopic)topic
+                                                                    : new HornetQTopic(topic.getTopicName());
+
+      return createConsumer(localTopic, name, messageSelector, noLocal, ConsumerDurability.DURABLE);
+   }
+
+   /**
+    * Creates a shared durable consumer with no message selector.
+    */
+   @Override
+   public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException
+   {
+      return createSharedDurableConsumer(topic, name, null);
+   }
+
+   /**
+    * Creates a shared durable consumer on a topic. A topic that is not a {@code HornetQTopic} is replaced by a
+    * new {@code HornetQTopic} carrying the same topic name.
+    *
+    * @param topic           the topic to subscribe to; must not be {@code null}
+    * @param name            the shared durable subscription name
+    * @param messageSelector the selector for the subscription; may be {@code null}
+    * @return the new shared durable consumer
+    * @throws JMSException if this is a queue session, the topic is {@code null}, or creation fails
+    */
+   @Override
+   public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException
+   {
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a shared durable consumer on a QueueSession");
+      }
+
+      checkTopic(topic);
+
+      final HornetQTopic localTopic = topic instanceof HornetQTopic ? (HornetQTopic)topic
+                                                                    : new HornetQTopic(topic.getTopicName());
+
+      return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.DURABLE, true);
+   }
+
+   /**
+    * Whether a consumer is backed by a durable subscription.
+    */
+   enum ConsumerDurability
+   {
+      DURABLE,
+      NON_DURABLE
+   }
+
+
+   /**
+    * Internal worker for the shared-consumer factory methods: makes sure the shared subscription queue exists on
+    * the topic's address and attaches a consumer to it.
+    *
+    * @param dest             the topic to subscribe to; a queue is rejected as an internal error
+    * @param subscriptionName the shared subscription name; must not be {@code null}
+    * @param selectorString   the JMS selector; {@code null} or empty means no selector
+    * @param durability       whether the backing shared queue is created as durable
+    * @param shared           not used by this method
+    * @return the new consumer, already added to this session's consumer set
+    * @throws JMSException if the subscription name is {@code null}, the address does not exist, a durable
+    *                      subscription is requested on a temporary topic, or a core operation fails
+    */
+   private HornetQMessageConsumer internalCreateSharedConsumer(final HornetQDestination dest,
+                                                               final String subscriptionName,
+                                                               String selectorString,
+                                                               ConsumerDurability durability,
+                                                               final boolean shared) throws JMSException
+   {
+      try
+      {
+         if (dest.isQueue())
+         {
+            // Not really possible unless someone makes a coding mistake:
+            // createSharedConsumer only accepts Topics by declaration
+            throw new RuntimeException("Internal error: createSharedConsumer is only meant for Topics");
+         }
+
+         if (subscriptionName == null)
+         {
+            throw HornetQJMSClientBundle.BUNDLE.invalidSubscriptionName();
+         }
+
+         // An empty selector means "no selector".
+         final String effectiveSelector = "".equals(selectorString) ? null : selectorString;
+
+         final SimpleString coreFilter = effectiveSelector == null
+            ? null
+            : new SimpleString(SelectorTranslator.convertToHornetQFilterString(effectiveSelector));
+
+         final SimpleString address = dest.getSimpleAddress();
+
+         final AddressQuery addressInfo = session.addressQuery(address);
+
+         if (!addressInfo.isExists())
+         {
+            throw HornetQJMSClientBundle.BUNDLE.destinationDoesNotExist(address);
+         }
+
+         final boolean durable = durability == ConsumerDurability.DURABLE;
+
+         if (dest.isTemporary() && durable)
+         {
+            throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
+         }
+
+         final SimpleString queueName =
+            new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(durable,
+                                                                                      connection.getClientID(),
+                                                                                      subscriptionName));
+
+         if (durable)
+         {
+            try
+            {
+               session.createSharedQueue(address, queueName, coreFilter, true);
+            }
+            catch (HornetQQueueExistsException ignored)
+            {
+               // Deliberately ignored: querying first and creating afterwards would not be idempotent.
+               // A parameter to ignore existence could be added instead, but that would need a larger
+               // work-around to stay compatible.
+            }
+         }
+         else
+         {
+            session.createSharedQueue(address, queueName, coreFilter, false);
+         }
+
+         // The filter lives on the queue, so the consumer itself is created without one.
+         final ClientConsumer coreConsumer = session.createConsumer(queueName, null, false);
+
+         // The last argument (auto-delete queue name) is always null for shared consumers.
+         final HornetQMessageConsumer jmsConsumer = new HornetQMessageConsumer(connection,
+                                                                               this,
+                                                                               coreConsumer,
+                                                                               false,
+                                                                               dest,
+                                                                               effectiveSelector,
+                                                                               null);
+
+         consumers.add(jmsConsumer);
+
+         return jmsConsumer;
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+
+
+   /**
+    * Creates a consumer on a queue, or a non-shared subscription consumer on a topic.
+    * <p>
+    * For a queue the consumer is attached directly to the destination's address. For a topic a core queue backs
+    * the subscription: a temporary queue with a random UUID name when {@code subscriptionName} is {@code null},
+    * otherwise a durable queue named from the client ID and the subscription name. An existing durable
+    * subscription whose filter or topic differs from the requested one is deleted and created again.
+    *
+    * @param dest             the destination to consume from
+    * @param subscriptionName the durable subscription name; {@code null} for a non-durable topic consumer
+    * @param selectorString   the JMS selector; {@code null} or empty means no selector
+    * @param noLocal          when {@code true}, a filter on the connection-ID property is appended to the selector
+    * @param durability       for topics, must agree with {@code subscriptionName}: {@code NON_DURABLE} when it is
+    *                         {@code null}, {@code DURABLE} otherwise
+    * @return the new consumer, already added to this session's consumer set
+    * @throws InvalidDestinationException if the address does not exist, or a durable subscription is requested on
+    *                                     a temporary topic
+    * @throws IllegalStateException       if a durable subscription is requested without a client ID, or the
+    *                                     durable subscription already has a consumer
+    * @throws JMSException                if a core operation fails (converted from the core exception)
+    * @throws RuntimeException            if {@code durability} contradicts {@code subscriptionName} for a topic
+    */
+   private HornetQMessageConsumer createConsumer(final HornetQDestination dest,
+                                                 final String subscriptionName,
+                                                 String selectorString, final boolean noLocal,
+                                                 ConsumerDurability durability) throws JMSException
+   {
+      try
+      {
+         // An empty selector means "no selector".
+         selectorString = "".equals(selectorString) ? null : selectorString;
+
+         if (noLocal)
+         {
+            connection.setHasNoLocal();
+
+            // Filter out messages carrying this connection's identifier: the client ID when one is set,
+            // otherwise the connection UID.
+            String filter;
+            if (connection.getClientID() != null)
+            {
+               filter =
+                        HornetQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getClientID() +
+                                 "'";
+            }
+            else
+            {
+               filter = HornetQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getUID() + "'";
+            }
+
+            // NOTE(review): the user selector is not parenthesised, so a selector containing OR
+            // (e.g. "a = 1 OR b = 2") combines with the noLocal filter as "a = 1 OR (b = 2 AND ...)",
+            // because AND binds tighter than OR. Wrapping it would change the filter string that is
+            // compared against existing durable subscriptions below, forcing them to be re-created,
+            // so it is left as-is here - confirm and fix together with a migration plan.
+            if (selectorString != null)
+            {
+               selectorString += " AND " + filter;
+            }
+            else
+            {
+               selectorString = filter;
+            }
+         }
+
+         SimpleString coreFilterString = null;
+
+         if (selectorString != null)
+         {
+            coreFilterString = new SimpleString(SelectorTranslator.convertToHornetQFilterString(selectorString));
+         }
+
+         ClientConsumer consumer;
+
+         // Only set for non-durable topic subscriptions, whose backing queue is private to this consumer.
+         SimpleString autoDeleteQueueName = null;
+
+         if (dest.isQueue())
+         {
+            AddressQuery response = session.addressQuery(dest.getSimpleAddress());
+
+            if (!response.isExists())
+            {
+               throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist");
+            }
+
+            connection.addKnownDestination(dest.getSimpleAddress());
+
+            consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false);
+         }
+         else
+         {
+            AddressQuery response = session.addressQuery(dest.getSimpleAddress());
+
+            if (!response.isExists())
+            {
+               throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
+            }
+
+            connection.addKnownDestination(dest.getSimpleAddress());
+
+            SimpleString queueName;
+
+            if (subscriptionName == null)
+            {
+               // Non durable sub
+               if (durability != ConsumerDurability.NON_DURABLE)
+               {
+                  // Previously a bare RuntimeException with no message; the type is unchanged.
+                  throw new RuntimeException("Subscription name cannot be null for durable topic consumer");
+               }
+
+               queueName = new SimpleString(UUID.randomUUID().toString());
+
+               session.createTemporaryQueue(dest.getSimpleAddress(), queueName, coreFilterString);
+
+               // The filter lives on the queue, so the consumer itself is created without one.
+               consumer = session.createConsumer(queueName, null, false);
+
+               autoDeleteQueueName = queueName;
+            }
+            else
+            {
+               // Durable sub
+               if (durability != ConsumerDurability.DURABLE)
+               {
+                  // Previously a bare RuntimeException with no message; the type is unchanged.
+                  throw new RuntimeException("Subscription name must be null for non-durable topic consumer");
+               }
+
+               if (connection.getClientID() == null)
+               {
+                  throw new IllegalStateException("Cannot create durable subscription - client ID has not been set");
+               }
+
+               if (dest.isTemporary())
+               {
+                  throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
+               }
+
+               queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(),
+                                                                                                     subscriptionName));
+
+               QueueQuery subResponse = session.queueQuery(queueName);
+
+               if (!subResponse.isExists())
+               {
+                  session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true);
+               }
+               else
+               {
+                  // Already exists
+                  if (subResponse.getConsumerCount() > 0)
+                  {
+                     throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+                  }
+
+                  // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
+                  // A client can change an existing durable subscription by
+                  // creating a durable
+                  // TopicSubscriber with the same name and a new topic and/or
+                  // message selector.
+                  // Changing a durable subscriber is equivalent to
+                  // unsubscribing (deleting) the old
+                  // one and creating a new one.
+
+                  SimpleString oldFilterString = subResponse.getFilterString();
+
+                  // True when exactly one filter is null, or both are set and differ.
+                  boolean selectorChanged = (coreFilterString == null && oldFilterString != null) ||
+                                            (oldFilterString == null && coreFilterString != null) ||
+                                            (oldFilterString != null &&
+                                             coreFilterString != null &&
+                                             !oldFilterString.equals(coreFilterString));
+
+                  SimpleString oldTopicName = subResponse.getAddress();
+
+                  boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
+
+                  if (selectorChanged || topicChanged)
+                  {
+                     // Delete the old durable sub
+                     session.deleteQueue(queueName);
+
+                     // Create the new one
+                     session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true);
+                  }
+               }
+
+               consumer = session.createConsumer(queueName, null, false);
+            }
+         }
+
+         HornetQMessageConsumer jbc = new HornetQMessageConsumer(connection,
+                                                                 this,
+                                                                 consumer,
+                                                                 noLocal,
+                                                                 dest,
+                                                                 selectorString,
+                                                                 autoDeleteQueueName);
+
+         consumers.add(jbc);
+
+         return jbc;
+      }
+      catch (HornetQException e)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(e);
+      }
+   }
+
+   /**
+    * Performs only the closed check; nothing is acknowledged here.
+    *
+    * @throws JMSException if {@code checkClosed()} fails
+    */
+   public void ackAllConsumers() throws JMSException
+   {
+      checkClosed();
+   }
+
+   /**
+    * Creates a browser on {@code queue} with no filter.
+    */
+   public QueueBrowser createBrowser(final Queue queue) throws JMSException
+   {
+      return createBrowser(queue, null);
+   }
+
+   /**
+    * Creates a browser on an existing queue, optionally restricted by a selector.
+    * <p>
+    * The selector syntax is checked eagerly by parsing its trimmed form; the browser itself receives the
+    * selector as given (an empty string becomes {@code null}).
+    *
+    * @param queue        must be a non-null {@code HornetQDestination} that is a queue
+    * @param filterString the selector; {@code null} or empty means no selector
+    * @return the new browser
+    * @throws IllegalStateException       if this is a topic session
+    * @throws InvalidDestinationException if the queue is {@code null}, of a foreign type, actually a topic, or
+    *                                     its address does not exist
+    * @throws JMSException                if the selector is invalid or a core operation fails
+    */
+   public QueueBrowser createBrowser(final Queue queue, String filterString) throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_TOPIC_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a browser on a TopicSession");
+      }
+
+      if (queue == null)
+      {
+         throw new InvalidDestinationException("Cannot create a browser with a null queue");
+      }
+
+      if (!(queue instanceof HornetQDestination))
+      {
+         throw new InvalidDestinationException("Not a HornetQQueue:" + queue);
+      }
+
+      // An empty selector means "no selector".
+      final String selector = "".equals(filterString) ? null : filterString;
+
+      // eager test of the filter syntax as required by JMS spec
+      if (selector != null)
+      {
+         try
+         {
+            SelectorParser.parse(selector.trim());
+         }
+         catch (FilterException invalid)
+         {
+            throw JMSExceptionHelper.convertFromHornetQException(HornetQJMSClientBundle.BUNDLE.invalidFilter(invalid, new SimpleString(selector)));
+         }
+      }
+
+      final HornetQDestination target = (HornetQDestination)queue;
+
+      if (!target.isQueue())
+      {
+         throw new InvalidDestinationException("Cannot create a browser on a topic");
+      }
+
+      try
+      {
+         final AddressQuery addressInfo = session.addressQuery(new SimpleString(target.getAddress()));
+
+         if (!addressInfo.isExists())
+         {
+            throw new InvalidDestinationException(target.getAddress() + " does not exist");
+         }
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+
+      return new HornetQQueueBrowser((HornetQQueue)target, selector, session);
+   }
+
+   /**
+    * Creates a temporary queue: builds the JMS destination, creates a core temporary queue whose address and
+    * queue name are both the destination's address, and registers that address with the connection.
+    *
+    * @return the new temporary queue
+    * @throws IllegalStateException if this is a topic session
+    * @throws JMSException          if a core operation fails (converted from the core exception)
+    */
+   public TemporaryQueue createTemporaryQueue() throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_TOPIC_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a temporary queue using a TopicSession");
+      }
+
+      try
+      {
+         final HornetQTemporaryQueue tempQueue = HornetQDestination.createTemporaryQueue(this);
+         final SimpleString address = tempQueue.getSimpleAddress();
+
+         session.createTemporaryQueue(address, address);
+         connection.addTemporaryQueue(address);
+
+         return tempQueue;
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Creates a temporary topic, backed by a placeholder core queue that uses the rejecting filter, and
+    * registers its address with the connection.
+    *
+    * @return the new temporary topic
+    * @throws IllegalStateException if this is a queue session
+    * @throws JMSException          if a core operation fails (converted from the core exception)
+    */
+   public TemporaryTopic createTemporaryTopic() throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot create a temporary topic on a QueueSession");
+      }
+
+      try
+      {
+         final HornetQTemporaryTopic tempTopic = HornetQDestination.createTemporaryTopic(this);
+         final SimpleString address = tempTopic.getSimpleAddress();
+
+         // Core has no notion of a topic, so a dummy subscription that never receives messages is created
+         // on the address. Without it, JMS checks made when routing a message could not tell a
+         // non-existent topic apart from an existing topic that has no subscriptions.
+         session.createTemporaryQueue(address, address, HornetQSession.REJECTING_FILTER);
+
+         connection.addTemporaryQueue(address);
+
+         return tempTopic;
+      }
+      catch (HornetQException coreFailure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(coreFailure);
+      }
+   }
+
+   /**
+    * Removes the durable subscription with the given name by deleting the core queue that backs it.
+    *
+    * @param name the durable subscription name
+    * @throws JMSException if this is a queue session, if the subscription does not exist, if it still has
+    *                      consumers attached, or if the core session reports a failure
+    */
+   public void unsubscribe(final String name) throws JMSException
+   {
+      // As per spec. section 4.11
+      if (sessionType == HornetQSession.TYPE_QUEUE_SESSION)
+      {
+         throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
+      }
+
+      // the backing queue name is derived from the connection's client id and the subscription name
+      final SimpleString subscriptionQueue =
+         new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true,
+                                                                                   connection.getClientID(),
+                                                                                   name));
+
+      try
+      {
+         final QueueQuery subscription = session.queueQuery(subscriptionQueue);
+
+         if (!subscription.isExists())
+         {
+            throw new InvalidDestinationException("Cannot unsubscribe, subscription with name " + name +
+                                                  " does not exist");
+         }
+
+         if (subscription.getConsumerCount() != 0)
+         {
+            throw new IllegalStateException("Cannot unsubscribe durable subscription " + name +
+                                            " since it has active subscribers");
+         }
+
+         session.deleteQueue(subscriptionQueue);
+      }
+      catch (HornetQException failure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(failure);
+      }
+   }
+
+   // XASession implementation
+
+   /**
+    * Returns this object as the plain session view of an XA session.
+    *
+    * @return this session
+    * @throws JMSException if the {@code xa} flag of this session is not set
+    */
+   public Session getSession() throws JMSException
+   {
+      if (xa)
+      {
+         return this;
+      }
+
+      throw new IllegalStateException("Isn't an XASession");
+   }
+
+   /**
+    * @return the XA resource exposed by the underlying core session
+    */
+   public XAResource getXAResource()
+   {
+      final XAResource resource = session.getXAResource();
+
+      return resource;
+   }
+
+   // QueueSession implementation
+
+   /**
+    * Creates a receiver for the queue by delegating to {@code createConsumer(queue, messageSelector)}.
+    *
+    * @param queue           the queue to receive from
+    * @param messageSelector the selector handed on to {@code createConsumer}
+    * @return the consumer, viewed as a {@link QueueReceiver}
+    * @throws JMSException if {@code createConsumer} fails
+    */
+   public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
+   {
+      final MessageConsumer consumer = createConsumer(queue, messageSelector);
+
+      return (QueueReceiver)consumer;
+   }
+
+   /**
+    * Creates a receiver for the queue by delegating to {@code createConsumer(queue)}.
+    *
+    * @param queue the queue to receive from
+    * @return the consumer, viewed as a {@link QueueReceiver}
+    * @throws JMSException if {@code createConsumer} fails
+    */
+   public QueueReceiver createReceiver(final Queue queue) throws JMSException
+   {
+      final MessageConsumer consumer = createConsumer(queue);
+
+      return (QueueReceiver)consumer;
+   }
+
+   /**
+    * Creates a sender for the queue by delegating to {@code createProducer(queue)}.
+    *
+    * @param queue the queue to send to
+    * @return the producer, viewed as a {@link QueueSender}
+    * @throws JMSException if {@code createProducer} fails
+    */
+   public QueueSender createSender(final Queue queue) throws JMSException
+   {
+      final MessageProducer producer = createProducer(queue);
+
+      return (QueueSender)producer;
+   }
+
+   // XAQueueSession implementation
+
+   /**
+    * @return the result of {@link #getSession()}, viewed as a {@link QueueSession}
+    * @throws JMSException if {@link #getSession()} rejects the call
+    */
+   public QueueSession getQueueSession() throws JMSException
+   {
+      final Session self = getSession();
+
+      return (QueueSession)self;
+   }
+
+   // TopicSession implementation
+
+   /**
+    * Creates a publisher for the topic by delegating to {@code createProducer(topic)}.
+    *
+    * @param topic the topic to publish to
+    * @return the producer, viewed as a {@link TopicPublisher}
+    * @throws JMSException if {@code createProducer} fails
+    */
+   public TopicPublisher createPublisher(final Topic topic) throws JMSException
+   {
+      final MessageProducer producer = createProducer(topic);
+
+      return (TopicPublisher)producer;
+   }
+
+   /**
+    * Creates a subscriber for the topic by delegating to
+    * {@code createConsumer(topic, messageSelector, noLocal)}.
+    *
+    * @param topic           the topic to subscribe to
+    * @param messageSelector the selector handed on to {@code createConsumer}
+    * @param noLocal         the no-local flag handed on to {@code createConsumer}
+    * @return the consumer, viewed as a {@link TopicSubscriber}
+    * @throws JMSException if {@code createConsumer} fails
+    */
+   public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
+   {
+      final MessageConsumer consumer = createConsumer(topic, messageSelector, noLocal);
+
+      return (TopicSubscriber)consumer;
+   }
+
+   /**
+    * Creates a subscriber for the topic by delegating to {@code createConsumer(topic)}.
+    *
+    * @param topic the topic to subscribe to
+    * @return the consumer, viewed as a {@link TopicSubscriber}
+    * @throws JMSException if {@code createConsumer} fails
+    */
+   public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+   {
+      final MessageConsumer consumer = createConsumer(topic);
+
+      return (TopicSubscriber)consumer;
+   }
+
+   // XATopicSession implementation
+
+   /**
+    * @return the result of {@link #getSession()}, viewed as a {@link TopicSession}
+    * @throws JMSException if {@link #getSession()} rejects the call
+    */
+   public TopicSession getTopicSession() throws JMSException
+   {
+      final Session self = getSession();
+
+      return (TopicSession)self;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return a fixed prefix followed by the string form of the wrapped core session
+    */
+   @Override
+   public String toString()
+   {
+      final String coreSession = String.valueOf(session);
+
+      return "HornetQSession->" + coreSession;
+   }
+
+   /**
+    * @return the core session this JMS session wraps
+    */
+   public ClientSession getCoreSession()
+   {
+      return this.session;
+   }
+
+   /**
+    * @return the current value of the {@code recoverCalled} flag
+    */
+   public boolean isRecoverCalled()
+   {
+      return this.recoverCalled;
+   }
+
+   /**
+    * Overwrites the {@code recoverCalled} flag.
+    *
+    * @param recoverCalled the new flag value
+    */
+   public void setRecoverCalled(final boolean recoverCalled)
+   {
+      this.recoverCalled = recoverCalled;
+   }
+
+   /**
+    * Deletes a temporary topic: removes the core queue registered under the topic's address and drops the
+    * address from the connection's temporary-queue bookkeeping.
+    *
+    * @param tempTopic the temporary topic to delete
+    * @throws JMSException if the destination is not temporary, if its address does not exist, if more than
+    *                      one queue is bound to the address, or if the core session reports a failure
+    */
+   public void deleteTemporaryTopic(final HornetQDestination tempTopic) throws JMSException
+   {
+      if (!tempTopic.isTemporary())
+      {
+         throw new InvalidDestinationException("Not a temporary topic " + tempTopic);
+      }
+
+      try
+      {
+         final SimpleString address = tempTopic.getSimpleAddress();
+
+         final AddressQuery bindings = session.addressQuery(address);
+
+         if (!bindings.isExists())
+         {
+            throw new InvalidDestinationException("Cannot delete temporary topic " + tempTopic.getName() +
+                                                  " does not exist");
+         }
+
+         // one queue is tolerated here; createTemporaryTopic() binds a dummy queue to every temporary
+         // topic address, so only additional queues are treated as subscribers
+         if (bindings.getQueueNames().size() > 1)
+         {
+            throw new IllegalStateException("Cannot delete temporary topic " + tempTopic.getName() +
+                                            " since it has subscribers");
+         }
+
+         session.deleteQueue(address);
+         connection.removeTemporaryQueue(address);
+      }
+      catch (HornetQException failure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(failure);
+      }
+   }
+
+   /**
+    * Deletes a temporary queue from the core session and drops its address from the connection's
+    * temporary-queue bookkeeping.
+    *
+    * @param tempQueue the temporary queue to delete
+    * @throws JMSException if the destination is not temporary, if the queue does not exist, if it still has
+    *                      consumers, or if the core session reports a failure
+    */
+   public void deleteTemporaryQueue(final HornetQDestination tempQueue) throws JMSException
+   {
+      if (!tempQueue.isTemporary())
+      {
+         throw new InvalidDestinationException("Not a temporary queue " + tempQueue);
+      }
+
+      try
+      {
+         final SimpleString address = tempQueue.getSimpleAddress();
+
+         final QueueQuery queueState = session.queueQuery(address);
+
+         if (!queueState.isExists())
+         {
+            throw new InvalidDestinationException("Cannot delete temporary queue " + tempQueue.getName() +
+                                                  " does not exist");
+         }
+
+         if (queueState.getConsumerCount() > 0)
+         {
+            throw new IllegalStateException("Cannot delete temporary queue " + tempQueue.getName() +
+                                            " since it has subscribers");
+         }
+
+         session.deleteQueue(address);
+         connection.removeTemporaryQueue(address);
+      }
+      catch (HornetQException failure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(failure);
+      }
+   }
+
+   /**
+    * Starts the underlying core session.
+    *
+    * @throws JMSException the converted form of any {@code HornetQException} raised by the core session
+    */
+   public void start() throws JMSException
+   {
+      try
+      {
+         this.session.start();
+      }
+      catch (HornetQException failure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(failure);
+      }
+   }
+
+   /**
+    * Stops the underlying core session.
+    *
+    * @throws JMSException the converted form of any {@code HornetQException} raised by the core session
+    */
+   public void stop() throws JMSException
+   {
+      try
+      {
+         this.session.stop();
+      }
+      catch (HornetQException failure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(failure);
+      }
+   }
+
+   /**
+    * Drops the given consumer from this session's {@code consumers} collection.
+    *
+    * @param consumer the consumer to forget
+    */
+   public void removeConsumer(final HornetQMessageConsumer consumer)
+   {
+      this.consumers.remove(consumer);
+   }
+
+   // Package protected ---------------------------------------------
+
+   /**
+    * Best-effort deletion of a core queue: does nothing when the core session is already closed and
+    * deliberately ignores any {@code HornetQException} raised by the delete.
+    *
+    * @param queueName name of the core queue to delete
+    * @throws JMSException declared for callers; not thrown by the code in this method
+    */
+   void deleteQueue(final SimpleString queueName) throws JMSException
+   {
+      if (session.isClosed())
+      {
+         return;
+      }
+
+      try
+      {
+         session.deleteQueue(queueName);
+      }
+      catch (HornetQException ignored)
+      {
+         // Exception on deleting queue shouldn't prevent close from completing
+      }
+   }
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Guard for operations that need an open session.
+    *
+    * @throws JMSException ({@link IllegalStateException}) when the core session reports itself closed
+    */
+   private void checkClosed() throws JMSException
+   {
+      if (!session.isClosed())
+      {
+         return;
+      }
+
+      throw new IllegalStateException("Session is closed");
+   }
+
+   private HornetQQueue lookupQueue(final String queueName, boolean isTemporary) throws HornetQException
+   {
+      HornetQQueue queue;
+
+      if (isTemporary)
+      {
+         queue = HornetQDestination.createTemporaryQueue(queueName);
+      }
+      else
+      {
+         queue = HornetQDestination.createQueue(queueName);
+      }
+
+      QueueQuery response = session.queueQuery(queue.getSimpleAddress());
+
+      if (response.isExists())
+      {
+         return queue;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   private HornetQTopic lookupTopic(final String topicName, final boolean isTemporary) throws HornetQException
+   {
+
+      HornetQTopic topic;
+
+      if (isTemporary)
+      {
+         topic = HornetQDestination.createTemporaryTopic(topicName);
+      }
+      else
+      {
+         topic = HornetQDestination.createTopic(topicName);
+      }
+
+      AddressQuery query = session.addressQuery(topic.getSimpleAddress());
+
+      if (!query.isExists())
+      {
+         return null;
+      }
+      else
+      {
+         return topic;
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java
new file mode 100644
index 0000000..86182a8
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java
@@ -0,0 +1,466 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.client.ClientMessage;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.core.client.impl.ClientMessageImpl;
+import org.apache.activemq6.utils.DataConstants;
+
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBoolean;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadByte;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBytes;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadChar;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadDouble;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadFloat;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadInteger;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadLong;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadObject;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadShort;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadString;
+
+/**
+ * HornetQ implementation of a JMS StreamMessage.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Some parts based on JBM 1.x class by:
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ */
+public final class HornetQStreamMessage extends HornetQMessage implements StreamMessage
+{
+   public static final byte TYPE = Message.STREAM_TYPE;
+
+   protected HornetQStreamMessage(final ClientSession session)
+   {
+      super(HornetQStreamMessage.TYPE, session);
+   }
+
+   protected HornetQStreamMessage(final ClientMessage message, final ClientSession session)
+   {
+      super(message, session);
+   }
+
+   public HornetQStreamMessage(final StreamMessage foreign, final ClientSession session) throws JMSException
+   {
+      super(foreign, HornetQStreamMessage.TYPE, session);
+
+      foreign.reset();
+
+      try
+      {
+         while (true)
+         {
+            Object obj = foreign.readObject();
+            writeObject(obj);
+         }
+      }
+      catch (MessageEOFException e)
+      {
+         // Ignore
+      }
+   }
+
+   // For testing only
+   public HornetQStreamMessage()
+   {
+      message = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1500);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public byte getType()
+   {
+      return HornetQStreamMessage.TYPE;
+   }
+
+   // StreamMessage implementation ----------------------------------
+
+   public boolean readBoolean() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadBoolean(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public byte readByte() throws JMSException
+   {
+      checkRead();
+
+      try
+      {
+         return streamReadByte(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public short readShort() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadShort(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public char readChar() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadChar(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public int readInt() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadInteger(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public long readLong() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadLong(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public float readFloat() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadFloat(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public double readDouble() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadDouble(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public String readString() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadString(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   /**
+    * len here is used to control how many more bytes to read
+    */
+   private int len = 0;
+
+   public int readBytes(final byte[] value) throws JMSException
+   {
+      checkRead();
+      try
+      {
+         Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+
+         len = pairRead.getA();
+         return pairRead.getB();
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public Object readObject() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return streamReadObject(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.BOOLEAN);
+      getBuffer().writeBoolean(value);
+   }
+
+   public void writeByte(final byte value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.BYTE);
+      getBuffer().writeByte(value);
+   }
+
+   public void writeShort(final short value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.SHORT);
+      getBuffer().writeShort(value);
+   }
+
+   public void writeChar(final char value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.CHAR);
+      getBuffer().writeShort((short)value);
+   }
+
+   public void writeInt(final int value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.INT);
+      getBuffer().writeInt(value);
+   }
+
+   public void writeLong(final long value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.LONG);
+      getBuffer().writeLong(value);
+   }
+
+   public void writeFloat(final float value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.FLOAT);
+      getBuffer().writeInt(Float.floatToIntBits(value));
+   }
+
+   public void writeDouble(final double value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.DOUBLE);
+      getBuffer().writeLong(Double.doubleToLongBits(value));
+   }
+
+   public void writeString(final String value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.STRING);
+      getBuffer().writeNullableString(value);
+   }
+
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.BYTES);
+      getBuffer().writeInt(value.length);
+      getBuffer().writeBytes(value);
+   }
+
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+      checkWrite();
+      getBuffer().writeByte(DataConstants.BYTES);
+      getBuffer().writeInt(length);
+      getBuffer().writeBytes(value, offset, length);
+   }
+
+   public void writeObject(final Object value) throws JMSException
+   {
+      if (value instanceof String)
+      {
+         writeString((String)value);
+      }
+      else if (value instanceof Boolean)
+      {
+         writeBoolean((Boolean)value);
+      }
+      else if (value instanceof Byte)
+      {
+         writeByte((Byte)value);
+      }
+      else if (value instanceof Short)
+      {
+         writeShort((Short)value);
+      }
+      else if (value instanceof Integer)
+      {
+         writeInt((Integer)value);
+      }
+      else if (value instanceof Long)
+      {
+         writeLong((Long)value);
+      }
+      else if (value instanceof Float)
+      {
+         writeFloat((Float)value);
+      }
+      else if (value instanceof Double)
+      {
+         writeDouble((Double)value);
+      }
+      else if (value instanceof byte[])
+      {
+         writeBytes((byte[])value);
+      }
+      else if (value instanceof Character)
+      {
+         writeChar((Character)value);
+      }
+      else if (value == null)
+      {
+         writeString(null);
+      }
+      else
+      {
+         throw new MessageFormatException("Invalid object type: " + value.getClass());
+      }
+   }
+
+   public void reset() throws JMSException
+   {
+      if (!readOnly)
+      {
+         readOnly = true;
+      }
+      getBuffer().resetReaderIndex();
+   }
+
+   // HornetQRAMessage overrides ----------------------------------------
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      getBuffer().clear();
+   }
+
+   @Override
+   public void doBeforeSend() throws Exception
+   {
+      reset();
+   }
+
+   private HornetQBuffer getBuffer()
+   {
+      return message.getBodyBuffer();
+   }
+
+   @SuppressWarnings("rawtypes")
+   @Override
+   public boolean isBodyAssignableTo(Class c)
+   {
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java
new file mode 100644
index 0000000..e8ad86b
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.TemporaryQueue;
+
+
+/**
+ * HornetQ implementation of a JMS TemporaryQueue.
+ * <br>
+ * This class can be instantiated directly.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3569 $</tt>
+ *
+ */
+public class HornetQTemporaryQueue extends HornetQQueue implements TemporaryQueue
+{
+   // Constants -----------------------------------------------------
+
+   private static final long serialVersionUID = -4624930377557954624L;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a temporary queue destination; the temporary flag handed to the superclass is always
+    * {@code true}.
+    *
+    * @param address the address passed on to the superclass
+    * @param name    the queue name passed on to the superclass
+    * @param session the session passed on to the superclass
+    */
+   public HornetQTemporaryQueue(String address, String name, HornetQSession session)
+   {
+      super(address,
+            name,
+            true,
+            session);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the class name followed by the queue name in square brackets
+    */
+   @Override
+   public String toString()
+   {
+      final StringBuilder description = new StringBuilder("HornetQTemporaryQueue[");
+      description.append(name).append("]");
+      return description.toString();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java
new file mode 100644
index 0000000..442f0fc
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.TemporaryTopic;
+
+/**
+ * A HornetQTemporaryTopic
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQTemporaryTopic extends HornetQTopic implements TemporaryTopic
+{
+   // Constants -----------------------------------------------------
+
+   private static final long serialVersionUID = 845450764835635266L;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a temporary topic destination; the temporary flag handed to the superclass is always
+    * {@code true}.
+    *
+    * @param address the address passed on to the superclass
+    * @param name    the topic name passed on to the superclass
+    * @param session the session passed on to the superclass
+    */
+   protected HornetQTemporaryTopic(final String address, final String name, final HornetQSession session)
+   {
+      super(address,
+            name,
+            true,
+            session);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java
new file mode 100644
index 0000000..4684cb2
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientMessage;
+import org.apache.activemq6.api.core.client.ClientSession;
+
+import static org.apache.activemq6.reader.TextMessageUtil.readBodyText;
+import static org.apache.activemq6.reader.TextMessageUtil.writeBodyText;
+
+
+/**
+ * HornetQ implementation of a JMS TextMessage.
+ * <br>
+ * This class was ported from SpyTextMessage in JBossMQ.
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version $Revision: 3412 $
+ */
+public class HornetQTextMessage extends HornetQMessage implements TextMessage
+{
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.TEXT_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   // Local cache of the body. It is held as a SimpleString because the AbstractChannelBuffer write
+   // methods are more efficient for a SimpleString.
+   private SimpleString text;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates an empty text message bound to the given core session.
+    */
+   public HornetQTextMessage(final ClientSession session)
+   {
+      super(HornetQTextMessage.TYPE, session);
+   }
+
+   /**
+    * Wraps an existing core message.
+    */
+   public HornetQTextMessage(final ClientMessage message, final ClientSession session)
+   {
+      super(message, session);
+   }
+
+   /**
+    * A copy constructor for non-HornetQ JMS TextMessages.
+    */
+   public HornetQTextMessage(final TextMessage foreign, final ClientSession session) throws JMSException
+   {
+      super(foreign, HornetQTextMessage.TYPE, session);
+
+      setText(foreign.getText());
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public byte getType()
+   {
+      return HornetQTextMessage.TYPE;
+   }
+
+   // TextMessage implementation ------------------------------------
+
+   /**
+    * Replaces the cached text and writes it to the body of the wrapped message.
+    *
+    * @param text the new body; may be {@code null}
+    * @throws JMSException if {@code checkWrite()} rejects the call
+    */
+   public void setText(final String text) throws JMSException
+   {
+      checkWrite();
+
+      this.text = text == null ? null : new SimpleString(text);
+
+      writeBodyText(message, this.text);
+   }
+
+   /**
+    * @return the cached text, or {@code null} when none is set
+    */
+   public String getText()
+   {
+      return text == null ? null : text.toString();
+   }
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      text = null;
+   }
+
+   // HornetQRAMessage override -----------------------------------------
+
+   /**
+    * Refreshes the cached text from the body of the wrapped message.
+    */
+   @Override
+   public void doBeforeReceive() throws HornetQException
+   {
+      super.doBeforeReceive();
+
+      text = readBodyText(message);
+   }
+
+   @Override
+   protected <T> T getBodyInternal(Class<T> c)
+   {
+      return (T) getText();
+   }
+
+   /**
+    * @return {@code true} when no text is set, otherwise whether a {@code String} can be assigned to
+    *         {@code c}
+    */
+   @Override
+   public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c)
+   {
+      return text == null || c.isAssignableFrom(java.lang.String.class);
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java
new file mode 100644
index 0000000..9b1d835
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.Topic;
+
+import org.apache.activemq6.api.core.SimpleString;
+
+/**
+ * HornetQ implementation of a JMS Topic.
+ * <br>
+ * This class can be instantiated directly.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 8737 $</tt>
+ *
+ */
+public class HornetQTopic extends HornetQDestination implements Topic
+{
+   // Constants -----------------------------------------------------
+
+   private static final long serialVersionUID = 7873614001276404156L;
+   // Static --------------------------------------------------------
+
+   public static SimpleString createAddressFromName(final String name)
+   {
+      return new SimpleString(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name);
+   }
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public HornetQTopic(final String name)
+   {
+      super(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null);
+   }
+
+
+   /**
+    * @param address
+    * @param name
+    * @param temporary
+    * @param session
+    */
+   protected HornetQTopic(String address, String name, boolean temporary, HornetQSession session)
+   {
+      super(address, name, temporary, false, session);
+   }
+
+
+   // Topic implementation ------------------------------------------
+
+   public String getTopicName()
+   {
+      return name;
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "HornetQTopic[" + name + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java
new file mode 100644
index 0000000..26b0e72
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.TopicConnectionFactory;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a TopicConnectionFactory.
+ * <p>
+ * Every constructor simply forwards its arguments to {@link HornetQConnectionFactory}; the only
+ * behaviour added here is the factory type reported by {@link #getFactoryType()}.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQTopicConnectionFactory extends HornetQConnectionFactory implements TopicConnectionFactory
+{
+   private static final long serialVersionUID = 7317051989866548455L;
+
+   /**
+    * Creates a factory through the no-argument superclass constructor.
+    */
+   public HornetQTopicConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * Creates a factory on top of an existing locator.
+    *
+    * @param serverLocator forwarded unchanged to the superclass
+    */
+   public HornetQTopicConnectionFactory(final ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * Creates a factory from a discovery group configuration.
+    *
+    * @param ha                 forwarded unchanged to the superclass
+    * @param groupConfiguration forwarded unchanged to the superclass
+    */
+   public HornetQTopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * Creates a factory from a list of initial connectors.
+    *
+    * @param ha                forwarded unchanged to the superclass
+    * @param initialConnectors forwarded unchanged to the superclass
+    */
+   public HornetQTopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * @return the integer value of {@code JMSFactoryType.TOPIC_CF}
+    */
+   @Override
+   public int getFactoryType()
+   {
+      return JMSFactoryType.TOPIC_CF.intValue();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java
new file mode 100644
index 0000000..e6b0edf
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAQueueConnection;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicSession;
+
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+
+/**
+ * HornetQ implementation of a JMS XAConnection.
+ * <p>
+ * The flat implementation of {@link XATopicConnection} and {@link XAQueueConnection} is per design,
+ * following common practices of JMS 1.1.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public final class HornetQXAConnection extends HornetQConnection implements XATopicConnection, XAQueueConnection
+{
+
+   /**
+    * Creates the connection; every argument is forwarded unchanged to {@link HornetQConnection}.
+    */
+   public HornetQXAConnection(final String username,
+                              final String password,
+                              final int connectionType,
+                              final String clientID,
+                              final int dupsOKBatchSize,
+                              final int transactionBatchSize,
+                              final ClientSessionFactory sessionFactory)
+   {
+      super(username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory);
+   }
+
+   /**
+    * Opens a session of type {@code HornetQSession.TYPE_GENERIC_SESSION} after verifying that the
+    * connection is not closed.
+    */
+   @Override
+   public XASession createXASession() throws JMSException
+   {
+      checkClosed();
+      final boolean xa = isXA();
+      return (XASession)createSessionInternal(xa,
+                                              true,
+                                              Session.SESSION_TRANSACTED,
+                                              HornetQSession.TYPE_GENERIC_SESSION);
+   }
+
+   /**
+    * Opens a session of type {@code HornetQSession.TYPE_QUEUE_SESSION} after verifying that the
+    * connection is not closed.
+    */
+   @Override
+   public XAQueueSession createXAQueueSession() throws JMSException
+   {
+      checkClosed();
+      final boolean xa = isXA();
+      return (XAQueueSession)createSessionInternal(xa,
+                                                   true,
+                                                   Session.SESSION_TRANSACTED,
+                                                   HornetQSession.TYPE_QUEUE_SESSION);
+   }
+
+   /**
+    * Opens a session of type {@code HornetQSession.TYPE_TOPIC_SESSION} after verifying that the
+    * connection is not closed.
+    */
+   @Override
+   public XATopicSession createXATopicSession() throws JMSException
+   {
+      checkClosed();
+      final boolean xa = isXA();
+      return (XATopicSession)createSessionInternal(xa,
+                                                   true,
+                                                   Session.SESSION_TRANSACTED,
+                                                   HornetQSession.TYPE_TOPIC_SESSION);
+   }
+
+   /**
+    * @return always {@code true}
+    */
+   @Override
+   protected boolean isXA()
+   {
+      return true;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java
new file mode 100644
index 0000000..b0ae3d7
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.XAQueueConnectionFactory;
+import javax.jms.XATopicConnectionFactory;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a XAConnectionFactory.
+ * <p>
+ * The XAConnectionFactory is considered the most complete option available: because it is fully
+ * functional, it can be cast to any other connection factory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXAConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory,
+   XAQueueConnectionFactory
+{
+   private static final long serialVersionUID = 743611571839154115L;
+
+   /**
+    * Creates a factory through the no-argument superclass constructor.
+    */
+   public HornetQXAConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * Creates a factory on top of an existing locator.
+    *
+    * @param serverLocator forwarded unchanged to the superclass
+    */
+   public HornetQXAConnectionFactory(final ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * Creates a factory from a discovery group configuration.
+    *
+    * @param ha                 forwarded unchanged to the superclass
+    * @param groupConfiguration forwarded unchanged to the superclass
+    */
+   public HornetQXAConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * Creates a factory from a list of initial connectors.
+    *
+    * @param ha                forwarded unchanged to the superclass
+    * @param initialConnectors forwarded unchanged to the superclass
+    */
+   public HornetQXAConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * @return the integer value of {@code JMSFactoryType.XA_CF}
+    */
+   @Override
+   public int getFactoryType()
+   {
+      final JMSFactoryType factoryType = JMSFactoryType.XA_CF;
+      return factoryType.intValue();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java
new file mode 100644
index 0000000..040f7db
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.XAJMSContext;
+
+/**
+ * {@link XAJMSContext} flavour of {@link HornetQJMSContext}; it adds no members of its own.
+ */
+public class HornetQXAJMSContext extends HornetQJMSContext implements XAJMSContext
+{
+   /**
+    * @param connection         forwarded unchanged to the superclass
+    * @param threadAwareContext forwarded unchanged to the superclass
+    */
+   public HornetQXAJMSContext(final HornetQConnectionForContext connection,
+                              final ThreadAwareContext threadAwareContext)
+   {
+      super(connection, threadAwareContext);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java
new file mode 100644
index 0000000..c8875ec
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.XAQueueConnectionFactory;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a XAQueueConnectionFactory.
+ * <p>
+ * Every constructor simply forwards its arguments to {@link HornetQConnectionFactory}; the only
+ * behaviour added here is the factory type reported by {@link #getFactoryType()}.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXAQueueConnectionFactory extends HornetQConnectionFactory implements XAQueueConnectionFactory
+{
+   private static final long serialVersionUID = 8612457847251087454L;
+
+   /**
+    * Creates a factory through the no-argument superclass constructor.
+    */
+   public HornetQXAQueueConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * Creates a factory on top of an existing locator.
+    *
+    * @param serverLocator forwarded unchanged to the superclass
+    */
+   public HornetQXAQueueConnectionFactory(final ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * Creates a factory from a discovery group configuration.
+    *
+    * @param ha                 forwarded unchanged to the superclass
+    * @param groupConfiguration forwarded unchanged to the superclass
+    */
+   public HornetQXAQueueConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * Creates a factory from a list of initial connectors.
+    *
+    * @param ha                forwarded unchanged to the superclass
+    * @param initialConnectors forwarded unchanged to the superclass
+    */
+   public HornetQXAQueueConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * @return the integer value of {@code JMSFactoryType.QUEUE_XA_CF}
+    */
+   @Override
+   public int getFactoryType()
+   {
+      return JMSFactoryType.QUEUE_XA_CF.intValue();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java
new file mode 100644
index 0000000..65aa067
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.XAQueueSession;
+import javax.jms.XATopicSession;
+
+import org.apache.activemq6.api.core.client.ClientSession;
+
+/**
+ * A HornetQXASession: a {@link HornetQSession} that additionally implements
+ * {@link XAQueueSession} and {@link XATopicSession}. It adds no members of its own.
+ *
+ * @author clebertsuconic
+ */
+public class HornetQXASession extends HornetQSession implements XAQueueSession, XATopicSession
+{
+
+   /**
+    * Creates the session; every argument is forwarded unchanged to {@link HornetQSession}.
+    *
+    * @param connection  the owning connection
+    * @param transacted  forwarded to the superclass
+    * @param xa          forwarded to the superclass
+    * @param ackMode     forwarded to the superclass
+    * @param session     the underlying core client session
+    * @param sessionType forwarded to the superclass
+    */
+   protected HornetQXASession(final HornetQConnection connection,
+                              final boolean transacted,
+                              final boolean xa,
+                              final int ackMode,
+                              final ClientSession session,
+                              final int sessionType)
+   {
+      super(connection, transacted, xa, ackMode, session, sessionType);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java
new file mode 100644
index 0000000..d626c14
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.XATopicConnectionFactory;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a XATopicConnectionFactory.
+ * <p>
+ * Every constructor simply forwards its arguments to {@link HornetQConnectionFactory}; the only
+ * behaviour added here is the factory type reported by {@link #getFactoryType()}.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXATopicConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory
+{
+   private static final long serialVersionUID = -7018290426884419693L;
+
+   /**
+    * Creates a factory through the no-argument superclass constructor.
+    */
+   public HornetQXATopicConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * Creates a factory on top of an existing locator.
+    *
+    * @param serverLocator forwarded unchanged to the superclass
+    */
+   public HornetQXATopicConnectionFactory(final ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * Creates a factory from a discovery group configuration.
+    *
+    * @param ha                 forwarded unchanged to the superclass
+    * @param groupConfiguration forwarded unchanged to the superclass
+    */
+   public HornetQXATopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * Creates a factory from a list of initial connectors.
+    *
+    * @param ha                forwarded unchanged to the superclass
+    * @param initialConnectors forwarded unchanged to the superclass
+    */
+   public HornetQXATopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * @return the integer value of {@code JMSFactoryType.TOPIC_XA_CF}
+    */
+   @Override
+   public int getFactoryType()
+   {
+      return JMSFactoryType.TOPIC_XA_CF.intValue();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java
new file mode 100644
index 0000000..ba5a224
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ * A JMSExceptionHelper: translates a {@link HornetQException} into the {@link JMSException}
+ * subtype that matches its error type.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public final class JMSExceptionHelper
+{
+
+   /**
+    * Converts a HornetQ exception into a JMS exception.
+    * <p>
+    * The JMS exception class is chosen from {@code me.getType()}; types without a dedicated
+    * mapping yield a plain {@link JMSException}. The result carries the message and stack trace
+    * of {@code me} and has {@code me} as its cause.
+    *
+    * @param me the exception to convert
+    * @return the equivalent JMS exception
+    */
+   public static JMSException convertFromHornetQException(final HornetQException me)
+   {
+      final JMSException je;
+      switch (me.getType())
+      {
+         case ILLEGAL_STATE:
+         case OBJECT_CLOSED:
+         case UNSUPPORTED_PACKET:
+            je = new javax.jms.IllegalStateException(me.getMessage());
+            break;
+
+         case INVALID_FILTER_EXPRESSION:
+            je = new InvalidSelectorException(me.getMessage());
+            break;
+
+         case QUEUE_DOES_NOT_EXIST:
+         case QUEUE_EXISTS:
+            je = new InvalidDestinationException(me.getMessage());
+            break;
+
+         case SECURITY_EXCEPTION:
+            je = new JMSSecurityException(me.getMessage());
+            break;
+
+         case TRANSACTION_ROLLED_BACK:
+            je = new javax.jms.TransactionRolledBackException(me.getMessage());
+            break;
+
+         // These three are listed only to document that they deliberately map to the generic
+         // exception, exactly like any type not named above.
+         case CONNECTION_TIMEDOUT:
+         case INTERNAL_ERROR:
+         case NOT_CONNECTED:
+         default:
+            je = new JMSException(me.getMessage());
+            break;
+      }
+
+      // Make the JMS exception look as if it was raised where the original one was.
+      je.setStackTrace(me.getStackTrace());
+      je.initCause(me);
+
+      return je;
+   }
+}


Mime
View raw message