activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [40/52] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 Rename HornetQ* classes to ActiveMQ*
Date Tue, 18 Nov 2014 23:38:33 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java
new file mode 100644
index 0000000..c955f41
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessage.java
@@ -0,0 +1,1089 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.core.message.impl.MessageInternal;
+import org.apache.activemq.reader.MessageUtil;
+import org.apache.activemq.utils.UUID;
+
+
+/**
+ * ActiveMQ implementation of a JMS Message.
+ * <br>
+ * JMS Messages only live on the client side - the server only deals with MessageImpl
+ * instances
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:bershath@yahoo.com">Tyronne Wickramarathne</a> Partially ported from JBossMQ implementation
+ *         originally written by:
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author Hiram Chirino (Cojonudo14@hotmail.com)
+ * @author David Maplesden (David.Maplesden@orion.co.nz)
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ActiveMQMessage implements javax.jms.Message
+{
+   // Constants -----------------------------------------------------
+   public static final byte TYPE = org.apache.activemq.api.core.Message.DEFAULT_TYPE;
+
+   public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage)
+   {
+      Map<String, Object> jmsMessage = new HashMap<String, Object>();
+
+      String deliveryMode = (Boolean)coreMessage.get("durable") ? "PERSISTENT" : "NON_PERSISTENT";
+      byte priority = (Byte)coreMessage.get("priority");
+      long timestamp = (Long)coreMessage.get("timestamp");
+      long expiration = (Long)coreMessage.get("expiration");
+
+      jmsMessage.put("JMSPriority", priority);
+      jmsMessage.put("JMSTimestamp", timestamp);
+      jmsMessage.put("JMSExpiration", expiration);
+      jmsMessage.put("JMSDeliveryMode", deliveryMode);
+
+      for (Map.Entry<String, Object> entry : coreMessage.entrySet())
+      {
+         if (entry.getKey().equals("type") || entry.getKey().equals("durable") ||
+             entry.getKey().equals("expiration") ||
+             entry.getKey().equals("timestamp") ||
+             entry.getKey().equals("priority"))
+         {
+            // Ignore
+         }
+         else if (entry.getKey().equals("userID"))
+         {
+            jmsMessage.put("JMSMessageID", entry.getValue().toString());
+         }
+         else
+         {
+            Object value = entry.getValue();
+            if (value instanceof SimpleString)
+            {
+               jmsMessage.put(entry.getKey(), value.toString());
+            }
+            else
+            {
+               jmsMessage.put(entry.getKey(), value);
+            }
+         }
+      }
+
+      return jmsMessage;
+   }
+
+   // Static --------------------------------------------------------
+
+   private static final HashSet<String> reservedIdentifiers = new HashSet<String>();
+   static
+   {
+      ActiveMQMessage.reservedIdentifiers.add("NULL");
+      ActiveMQMessage.reservedIdentifiers.add("TRUE");
+      ActiveMQMessage.reservedIdentifiers.add("FALSE");
+      ActiveMQMessage.reservedIdentifiers.add("NOT");
+      ActiveMQMessage.reservedIdentifiers.add("AND");
+      ActiveMQMessage.reservedIdentifiers.add("OR");
+      ActiveMQMessage.reservedIdentifiers.add("BETWEEN");
+      ActiveMQMessage.reservedIdentifiers.add("LIKE");
+      ActiveMQMessage.reservedIdentifiers.add("IN");
+      ActiveMQMessage.reservedIdentifiers.add("IS");
+      ActiveMQMessage.reservedIdentifiers.add("ESCAPE");
+   }
+
+   /**
+    * Wraps a received core message in the JMS message class matching its type byte.
+    *
+    * @param message the core message to wrap
+    * @param session the client session handed to the created JMS message
+    * @return the JMS message wrapper for {@code message}
+    * @throws JMSRuntimeException if the core message type matches none of the known JMS types
+    */
+   public static ActiveMQMessage createMessage(final ClientMessage message, final ClientSession session)
+   {
+      final int type = message.getType();
+
+      switch (type)
+      {
+         case ActiveMQMessage.TYPE: // 0
+            return new ActiveMQMessage(message, session);
+
+         case ActiveMQBytesMessage.TYPE: // 4
+            return new ActiveMQBytesMessage(message, session);
+
+         case ActiveMQMapMessage.TYPE: // 5
+            return new ActiveMQMapMessage(message, session);
+
+         case ActiveMQObjectMessage.TYPE:
+            return new ActiveMQObjectMessage(message, session);
+
+         case ActiveMQStreamMessage.TYPE: // 6
+            return new ActiveMQStreamMessage(message, session);
+
+         case ActiveMQTextMessage.TYPE: // 3
+            return new ActiveMQTextMessage(message, session);
+
+         default:
+            throw new JMSRuntimeException("Invalid message type " + type);
+      }
+   }
+
+   // Attributes ----------------------------------------------------
+
+   // The underlying message
+   protected ClientMessage message;
+
+   private ClientSession session;
+
+   // Read-only?
+   protected boolean readOnly;
+
+   // Properties read-only?
+   protected boolean propertiesReadOnly;
+
+   // Cache it
+   private Destination dest;
+
+   // Cache it
+   private String msgID;
+
+   // Cache it
+   private Destination replyTo;
+
+   // Cache it
+   private String jmsCorrelationID;
+
+   // Cache it
+   private String jmsType;
+
+   private boolean individualAck;
+
+   private long jmsDeliveryTime;
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * Create a new message prior to sending.
+    *
+    * The core message is obtained from session.createMessage(...) using the given type byte and
+    * the current wall-clock time. NOTE(review): the remaining literal arguments (true, 0, (byte)4)
+    * are presumably durable / expiration / priority defaults - confirm against
+    * ClientSession.createMessage.
+    *
+    * This constructor does not assign the session field and leaves readOnly and
+    * propertiesReadOnly at their default of false, so the new message is writable.
+    */
+   protected ActiveMQMessage(final byte type, final ClientSession session)
+   {
+      message = session.createMessage(type, true, 0, System.currentTimeMillis(), (byte)4);
+
+   }
+
+   protected ActiveMQMessage(final ClientSession session)
+   {
+      this(ActiveMQMessage.TYPE, session);
+   }
+
+   /**
+    * Constructor for when receiving a message from the server
+    */
+   public ActiveMQMessage(final ClientMessage message, final ClientSession session)
+   {
+      this.message = message;
+
+      readOnly = true;
+
+      propertiesReadOnly = true;
+
+      this.session = session;
+   }
+
+   /*
+    * A constructor that takes a foreign message
+    */
+   public ActiveMQMessage(final Message foreign, final ClientSession session) throws JMSException
+   {
+      this(foreign, ActiveMQMessage.TYPE, session);
+   }
+
+   public ActiveMQMessage()
+   {
+   }
+
+   /**
+    * Creates a new, writable message of the given type and copies the headers and properties of a
+    * foreign (other provider's) JMS message into it.
+    * <p>
+    * The correlation id is copied as bytes first, falling back to the String form when the
+    * foreign message throws a {@link JMSException} from the bytes accessor. Setting the system
+    * property named by
+    * {@link ActiveMQJMSConstants#JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME}
+    * to {@code "false"} skips the bytes attempt and copies only the String form.
+    *
+    * @param foreign the message to copy from
+    * @param type the core type byte for the new message
+    * @param session the session used to create the underlying core message
+    * @throws JMSException if reading from {@code foreign} or writing a header/property fails
+    */
+   protected ActiveMQMessage(final Message foreign, final byte type, final ClientSession session) throws JMSException
+   {
+      this(type, session);
+
+      setJMSTimestamp(foreign.getJMSTimestamp());
+
+      String value = System.getProperty(ActiveMQJMSConstants.JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME);
+
+      // Byte-array correlation ids are supported unless the property is explicitly "false"
+      boolean supportBytesId = !"false".equals(value);
+
+      if (supportBytesId)
+      {
+         try
+         {
+            byte[] corrIDBytes = foreign.getJMSCorrelationIDAsBytes();
+            setJMSCorrelationIDAsBytes(corrIDBytes);
+         }
+         catch (JMSException e)
+         {
+            // specified as String
+            String corrIDString = foreign.getJMSCorrelationID();
+            if (corrIDString != null)
+            {
+               setJMSCorrelationID(corrIDString);
+            }
+         }
+      }
+      else
+      {
+         // Some providers, like WSMQ do automatic conversions between native byte[] correlation id
+         // and String correlation id. This makes it impossible for ActiveMQ to guarantee to return the correct
+         // type as set by the user
+         // So we allow the behaviour to be overridden by a system property
+         // https://jira.jboss.org/jira/browse/HORNETQ-356
+         // https://jira.jboss.org/jira/browse/HORNETQ-332
+         String corrIDString = foreign.getJMSCorrelationID();
+         if (corrIDString != null)
+         {
+            setJMSCorrelationID(corrIDString);
+         }
+      }
+
+      // Note: setJMSReplyTo rejects a non-null reply-to that is not an ActiveMQDestination
+      setJMSReplyTo(foreign.getJMSReplyTo());
+      setJMSDestination(foreign.getJMSDestination());
+      setJMSDeliveryMode(foreign.getJMSDeliveryMode());
+      setJMSExpiration(foreign.getJMSExpiration());
+      setJMSPriority(foreign.getJMSPriority());
+      setJMSType(foreign.getJMSType());
+
+      // We can't avoid a cast warning here since getPropertyNames() is on the JMS API
+      for (Enumeration<String> props = foreign.getPropertyNames(); props.hasMoreElements();)
+      {
+         String name = props.nextElement();
+
+         Object prop = foreign.getObjectProperty(name);
+
+         // Goes through the same name validation as any user-set property
+         setObjectProperty(name, prop);
+      }
+   }
+
+   // javax.jms.Message implementation ------------------------------
+
+   /**
+    * Returns the JMS message id, deriving it from the core message's user id on first use.
+    *
+    * @return the cached id, or {@code "ID:"} followed by the core user id, or {@code null} when
+    *         no id is cached and the core message has no user id
+    */
+   public String getJMSMessageID()
+   {
+      if (msgID == null)
+      {
+         final UUID userID = message.getUserID();
+
+         if (userID != null)
+         {
+            msgID = "ID:" + userID.toString();
+         }
+      }
+
+      return msgID;
+   }
+
+   /**
+    * Overrides the JMS message id reported by {@link #getJMSMessageID()}.
+    * <p>
+    * The core message's user id is cleared and the given value is stored in the local cache.
+    *
+    * @param jmsMessageID the new id; either {@code null} or a value starting with {@code "ID:"}
+    * @throws JMSException if a non-null id does not start with {@code "ID:"}
+    */
+   public void setJMSMessageID(final String jmsMessageID) throws JMSException
+   {
+      if (jmsMessageID != null && !jmsMessageID.startsWith("ID:"))
+      {
+         throw new JMSException("JMSMessageID must start with ID:");
+      }
+
+      // Drop the core user id; from now on only the cached String is reported
+      message.setUserID(null);
+
+      msgID = jmsMessageID;
+   }
+
+   public long getJMSTimestamp() throws JMSException
+   {
+      return message.getTimestamp();
+   }
+
+   public void setJMSTimestamp(final long timestamp) throws JMSException
+   {
+      message.setTimestamp(timestamp);
+   }
+
+   public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+   {
+      return MessageUtil.getJMSCorrelationIDAsBytes(message);
+   }
+
+   public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException
+   {
+      try
+      {
+         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
+      }
+      catch (ActiveMQException e)
+      {
+         JMSException ex = new JMSException(e.getMessage());
+         ex.initCause(e);
+         throw ex;
+      }
+   }
+
+   public void setJMSCorrelationID(final String correlationID) throws JMSException
+   {
+      MessageUtil.setJMSCorrelationID(message, correlationID);
+      jmsCorrelationID = correlationID;
+   }
+
+   public String getJMSCorrelationID() throws JMSException
+   {
+      if (jmsCorrelationID == null)
+      {
+         jmsCorrelationID = MessageUtil.getJMSCorrelationID(message);
+      }
+
+      return jmsCorrelationID;
+   }
+
+   /**
+    * Returns the reply-to destination, resolving it from the core message on first use.
+    *
+    * @return the cached destination, or one built from the reply-to address stored on the core
+    *         message, or {@code null} when neither is available
+    */
+   public Destination getJMSReplyTo() throws JMSException
+   {
+      if (replyTo != null)
+      {
+         return replyTo;
+      }
+
+      final SimpleString address = MessageUtil.getJMSReplyTo(message);
+      if (address != null)
+      {
+         replyTo = ActiveMQDestination.fromAddress(address.toString());
+      }
+
+      return replyTo;
+   }
+
+   public void setJMSReplyTo(final Destination dest) throws JMSException
+   {
+
+      if (dest == null)
+      {
+         MessageUtil.setJMSReplyTo(message, null);
+         replyTo = null;
+      }
+      else
+      {
+         if (dest instanceof ActiveMQDestination == false)
+         {
+            throw new InvalidDestinationException("Not a ActiveMQ destination " + dest);
+         }
+
+         ActiveMQDestination jbd = (ActiveMQDestination)dest;
+
+         MessageUtil.setJMSReplyTo(message, jbd.getSimpleAddress());
+
+         replyTo = jbd;
+      }
+   }
+
+   /**
+    * Returns the destination of this message, resolving it from the core address on first use.
+    *
+    * @return the cached destination, or one built from the core message address, or
+    *         {@code null} when the core message has no address
+    */
+   public Destination getJMSDestination() throws JMSException
+   {
+      if (dest == null)
+      {
+         final SimpleString address = message.getAddress();
+
+         if (address != null)
+         {
+            dest = ActiveMQDestination.fromAddress(address.toString());
+         }
+      }
+
+      return dest;
+   }
+
+   public void setJMSDestination(final Destination destination) throws JMSException
+   {
+      dest = destination;
+   }
+
+   public int getJMSDeliveryMode() throws JMSException
+   {
+      return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+   }
+
+   public void setJMSDeliveryMode(final int deliveryMode) throws JMSException
+   {
+      if (deliveryMode == DeliveryMode.PERSISTENT)
+      {
+         message.setDurable(true);
+      }
+      else if (deliveryMode == DeliveryMode.NON_PERSISTENT)
+      {
+         message.setDurable(false);
+      }
+      else
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.illegalDeliveryMode(deliveryMode);
+      }
+   }
+
+   public boolean getJMSRedelivered() throws JMSException
+   {
+      return message.getDeliveryCount() > 1;
+   }
+
+   public void setJMSRedelivered(final boolean redelivered) throws JMSException
+   {
+      if (!redelivered)
+      {
+         message.setDeliveryCount(1);
+      }
+      else
+      {
+         if (message.getDeliveryCount() > 1)
+         {
+            // do nothing
+         }
+         else
+         {
+            message.setDeliveryCount(2);
+         }
+      }
+   }
+
+   /**
+    * Stores the JMS type on the core message and in the local cache.
+    * <p>
+    * A {@code null} type is ignored: neither the core message nor the cached value is changed,
+    * so a previously set type is not cleared by passing {@code null}.
+    *
+    * @param type the JMS type, or {@code null} for a no-op
+    */
+   public void setJMSType(final String type) throws JMSException
+   {
+      if (type != null)
+      {
+         MessageUtil.setJMSType(message, type);
+
+         jmsType = type;
+      }
+   }
+
+   public String getJMSType() throws JMSException
+   {
+      if (jmsType == null)
+      {
+         jmsType = MessageUtil.getJMSType(message);
+      }
+      return jmsType;
+   }
+
+   public long getJMSExpiration() throws JMSException
+   {
+      return message.getExpiration();
+   }
+
+   public void setJMSExpiration(final long expiration) throws JMSException
+   {
+      message.setExpiration(expiration);
+   }
+
+   public int getJMSPriority() throws JMSException
+   {
+      return message.getPriority();
+   }
+
+   public void setJMSPriority(final int priority) throws JMSException
+   {
+      checkPriority(priority);
+
+      message.setPriority((byte)priority);
+   }
+
+   public void clearProperties() throws JMSException
+   {
+
+      MessageUtil.clearProperties(message);
+
+      propertiesReadOnly = false;
+   }
+
+   public void clearBody() throws JMSException
+   {
+      readOnly = false;
+   }
+
+   public boolean propertyExists(final String name) throws JMSException
+   {
+      return MessageUtil.propertyExists(message, name);
+   }
+
+   /**
+    * Reads a message property as a {@code boolean}.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public boolean getBooleanProperty(final String name) throws JMSException
+   {
+      try
+      {
+         return message.getBooleanProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as a {@code byte}.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public byte getByteProperty(final String name) throws JMSException
+   {
+      try
+      {
+         return message.getByteProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as a {@code short}.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public short getShortProperty(final String name) throws JMSException
+   {
+      try
+      {
+         return message.getShortProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as an {@code int}.
+    * <p>
+    * The name {@link MessageUtil#JMSXDELIVERYCOUNT} is answered from the core delivery count
+    * rather than from the property set.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public int getIntProperty(final String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return message.getDeliveryCount();
+      }
+
+      try
+      {
+         return message.getIntProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as a {@code long}.
+    * <p>
+    * The name {@link MessageUtil#JMSXDELIVERYCOUNT} is answered from the core delivery count
+    * rather than from the property set.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public long getLongProperty(final String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return message.getDeliveryCount();
+      }
+
+      try
+      {
+         return message.getLongProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as a {@code float}.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public float getFloatProperty(final String name) throws JMSException
+   {
+      try
+      {
+         return message.getFloatProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as a {@code double}.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public double getDoubleProperty(final String name) throws JMSException
+   {
+      try
+      {
+         return message.getDoubleProperty(new SimpleString(name));
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property as a {@link String}.
+    * <p>
+    * Two names are special-cased: {@link MessageUtil#JMSXDELIVERYCOUNT} returns the core delivery
+    * count rendered as text, and {@link MessageUtil#JMSXGROUPID} is read from the core group-id
+    * header instead of a property of that name.
+    *
+    * @param name the property name
+    * @return the property value as converted by the core message
+    * @throws MessageFormatException if the stored value cannot be converted; the core
+    *         conversion exception is attached as the cause
+    */
+   public String getStringProperty(final String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return String.valueOf(message.getDeliveryCount());
+      }
+
+      try
+      {
+         if (MessageUtil.JMSXGROUPID.equals(name))
+         {
+            return message.getStringProperty(org.apache.activemq.api.core.Message.HDR_GROUP_ID);
+         }
+         else
+         {
+            return message.getStringProperty(new SimpleString(name));
+         }
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Reads a message property without type conversion.
+    * <p>
+    * {@link MessageUtil#JMSXDELIVERYCOUNT} yields the core delivery count as a {@link String};
+    * {@link SimpleString} values are returned as {@link String}.
+    *
+    * @param name the property name
+    * @return the property value as returned by the core message, with the adjustments above
+    */
+   public Object getObjectProperty(final String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return String.valueOf(message.getDeliveryCount());
+      }
+
+      final Object raw = message.getObjectProperty(name);
+
+      return raw instanceof SimpleString ? raw.toString() : raw;
+   }
+
+   @SuppressWarnings("rawtypes")
+   @Override
+   public Enumeration getPropertyNames() throws JMSException
+   {
+      return Collections.enumeration(MessageUtil.getPropertyNames(message));
+   }
+
+   public void setBooleanProperty(final String name, final boolean value) throws JMSException
+   {
+      checkProperty(name);
+
+      message.putBooleanProperty(new SimpleString(name), value);
+   }
+
+   public void setByteProperty(final String name, final byte value) throws JMSException
+   {
+      checkProperty(name);
+      message.putByteProperty(new SimpleString(name), value);
+   }
+
+   public void setShortProperty(final String name, final short value) throws JMSException
+   {
+      checkProperty(name);
+      message.putShortProperty(new SimpleString(name), value);
+   }
+
+   public void setIntProperty(final String name, final int value) throws JMSException
+   {
+      checkProperty(name);
+      message.putIntProperty(new SimpleString(name), value);
+   }
+
+   public void setLongProperty(final String name, final long value) throws JMSException
+   {
+      checkProperty(name);
+      message.putLongProperty(new SimpleString(name), value);
+   }
+
+   public void setFloatProperty(final String name, final float value) throws JMSException
+   {
+      checkProperty(name);
+      message.putFloatProperty(new SimpleString(name), value);
+   }
+
+   public void setDoubleProperty(final String name, final double value) throws JMSException
+   {
+      checkProperty(name);
+      message.putDoubleProperty(new SimpleString(name), value);
+   }
+
+   public void setStringProperty(final String name, final String value) throws JMSException
+   {
+      checkProperty(name);
+
+      if (MessageUtil.JMSXGROUPID.equals(name))
+      {
+         message.putStringProperty(org.apache.activemq.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value));
+      }
+      else
+      {
+         message.putStringProperty(new SimpleString(name),  SimpleString.toSimpleString(value));
+      }
+   }
+
+   /**
+    * Sets a property from an arbitrary object, with special handling for the stream pseudo-properties.
+    * <p>
+    * {@link ActiveMQJMSConstants#JMS_ACTIVEMQ_OUTPUT_STREAM} and
+    * {@link ActiveMQJMSConstants#JMS_ACTIVEMQ_SAVE_STREAM} are dispatched to
+    * {@link #setOutputStream(OutputStream)} and {@link #saveToOutputStream(OutputStream)} before
+    * the property-name check runs, because those calls are only valid on received messages.
+    * {@link ActiveMQJMSConstants#JMS_ACTIVEMQ_INPUT_STREAM} is dispatched to
+    * {@link #setInputStream(InputStream)} after the check. Any other name is stored as a regular
+    * property on the core message.
+    *
+    * @param name the property name
+    * @param value the property value; for the stream names it must be the matching stream type
+    * @throws MessageFormatException if the core message rejects the value type; the core
+    *         conversion exception is attached as the cause
+    * @throws JMSException if the name is not permitted or the message is not writable
+    */
+   public void setObjectProperty(final String name, final Object value) throws JMSException
+   {
+      if (ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM.equals(name))
+      {
+         setOutputStream((OutputStream)value);
+
+         return;
+      }
+      else if (ActiveMQJMSConstants.JMS_ACTIVEMQ_SAVE_STREAM.equals(name))
+      {
+         saveToOutputStream((OutputStream)value);
+
+         return;
+      }
+
+      checkProperty(name);
+
+      if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name))
+      {
+         setInputStream((InputStream)value);
+
+         return;
+      }
+
+      try
+      {
+         message.putObjectProperty(new SimpleString(name), value);
+      }
+      catch (ActiveMQPropertyConversionException e)
+      {
+         // Keep the original failure reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+   /**
+    * Acknowledges this message through the session it was received on.
+    * <p>
+    * Does nothing when no session is attached. Otherwise, if individual acknowledgement was
+    * requested via {@link #setIndividualAcknowledge()}, this message is acknowledged on its own
+    * first; the session is then committed in either case.
+    *
+    * @throws JMSException if the core acknowledge or commit fails
+    */
+   public void acknowledge() throws JMSException
+   {
+      if (session != null)
+      {
+         try
+         {
+            if (individualAck)
+            {
+               message.individualAcknowledge();
+            }
+
+            session.commit();
+         }
+         catch (ActiveMQException e)
+         {
+            throw JMSExceptionHelper.convertFromActiveMQException(e);
+         }
+      }
+   }
+
+   /**
+    * Returns the scheduled delivery time stored in the core message header.
+    *
+    * @return the header value, or {@code 0} when the header is absent or reading it fails
+    */
+   @Override
+   public long getJMSDeliveryTime() throws JMSException
+   {
+      try
+      {
+         final Long scheduled =
+            message.getLongProperty(org.apache.activemq.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME);
+
+         return scheduled == null ? 0 : scheduled.longValue();
+      }
+      catch (Exception e)
+      {
+         // Best effort: any failure to read the header is reported as "no delivery time"
+         return 0;
+      }
+   }
+
+   @Override
+   public void setJMSDeliveryTime(long deliveryTime) throws JMSException
+   {
+      message.putLongProperty(org.apache.activemq.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
+   }
+
+   /**
+    * Returns the message body as the requested type.
+    *
+    * @param c the type the body should be returned as
+    * @return the body obtained from {@link #getBodyInternal(Class)}
+    * @throws MessageFormatException if {@link #isBodyAssignableTo(Class)} rejects {@code c}
+    */
+   @Override
+   public <T> T getBody(Class<T> c) throws JMSException
+   {
+      if (!isBodyAssignableTo(c))
+      {
+         // XXX HORNETQ-1209 Do we need translations here?
+         throw new MessageFormatException("Body not assignable to " + c);
+      }
+
+      return getBodyInternal(c);
+   }
+
+   /**
+    * Deserializes the message body with Java object serialization and returns it as {@code T}.
+    * <p>
+    * The cast to {@code T} is unchecked; {@code c} is not consulted here.
+    * <p>
+    * NOTE(review): this uses native Java deserialization on the message body. If bodies can come
+    * from untrusted producers, this should be restricted (for example with a class filter).
+    *
+    * @param c the requested body type
+    * @return the deserialized body
+    * @throws MessageFormatException if the body stream cannot be read or deserialized; the
+    *         underlying exception is attached as the cause
+    */
+   @SuppressWarnings("unchecked")
+   protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException
+   {
+      InputStream is = ((MessageInternal)message).getBodyInputStream();
+      try
+      {
+         ObjectInputStream ois = new ObjectInputStream(is);
+         return (T)ois.readObject();
+      }
+      catch (Exception e)
+      {
+         // Keep the original failure (I/O, class not found, ...) reachable through getCause()
+         MessageFormatException mfe = new MessageFormatException(e.getMessage());
+         mfe.initCause(e);
+         throw mfe;
+      }
+   }
+
+
+   @Override
+   public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c)
+   {
+      /**
+       * From the specs:
+       * <p>
+       * If the message is a {@code Message} (but not one of its subtypes) then this method will
+       * return true irrespective of the value of this parameter.
+       */
+      return true;
+   }
+
+   /**
+    * Helper method for {@link #isBodyAssignableTo(Class)}.
+    * @return true if the message has no body.
+    */
+   protected boolean hasNoBody()
+   {
+      return message.getBodySize() == 0;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setIndividualAcknowledge()
+   {
+      this.individualAck = true;
+   }
+
+   public void resetMessageID(final String newMsgID)
+   {
+      this.msgID = newMsgID;
+   }
+
+   public ClientMessage getCoreMessage()
+   {
+      return message;
+   }
+
+   public void doBeforeSend() throws Exception
+   {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+   public void checkBuffer()
+   {
+      message.getBodyBuffer();
+   }
+
+   public void doBeforeReceive() throws ActiveMQException
+   {
+      message.checkCompletion();
+
+      ActiveMQBuffer body = message.getBodyBuffer();
+
+      if (body != null)
+      {
+         body.resetReaderIndex();
+      }
+   }
+
+   public byte getType()
+   {
+      return ActiveMQMessage.TYPE;
+   }
+
+   public void setInputStream(final InputStream input) throws JMSException
+   {
+      checkStream();
+      if (readOnly)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.messageNotWritable();
+      }
+
+      message.setBodyInputStream(input);
+   }
+
+   public void setOutputStream(final OutputStream output) throws JMSException
+   {
+      checkStream();
+      if (!readOnly)
+      {
+         throw new IllegalStateException("OutputStream property is only valid on received messages");
+      }
+
+      try
+      {
+         message.setOutputStream(output);
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   public void saveToOutputStream(final OutputStream output) throws JMSException
+   {
+      checkStream();
+      if (!readOnly)
+      {
+         throw new IllegalStateException("OutputStream property is only valid on received messages");
+      }
+
+      try
+      {
+         message.saveToOutputStream(output);
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   public boolean waitCompletionOnStream(final long timeWait) throws JMSException
+   {
+      checkStream();
+      try
+      {
+         return message.waitOutputStreamCompletion(timeWait);
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   /**
+    * Renders the message as {@code ActiveMQMessage[<JMS message id>]:<delivery mode>}.
+    */
+   @Override
+   public String toString()
+   {
+      return "ActiveMQMessage[" + getJMSMessageID() + "]:" +
+         (message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT");
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void checkWrite() throws JMSException
+   {
+      if (readOnly)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.messageNotWritable();
+      }
+   }
+
+   protected void checkRead() throws JMSException
+   {
+      if (!readOnly)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.messageNotReadable();
+      }
+   }
+
+   // Private ------------------------------------------------------------
+
+   /**
+    * Rejects stream operations on anything but bytes and stream messages.
+    *
+    * @throws JMSException if the core message type is neither the bytes nor the stream type
+    */
+   private void checkStream() throws JMSException
+   {
+      if (message.getType() != ActiveMQBytesMessage.TYPE && message.getType() != ActiveMQStreamMessage.TYPE)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.onlyValidForByteOrStreamMessages();
+      }
+   }
+
+   /**
+    * Validates that a property with the given name may be set on this message.
+    * <p>
+    * Checks run in this order: properties must be writable; the name must be non-null, non-empty
+    * and a valid Java identifier; it must not be a selector keyword; and it must not start with
+    * {@code JMS_ACTIVEMQ}.
+    *
+    * @param name the property name to validate
+    * @throws MessageNotWriteableException if properties are read-only and the name is the input
+    *         stream pseudo-property
+    * @throws JMSException if properties are read-only, or the name is null or not a valid Java
+    *         identifier
+    * @throws IllegalArgumentException if the name is the empty string
+    * @throws JMSRuntimeException if the name is reserved or starts with {@code JMS_ACTIVEMQ}
+    */
+   private void checkProperty(final String name) throws JMSException
+   {
+      if (propertiesReadOnly)
+      {
+         // Constant-first equals: name has not been null-checked yet, and a null name on a
+         // read-only message must report "not writable" rather than a NullPointerException
+         if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name))
+         {
+            throw new MessageNotWriteableException("You cannot set the Input Stream on received messages. Did you mean " + ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM +
+                                                   " or " +
+                                                   ActiveMQJMSConstants.JMS_ACTIVEMQ_SAVE_STREAM +
+                                                   "?");
+         }
+         else
+         {
+            throw ActiveMQJMSClientBundle.BUNDLE.messageNotWritable();
+         }
+      }
+
+      if (name == null)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.nullArgumentNotAllowed("property");
+      }
+
+      if (name.equals(""))
+      {
+         throw new IllegalArgumentException("The name of a property must not be an empty String.");
+      }
+
+      if (!isValidJavaIdentifier(name))
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.invalidJavaIdentifier(name);
+      }
+
+      if (ActiveMQMessage.reservedIdentifiers.contains(name))
+      {
+         throw new JMSRuntimeException("The property name '" + name + "' is reserved due to selector syntax.");
+      }
+
+      if (name.startsWith("JMS_ACTIVEMQ"))
+      {
+         throw new JMSRuntimeException("The property name '" + name + "' is illegal since it starts with JMS_ACTIVEMQ");
+      }
+   }
+
+   /**
+    * Tells whether the given string is shaped like a Java identifier.
+    *
+    * @param s the candidate name
+    * @return {@code true} if {@code s} is non-empty, starts with a Java identifier start
+    *         character and continues with Java identifier part characters only
+    */
+   private boolean isValidJavaIdentifier(final String s)
+   {
+      if (s == null || s.isEmpty())
+      {
+         return false;
+      }
+
+      if (!Character.isJavaIdentifierStart(s.charAt(0)))
+      {
+         return false;
+      }
+
+      for (int pos = 1, len = s.length(); pos < len; pos++)
+      {
+         if (!Character.isJavaIdentifierPart(s.charAt(pos)))
+         {
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   private void checkPriority(final int priority) throws JMSException
+   {
+      if (priority < 0 || priority > 9)
+      {
+         throw new JMSException(priority + " is not valid: priority must be between 0 and 9");
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java
new file mode 100644
index 0000000..e721b3c
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageConsumer.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.MessageHandler;
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+
+/**
+ * ActiveMQ implementation of a JMS MessageConsumer.
+ * <br>
+ * Wraps a core {@link ClientConsumer} and exposes it as both a {@link QueueReceiver} and a
+ * {@link TopicSubscriber}.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ */
+public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscriber
+{
+   /** Core consumer every receive/close call is delegated to. */
+   private final ClientConsumer consumer;
+
+   /** JMS listener last installed through {@link #setMessageListener(MessageListener)}, or {@code null}. */
+   private MessageListener listener;
+
+   /** Core handler wrapping {@link #listener}, or {@code null} when no listener is installed. */
+   private MessageHandler coreListener;
+
+   private final ActiveMQConnection connection;
+
+   private final ActiveMQSession session;
+
+   /** Acknowledge mode read from the session when this consumer was constructed. */
+   private final int ackMode;
+
+   private final boolean noLocal;
+
+   private final ActiveMQDestination destination;
+
+   private final String selector;
+
+   /** When not {@code null}, the queue deleted through the session when this consumer is closed. */
+   private final SimpleString autoDeleteQueueName;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param connection          the owning connection, handed to the listener wrapper
+    * @param session             the owning session; its acknowledge mode is read here
+    * @param consumer            the core consumer to delegate to
+    * @param noLocal             value reported by {@link #getNoLocal()}
+    * @param destination         value reported by {@link #getQueue()} / {@link #getTopic()}
+    * @param selector            value reported by {@link #getMessageSelector()}
+    * @param autoDeleteQueueName queue to delete on {@link #close()}, or {@code null} for none
+    * @throws JMSException if the session's acknowledge mode cannot be read
+    */
+   protected ActiveMQMessageConsumer(final ActiveMQConnection connection,
+                                     final ActiveMQSession session,
+                                     final ClientConsumer consumer,
+                                     final boolean noLocal,
+                                     final ActiveMQDestination destination,
+                                     final String selector,
+                                     final SimpleString autoDeleteQueueName) throws JMSException
+   {
+      this.connection = connection;
+
+      this.session = session;
+
+      this.consumer = consumer;
+
+      ackMode = session.getAcknowledgeMode();
+
+      this.noLocal = noLocal;
+
+      this.destination = destination;
+
+      this.selector = selector;
+
+      this.autoDeleteQueueName = autoDeleteQueueName;
+   }
+
+   // MessageConsumer implementation --------------------------------
+
+   /**
+    * @return the selector this consumer was created with
+    * @throws JMSException if the consumer or its session is closed
+    */
+   public String getMessageSelector() throws JMSException
+   {
+      checkClosed();
+
+      return selector;
+   }
+
+   /**
+    * @return the listener last installed, or {@code null} if none
+    * @throws JMSException if the consumer or its session is closed
+    */
+   public MessageListener getMessageListener() throws JMSException
+   {
+      checkClosed();
+
+      return listener;
+   }
+
+   /**
+    * Installs (or, when {@code listener} is {@code null}, removes) the message listener.
+    * <br>
+    * The listener is wrapped in a {@code JMSMessageListenerWrapper} and handed to the core consumer.
+    * The fields backing {@link #getMessageListener()} are only updated once the core consumer has
+    * accepted the handler, so a failed call leaves the previously recorded listener untouched.
+    *
+    * @param listener the listener to install, or {@code null} to remove the current one
+    * @throws JMSException if the core consumer rejects the handler
+    */
+   public void setMessageListener(final MessageListener listener) throws JMSException
+   {
+      final MessageHandler handler =
+         listener == null ? null : new JMSMessageListenerWrapper(connection, session, consumer, listener, ackMode);
+
+      try
+      {
+         consumer.setMessageHandler(handler);
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+
+      // Record the new state only after the core consumer accepted the handler
+      this.listener = listener;
+
+      coreListener = handler;
+   }
+
+   /**
+    * Receives the next message, passing a timeout of 0 to the core consumer.
+    */
+   public Message receive() throws JMSException
+   {
+      return getMessage(0, false);
+   }
+
+   /**
+    * Receives the next message, passing {@code timeout} to the core consumer.
+    *
+    * @param timeout the timeout handed to {@link ClientConsumer#receive(long)}
+    */
+   public Message receive(final long timeout) throws JMSException
+   {
+      return getMessage(timeout, false);
+   }
+
+   /**
+    * Receives a message through {@link ClientConsumer#receiveImmediate()}.
+    */
+   public Message receiveNoWait() throws JMSException
+   {
+      return getMessage(0, true);
+   }
+
+   /**
+    * Closes the core consumer, deletes the auto-delete queue when one was configured and
+    * unregisters this consumer from its session.
+    *
+    * @throws JMSException if any of the core operations fails
+    */
+   public void close() throws JMSException
+   {
+      try
+      {
+         consumer.close();
+
+         if (autoDeleteQueueName != null)
+         {
+            // If non durable subscriber need to delete subscription too
+            session.deleteQueue(autoDeleteQueueName);
+         }
+
+         session.removeConsumer(this);
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   // QueueReceiver implementation ----------------------------------
+
+   /**
+    * @return the destination cast to {@link Queue}
+    * @throws JMSException if the consumer or its session is closed
+    */
+   public Queue getQueue() throws JMSException
+   {
+      checkClosed();
+
+      return (Queue)destination;
+   }
+
+   // TopicSubscriber implementation --------------------------------
+
+   /**
+    * @return the destination cast to {@link Topic}
+    * @throws JMSException if the consumer or its session is closed
+    */
+   public Topic getTopic() throws JMSException
+   {
+      checkClosed();
+
+      return (Topic)destination;
+   }
+
+   /**
+    * @return the noLocal flag this consumer was created with
+    * @throws JMSException if the consumer or its session is closed
+    */
+   public boolean getNoLocal() throws JMSException
+   {
+      checkClosed();
+
+      return noLocal;
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "ActiveMQMessageConsumer[" + consumer + "]";
+   }
+
+   /**
+    * @return whether the underlying core consumer reports itself as closed
+    */
+   public boolean isClosed()
+   {
+      return consumer.isClosed();
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @throws JMSException ({@link IllegalStateException}) if the core consumer or the core session is closed
+    */
+   private void checkClosed() throws JMSException
+   {
+      if (consumer.isClosed() || session.getCoreSession().isClosed())
+      {
+         throw new IllegalStateException("Consumer is closed");
+      }
+   }
+
+   /**
+    * Receives one core message and converts it to a JMS message.
+    *
+    * @param timeout timeout handed to {@link ClientConsumer#receive(long)}; ignored when {@code noWait} is set
+    * @param noWait  when {@code true}, {@link ClientConsumer#receiveImmediate()} is used instead
+    * @return the converted message, or {@code null} if the core consumer returned no message
+    * @throws JMSException if a core operation fails
+    */
+   private ActiveMQMessage getMessage(final long timeout, final boolean noWait) throws JMSException
+   {
+      try
+      {
+         ClientMessage coreMessage;
+
+         if (noWait)
+         {
+            coreMessage = consumer.receiveImmediate();
+         }
+         else
+         {
+            coreMessage = consumer.receive(timeout);
+         }
+
+         ActiveMQMessage jmsMsg = null;
+
+         if (coreMessage != null)
+         {
+            // The JMS message only gets a reference to the core session in the client-driven ack modes
+            boolean needSession =
+                     ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE;
+            jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null);
+
+            jmsMsg.doBeforeReceive();
+
+            // We do the ack after doBeforeReceive, as in the case of large messages, this may fail so we don't want messages redelivered
+            // https://issues.jboss.org/browse/JBPAPP-6110
+            if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE)
+            {
+               jmsMsg.setIndividualAcknowledge();
+            }
+            else
+            {
+               coreMessage.acknowledge();
+            }
+         }
+
+         return jmsMsg;
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
new file mode 100644
index 0000000..00aed53
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
@@ -0,0 +1,601 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.utils.UUID;
+import org.apache.activemq.utils.UUIDGenerator;
+/**
+ * ActiveMQ implementation of a JMS MessageProducer.
+ * <br>
+ * Wraps a core {@link ClientProducer}; every send variant ends up in the private {@code doSendx} method.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ */
+public class ActiveMQMessageProducer implements MessageProducer, QueueSender, TopicPublisher
+{
+   private final ActiveMQConnection connection;
+
+   /** The connection's client ID when it has one, otherwise the connection's UID; stamped on every sent message. */
+   private final SimpleString connID;
+
+   private final ClientProducer clientProducer;
+   private final ClientSession clientSession;
+
+   private boolean disableMessageID = false;
+
+   private boolean disableMessageTimestamp = false;
+
+   private int defaultPriority = Message.DEFAULT_PRIORITY;
+   private long defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
+   private int defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
+   private long defaultDeliveryDelay = Message.DEFAULT_DELIVERY_DELAY;
+
+   /** Destination used by the send variants that take none; {@code null} for an anonymous producer. */
+   private final ActiveMQDestination defaultDestination;
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param connection         the owning connection
+    * @param producer           the core producer to delegate to
+    * @param defaultDestination the default destination, or {@code null} for an anonymous producer
+    * @param clientSession      the core session used for address queries and foreign-message conversion
+    * @throws JMSException if the connection's client ID cannot be read
+    */
+   protected ActiveMQMessageProducer(final ActiveMQConnection connection, final ClientProducer producer,
+                                     final ActiveMQDestination defaultDestination, final ClientSession clientSession) throws JMSException
+   {
+      this.connection = connection;
+
+      connID = connection.getClientID() != null ? new SimpleString(connection.getClientID()) : connection.getUID();
+
+      this.clientProducer = producer;
+
+      this.defaultDestination = defaultDestination;
+
+      this.clientSession = clientSession;
+   }
+
+   // MessageProducer implementation --------------------------------
+
+   public void setDisableMessageID(final boolean value) throws JMSException
+   {
+      checkClosed();
+
+      disableMessageID = value;
+   }
+
+   public boolean getDisableMessageID() throws JMSException
+   {
+      checkClosed();
+
+      return disableMessageID;
+   }
+
+   public void setDisableMessageTimestamp(final boolean value) throws JMSException
+   {
+      checkClosed();
+
+      disableMessageTimestamp = value;
+   }
+
+   public boolean getDisableMessageTimestamp() throws JMSException
+   {
+      checkClosed();
+
+      return disableMessageTimestamp;
+   }
+
+   /**
+    * @param deliveryMode {@link DeliveryMode#NON_PERSISTENT} or {@link DeliveryMode#PERSISTENT}
+    * @throws JMSException if the producer is closed or {@code deliveryMode} is neither of the two legal values
+    */
+   public void setDeliveryMode(final int deliveryMode) throws JMSException
+   {
+      checkClosed();
+      if (deliveryMode != DeliveryMode.NON_PERSISTENT && deliveryMode != DeliveryMode.PERSISTENT)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.illegalDeliveryMode(deliveryMode);
+      }
+
+      defaultDeliveryMode = deliveryMode;
+   }
+
+   public int getDeliveryMode() throws JMSException
+   {
+      checkClosed();
+
+      return defaultDeliveryMode;
+   }
+
+   /**
+    * @param defaultPriority the new default priority, between 0 and 9 inclusive
+    * @throws JMSException if the producer is closed or the priority is out of range
+    */
+   public void setPriority(final int defaultPriority) throws JMSException
+   {
+      checkClosed();
+
+      if (defaultPriority < 0 || defaultPriority > 9)
+      {
+         throw new JMSException("Illegal priority value: " + defaultPriority);
+      }
+
+      this.defaultPriority = defaultPriority;
+   }
+
+   public int getPriority() throws JMSException
+   {
+      checkClosed();
+
+      return defaultPriority;
+   }
+
+   public void setTimeToLive(final long timeToLive) throws JMSException
+   {
+      checkClosed();
+
+      defaultTimeToLive = timeToLive;
+   }
+
+   public long getTimeToLive() throws JMSException
+   {
+      checkClosed();
+
+      return defaultTimeToLive;
+   }
+
+   public Destination getDestination() throws JMSException
+   {
+      checkClosed();
+
+      return defaultDestination;
+   }
+
+   /**
+    * Closes the core producer after asserting that the caller is not a completion-listener thread.
+    *
+    * @throws JMSException if closing the core producer fails
+    */
+   public void close() throws JMSException
+   {
+      connection.getThreadAwareContext().assertNotCompletionListenerThread();
+      try
+      {
+         clientProducer.close();
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   /**
+    * Sends to the default destination with the producer's default delivery mode, priority and time to live.
+    *
+    * @throws UnsupportedOperationException if this producer has no default destination
+    */
+   public void send(final Message message) throws JMSException
+   {
+      checkDefaultDestination();
+      doSendx(defaultDestination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, null);
+   }
+
+   /**
+    * Sends to the default destination with the given delivery mode, priority and time to live.
+    *
+    * @throws UnsupportedOperationException if this producer has no default destination
+    */
+   public void send(final Message message,
+                    final int deliveryMode,
+                    final int priority, final long timeToLive) throws JMSException
+   {
+      checkDefaultDestination();
+      doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, null);
+   }
+
+   public void send(final Destination destination, final Message message) throws JMSException
+   {
+      send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive);
+   }
+
+   /**
+    * Sends to an explicit destination; see {@code checkDestination} for the conditions under which
+    * the destination is rejected.
+    */
+   public void send(final Destination destination, final Message message, final int deliveryMode, final int priority,
+                    final long timeToLive) throws JMSException
+   {
+      checkClosed();
+
+      checkDestination(destination);
+
+      doSendx((ActiveMQDestination)destination, message, deliveryMode, priority, timeToLive, null);
+   }
+
+   @Override
+   public void setDeliveryDelay(long deliveryDelay) throws JMSException
+   {
+      this.defaultDeliveryDelay = deliveryDelay;
+   }
+
+   @Override
+   public long getDeliveryDelay() throws JMSException
+   {
+      return defaultDeliveryDelay;
+   }
+
+   @Override
+   public void send(Message message, CompletionListener completionListener) throws JMSException
+   {
+      send(message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, completionListener);
+   }
+
+   @Override
+   public void send(Message message, int deliveryMode, int priority, long timeToLive,
+                    CompletionListener completionListener) throws JMSException
+   {
+      checkCompletionListener(completionListener);
+      checkDefaultDestination();
+      doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, completionListener);
+   }
+
+   @Override
+   public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException
+   {
+      send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, completionListener);
+   }
+
+   @Override
+   public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                    CompletionListener completionListener) throws JMSException
+   {
+      checkClosed();
+
+      checkCompletionListener(completionListener);
+
+      checkDestination(destination);
+
+      doSendx((ActiveMQDestination)destination, message, deliveryMode, priority, timeToLive, completionListener);
+   }
+
+   // TopicPublisher Implementation ---------------------------------
+
+   public Topic getTopic() throws JMSException
+   {
+      return (Topic)getDestination();
+   }
+
+   public void publish(final Message message) throws JMSException
+   {
+      send(message);
+   }
+
+   public void publish(final Topic topic, final Message message) throws JMSException
+   {
+      send(topic, message);
+   }
+
+   public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+   {
+      send(message, deliveryMode, priority, timeToLive);
+   }
+
+   public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority,
+                       final long timeToLive) throws JMSException
+   {
+      checkDestination(topic);
+      doSendx((ActiveMQDestination)topic, message, deliveryMode, priority, timeToLive, null);
+   }
+
+   // QueueSender Implementation ------------------------------------
+
+   public void send(final Queue queue, final Message message) throws JMSException
+   {
+      send((Destination)queue, message);
+   }
+
+   public void send(final Queue queue, final Message message, final int deliveryMode, final int priority,
+                    final long timeToLive) throws JMSException
+   {
+      checkDestination(queue);
+      doSendx((ActiveMQDestination)queue, message, deliveryMode, priority, timeToLive, null);
+   }
+
+   public Queue getQueue() throws JMSException
+   {
+      return (Queue)getDestination();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "ActiveMQMessageProducer->" + clientProducer;
+   }
+
+   /**
+    * Check if the default destination has been set.
+    *
+    * @throws UnsupportedOperationException if this producer was created without a default destination,
+    *                                       i.e. the caller must name a destination on every send
+    */
+   private void checkDefaultDestination()
+   {
+      if (defaultDestination == null)
+      {
+         throw new UnsupportedOperationException("Destination must be specified on send with an anonymous producer");
+      }
+   }
+
+   /**
+    * Check if the destination is sent correctly.
+    *
+    * @param destination the destination explicitly passed to a send/publish call
+    * @throws InvalidDestinationException   if {@code destination} is non-null but not an {@link ActiveMQDestination}
+    * @throws UnsupportedOperationException if {@code destination} is non-null and this producer has a default destination
+    */
+   private void checkDestination(Destination destination) throws InvalidDestinationException
+   {
+      if (destination != null && !(destination instanceof ActiveMQDestination))
+      {
+         throw new InvalidDestinationException("Not a ActiveMQ Destination:" + destination);
+      }
+      if (destination != null && defaultDestination != null)
+      {
+         throw new UnsupportedOperationException("Cannot specify destination if producer has a default destination");
+      }
+      if (destination == null)
+      {
+         // A null destination is reported through the bundle's nullTopic() exception
+         throw ActiveMQJMSClientBundle.BUNDLE.nullTopic();
+      }
+   }
+
+   /**
+    * Rejects a {@code null} completion listener with the bundle's "null argument not allowed" exception.
+    */
+   private void checkCompletionListener(CompletionListener completionListener)
+   {
+      if (completionListener == null)
+      {
+         throw ActiveMQJMSClientBundle.BUNDLE.nullArgumentNotAllowed("CompletionListener");
+      }
+   }
+
+
+   /**
+    * Common implementation behind every send/publish variant.
+    * <br>
+    * Stamps the JMS headers on {@code jmsMessage}, resolves the target address, converts a foreign
+    * message into an ActiveMQ one, assigns a message ID unless disabled and hands the core message to
+    * the core producer.
+    *
+    * @param destination        the explicit destination, or {@code null} to fall back to the default destination
+    * @param jmsMessage         the message to send; its JMS headers are modified
+    * @param deliveryMode       value stored as the message's JMS delivery mode
+    * @param priority           value stored as the message's JMS priority
+    * @param timeToLive         0 for no expiration, otherwise added to the current time to get the expiration
+    * @param completionListener listener to notify on send acknowledgement, or {@code null} for none
+    * @throws JMSException                  if the address does not exist, the pre-send hook fails or the core send fails
+    * @throws UnsupportedOperationException if no destination can be resolved, or the explicit destination
+    *                                       differs from the default one
+    */
+   private void doSendx(ActiveMQDestination destination, final Message jmsMessage, final int deliveryMode,
+                        final int priority, final long timeToLive,
+                        CompletionListener completionListener) throws JMSException
+   {
+
+      jmsMessage.setJMSDeliveryMode(deliveryMode);
+
+      jmsMessage.setJMSPriority(priority);
+
+
+      if (timeToLive == 0)
+      {
+         jmsMessage.setJMSExpiration(0);
+      }
+      else
+      {
+         jmsMessage.setJMSExpiration(System.currentTimeMillis() + timeToLive);
+      }
+
+      if (!disableMessageTimestamp)
+      {
+         jmsMessage.setJMSTimestamp(System.currentTimeMillis());
+      }
+      else
+      {
+         jmsMessage.setJMSTimestamp(0);
+      }
+
+      // Stays null when the default destination is used; the core send below is then called with a null address
+      SimpleString address = null;
+
+      if (destination == null)
+      {
+         if (defaultDestination == null)
+         {
+            throw new UnsupportedOperationException("Destination must be specified on send with an anonymous producer");
+         }
+
+         destination = defaultDestination;
+      }
+      else
+      {
+         if (defaultDestination != null)
+         {
+            if (!destination.equals(defaultDestination))
+            {
+               throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is "
+                                                          + "specified in the arguments to the send, "
+                                                          + "these destinations must be equal");
+            }
+         }
+
+         address = destination.getSimpleAddress();
+
+         // Query the server only for addresses the connection has not already recorded as known
+         if (!connection.containsKnownDestination(address))
+         {
+            try
+            {
+               ClientSession.AddressQuery query = clientSession.addressQuery(address);
+               if (!query.isExists())
+               {
+                  throw new InvalidDestinationException("Destination " + address + " does not exist");
+               }
+               else
+               {
+                  connection.addKnownDestination(address);
+               }
+            }
+            catch (ActiveMQException e)
+            {
+               throw JMSExceptionHelper.convertFromActiveMQException(e);
+            }
+         }
+      }
+
+      ActiveMQMessage activeMQJmsMessage;
+
+      boolean foreign = false;
+
+      // First convert from foreign message if appropriate
+      if (!(jmsMessage instanceof ActiveMQMessage))
+      {
+         // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
+         // a message whose implementation is not one of its own.
+
+         if (jmsMessage instanceof BytesMessage)
+         {
+            activeMQJmsMessage = new ActiveMQBytesMessage((BytesMessage)jmsMessage, clientSession);
+         }
+         else if (jmsMessage instanceof MapMessage)
+         {
+            activeMQJmsMessage = new ActiveMQMapMessage((MapMessage)jmsMessage, clientSession);
+         }
+         else if (jmsMessage instanceof ObjectMessage)
+         {
+            activeMQJmsMessage = new ActiveMQObjectMessage((ObjectMessage)jmsMessage, clientSession);
+         }
+         else if (jmsMessage instanceof StreamMessage)
+         {
+            activeMQJmsMessage = new ActiveMQStreamMessage((StreamMessage)jmsMessage, clientSession);
+         }
+         else if (jmsMessage instanceof TextMessage)
+         {
+            activeMQJmsMessage = new ActiveMQTextMessage((TextMessage)jmsMessage, clientSession);
+         }
+         else
+         {
+            activeMQJmsMessage = new ActiveMQMessage(jmsMessage, clientSession);
+         }
+
+         // Set the destination on the original message
+         jmsMessage.setJMSDestination(destination);
+
+         foreign = true;
+      }
+      else
+      {
+         activeMQJmsMessage = (ActiveMQMessage)jmsMessage;
+      }
+
+      if (!disableMessageID)
+      {
+         // Generate a JMS id
+
+         UUID uid = UUIDGenerator.getInstance().generateUUID();
+
+         activeMQJmsMessage.getCoreMessage().setUserID(uid);
+
+         activeMQJmsMessage.resetMessageID(null);
+      }
+
+      if (foreign)
+      {
+         // Copy the message ID of the converted message back onto the caller's foreign message
+         jmsMessage.setJMSMessageID(activeMQJmsMessage.getJMSMessageID());
+      }
+
+      activeMQJmsMessage.setJMSDestination(destination);
+
+      try
+      {
+         activeMQJmsMessage.doBeforeSend();
+      }
+      catch (Exception e)
+      {
+         JMSException je = new JMSException(e.getMessage());
+
+         je.initCause(e);
+
+         throw je;
+      }
+
+      if (defaultDeliveryDelay > 0)
+      {
+         activeMQJmsMessage.setJMSDeliveryTime(System.currentTimeMillis() + defaultDeliveryDelay);
+      }
+
+      ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
+      coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
+
+      try
+      {
+         /**
+          * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},
+          * so we avoid it if we can.
+          */
+         if (completionListener != null)
+         {
+            clientProducer.send(address, coreMessage, new CompletionListenerWrapper(completionListener, jmsMessage, this));
+         }
+         else
+         {
+            clientProducer.send(address, coreMessage);
+         }
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+   }
+
+   /**
+    * @throws JMSException ({@link IllegalStateException}) if the core producer or the core session is closed
+    */
+   private void checkClosed() throws JMSException
+   {
+      if (clientProducer.isClosed() || clientSession.isClosed())
+      {
+         throw new IllegalStateException("Producer is closed");
+      }
+   }
+
+   /**
+    * Adapts a JMS {@link CompletionListener} to the core {@link SendAcknowledgementHandler} callback.
+    */
+   private static final class CompletionListenerWrapper implements SendAcknowledgementHandler
+   {
+      private final CompletionListener completionListener;
+      private final Message jmsMessage;
+      private final ActiveMQMessageProducer producer;
+
+      /**
+       * @param listener   the JMS listener to notify
+       * @param jmsMessage the message passed back to the listener
+       * @param producer   the producer whose connection's thread-aware context is flagged around the callback
+       */
+      public CompletionListenerWrapper(CompletionListener listener, Message jmsMessage, ActiveMQMessageProducer producer)
+      {
+         this.completionListener = listener;
+         this.jmsMessage = jmsMessage;
+         this.producer = producer;
+      }
+
+      /**
+       * Resets stream/bytes messages (failures are deliberately ignored), then invokes the listener's
+       * {@code onCompletion} with the thread-aware context's current-thread marker set for the
+       * duration of the callback.
+       */
+      @Override
+      public void sendAcknowledged(org.apache.activemq.api.core.Message clientMessage)
+      {
+         if (jmsMessage instanceof StreamMessage)
+         {
+            try
+            {
+               ((StreamMessage)jmsMessage).reset();
+            }
+            catch (JMSException e)
+            {
+               // HORNETQ-1209 XXX ignore?
+            }
+         }
+         if (jmsMessage instanceof BytesMessage)
+         {
+            try
+            {
+               ((BytesMessage)jmsMessage).reset();
+            }
+            catch (JMSException e)
+            {
+               // HORNETQ-1209 XXX ignore?
+            }
+         }
+
+         try
+         {
+            producer.connection.getThreadAwareContext().setCurrentThread(true);
+            completionListener.onCompletion(jmsMessage);
+         }
+         finally
+         {
+            producer.connection.getThreadAwareContext().clearCurrentThread(true);
+         }
+      }
+
+      @Override
+      public String toString()
+      {
+         return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java
new file mode 100644
index 0000000..8e7a1aa
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQObjectMessage.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.Message;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientSession;
+
+/**
+ * ActiveMQ implementation of a JMS ObjectMessage.
+ * <br>
+ * Don't used ObjectMessage if you want good performance!
+ * <p>
+ * Serialization is slooooow!
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ */
+public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage
+{
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.OBJECT_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   // keep a snapshot of the Serializable Object as a byte[] to provide Object isolation
+   private byte[] data;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   protected ActiveMQObjectMessage(final ClientSession session)
+   {
+      super(ActiveMQObjectMessage.TYPE, session);
+   }
+
+   protected ActiveMQObjectMessage(final ClientMessage message, final ClientSession session)
+   {
+      super(message, session);
+   }
+
+   /**
+    * A copy constructor for foreign JMS ObjectMessages.
+    */
+   public ActiveMQObjectMessage(final ObjectMessage foreign, final ClientSession session) throws JMSException
+   {
+      super(foreign, ActiveMQObjectMessage.TYPE, session);
+
+      setObject(foreign.getObject());
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public byte getType()
+   {
+      return ActiveMQObjectMessage.TYPE;
+   }
+
+   @Override
+   public void doBeforeSend() throws Exception
+   {
+      message.getBodyBuffer().clear();
+      if (data != null)
+      {
+         message.getBodyBuffer().writeInt(data.length);
+         message.getBodyBuffer().writeBytes(data);
+      }
+
+      super.doBeforeSend();
+   }
+
+   @Override
+   public void doBeforeReceive() throws ActiveMQException
+   {
+      super.doBeforeReceive();
+      try
+      {
+         int len = message.getBodyBuffer().readInt();
+         data = new byte[len];
+         message.getBodyBuffer().readBytes(data);
+      }
+      catch (Exception e)
+      {
+         data = null;
+      }
+
+   }
+
+   // ObjectMessage implementation ----------------------------------
+
+   public void setObject(final Serializable object) throws JMSException
+   {
+      checkWrite();
+
+      if (object != null)
+      {
+         try
+         {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+            oos.writeObject(object);
+
+            oos.flush();
+
+            data = baos.toByteArray();
+         }
+         catch (Exception e)
+         {
+            JMSException je = new JMSException("Failed to serialize object");
+            je.setLinkedException(e);
+            je.initCause(e);
+            throw je;
+         }
+      }
+   }
+
+   // lazy deserialize the Object the first time the client requests it
+   public Serializable getObject() throws JMSException
+   {
+      if (data == null || data.length == 0)
+      {
+         return null;
+      }
+
+      try
+      {
+         ByteArrayInputStream bais = new ByteArrayInputStream(data);
+         ObjectInputStream ois = new org.apache.activemq.utils.ObjectInputStreamWithClassLoader(bais);
+         Serializable object = (Serializable)ois.readObject();
+         return object;
+      }
+      catch (Exception e)
+      {
+         JMSException je = new JMSException(e.getMessage());
+         je.setStackTrace(e.getStackTrace());
+         throw je;
+      }
+   }
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      data = null;
+   }
+
+   @Override
+   protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException
+   {
+      try
+      {
+         return (T)getObject();
+      }
+      catch (JMSException e)
+      {
+         throw new MessageFormatException("Deserialization error on ActiveMQObjectMessage");
+      }
+   }
+
+   @Override
+   public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes")
+                                     Class c)
+   {
+      if (data == null) // we have no body
+         return true;
+      try
+      {
+         return Serializable.class == c || Object.class == c || c.isInstance(getObject());
+      }
+      catch (JMSException e)
+      {
+         return false;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java
new file mode 100644
index 0000000..6bb32bd
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueue.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.Queue;
+
+import org.apache.activemq.api.core.SimpleString;
+
+/**
+ * ActiveMQ implementation of a JMS Queue.
+ * <br>
+ * This class can be instantiated directly.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 8737 $</tt>
+ *
+ */
+public class ActiveMQQueue extends ActiveMQDestination implements Queue
+{
+   // Constants -----------------------------------------------------
+   private static final long serialVersionUID = -1106092883162295462L;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Builds the address for a queue name by prepending
+    * {@link ActiveMQDestination#JMS_QUEUE_ADDRESS_PREFIX} to it.
+    *
+    * @param name the queue name
+    * @return the prefixed address as a {@link SimpleString}
+    */
+   public static SimpleString createAddressFromName(final String name)
+   {
+      final String prefixed = ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX + name;
+      return new SimpleString(prefixed);
+   }
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a non-temporary queue without a session; the address is the name
+    * prefixed with {@link ActiveMQDestination#JMS_QUEUE_ADDRESS_PREFIX}.
+    *
+    * @param name the queue name
+    */
+   public ActiveMQQueue(final String name)
+   {
+      this(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX + name, name);
+   }
+
+   /**
+    * Creates a queue with every attribute given explicitly.
+    *
+    * @param address   the address passed to the superclass
+    * @param name      the queue name
+    * @param temporary the temporary flag passed to the superclass
+    * @param session   the session passed to the superclass
+    */
+   public ActiveMQQueue(String address, String name, boolean temporary, ActiveMQSession session)
+   {
+      super(address, name, temporary, true, session);
+   }
+
+   /**
+    * Creates a non-temporary queue without a session, using the address as given.
+    *
+    * @param address the address passed to the superclass
+    * @param name    the queue name
+    */
+   public ActiveMQQueue(final String address, final String name)
+   {
+      super(address, name, false, true, null);
+   }
+
+   // Queue implementation ------------------------------------------
+
+   /**
+    * @return the name of this queue
+    */
+   @Override
+   public String getQueueName()
+   {
+      return name;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return {@code "ActiveMQQueue["} followed by the queue name and {@code "]"}
+    */
+   @Override
+   public String toString()
+   {
+      return new StringBuilder("ActiveMQQueue[").append(name).append("]").toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java
new file mode 100644
index 0000000..eacff5f
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueBrowser.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientSession;
+
+/**
+ * ActiveMQ implementation of a JMS QueueBrowser.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *
+ */
+public final class ActiveMQQueueBrowser implements QueueBrowser
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private final ClientSession session;
+
+   private ClientConsumer consumer;
+
+   private final ActiveMQQueue queue;
+
+   private SimpleString filterString;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   protected ActiveMQQueueBrowser(final ActiveMQQueue queue, final String messageSelector, final ClientSession session) throws JMSException
+   {
+      this.session = session;
+      this.queue = queue;
+      if (messageSelector != null)
+      {
+         filterString = new SimpleString(SelectorTranslator.convertToActiveMQFilterString(messageSelector));
+      }
+   }
+
+   // QueueBrowser implementation -------------------------------------------------------------------
+
+   public void close() throws JMSException
+   {
+      if (consumer != null)
+      {
+         try
+         {
+            consumer.close();
+         }
+         catch (ActiveMQException e)
+         {
+            throw JMSExceptionHelper.convertFromActiveMQException(e);
+         }
+      }
+   }
+
+   public Enumeration getEnumeration() throws JMSException
+   {
+      try
+      {
+         close();
+
+         consumer = session.createConsumer(queue.getSimpleAddress(), filterString, true);
+
+         return new BrowserEnumeration();
+      }
+      catch (ActiveMQException e)
+      {
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+
+   }
+
+   public String getMessageSelector() throws JMSException
+   {
+      return filterString == null ? null : filterString.toString();
+   }
+
+   public Queue getQueue() throws JMSException
+   {
+      return queue;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "ActiveMQQueueBrowser->" + consumer;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   private final class BrowserEnumeration implements Enumeration<ActiveMQMessage>
+   {
+      ClientMessage current = null;
+
+      public boolean hasMoreElements()
+      {
+         if (current == null)
+         {
+            try
+            {
+               current = consumer.receiveImmediate();
+            }
+            catch (ActiveMQException e)
+            {
+               return false;
+            }
+         }
+         return current != null;
+      }
+
+      public ActiveMQMessage nextElement()
+      {
+         ActiveMQMessage msg;
+         if (hasMoreElements())
+         {
+            ClientMessage next = current;
+            current = null;
+            msg = ActiveMQMessage.createMessage(next, session);
+            try
+            {
+               msg.doBeforeReceive();
+            }
+            catch (Exception e)
+            {
+               ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e);
+
+               return null;
+            }
+            return msg;
+         }
+         else
+         {
+            throw new NoSuchElementException();
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java
new file mode 100644
index 0000000..84f5fa9
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQQueueConnectionFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.client;
+
+import javax.jms.QueueConnectionFactory;
+
+import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.jms.JMSFactoryType;
+
+/**
+ * A class that represents a QueueConnectionFactory.
+ * <p>
+ * Every constructor forwards its arguments unchanged to the matching
+ * {@link ActiveMQConnectionFactory} constructor.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class ActiveMQQueueConnectionFactory extends ActiveMQConnectionFactory implements QueueConnectionFactory
+{
+   private static final long serialVersionUID = 5312455021322463546L;
+
+   /**
+    * Creates a factory through the superclass no-argument constructor.
+    */
+   public ActiveMQQueueConnectionFactory()
+   {
+      // the superclass no-argument constructor is invoked implicitly
+   }
+
+   /**
+    * @param serverLocator the locator handed to the superclass
+    */
+   public ActiveMQQueueConnectionFactory(ServerLocator serverLocator)
+   {
+      super(serverLocator);
+   }
+
+   /**
+    * @param ha                 the {@code ha} flag handed to the superclass
+    * @param groupConfiguration the discovery group configuration handed to the superclass
+    */
+   public ActiveMQQueueConnectionFactory(boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      super(ha, groupConfiguration);
+   }
+
+   /**
+    * @param ha                the {@code ha} flag handed to the superclass
+    * @param initialConnectors the transport configurations handed to the superclass
+    */
+   public ActiveMQQueueConnectionFactory(boolean ha, TransportConfiguration... initialConnectors)
+   {
+      super(ha, initialConnectors);
+   }
+
+   /**
+    * @return the integer value of {@link JMSFactoryType#QUEUE_CF}
+    */
+   public int getFactoryType()
+   {
+      final JMSFactoryType type = JMSFactoryType.QUEUE_CF;
+      return type.intValue();
+   }
+}


Mime
View raw message