activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [21/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:51 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java
new file mode 100644
index 0000000..8bffddf
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.api.jms.management;
+
+import org.apache.activemq6.utils.json.JSONArray;
+import org.apache.activemq6.utils.json.JSONObject;
+
+/**
+ * Helper class to create Java Objects from the
+ * JSON serialization returned by {@link TopicControl#listAllSubscriptionsAsJSON()} and related methods.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class SubscriptionInfo
+{
+   private final String queueName;
+
+   private final String clientID; // read with optString(..., null); presumably null when the key is absent
+
+   private final String name; // read with optString(..., null); presumably null when the key is absent
+
+   private final boolean durable;
+
+   private final String selector; // read with optString(..., null); presumably null when the key is absent
+
+   private final int messageCount;
+
+   private final int deliveringCount;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Parses a JSON array string, as returned by {@link TopicControl#listAllSubscriptionsAsJSON()} and related methods,
+    * into one SubscriptionInfo per array element, in array order; an empty JSON array yields an empty Java array.
+    */
+   public static SubscriptionInfo[] from(final String jsonString) throws Exception
+   {
+      JSONArray array = new JSONArray(jsonString);
+      SubscriptionInfo[] infos = new SubscriptionInfo[array.length()];
+      for (int i = 0; i < array.length(); i++)
+      {
+         JSONObject sub = array.getJSONObject(i);
+         SubscriptionInfo info = new SubscriptionInfo(sub.getString("queueName"),
+                                                      sub.optString("clientID", null),
+                                                      sub.optString("name", null),
+                                                      sub.getBoolean("durable"),
+                                                      sub.optString("selector", null),
+                                                      sub.getInt("messageCount"),
+                                                      sub.getInt("deliveringCount"));
+         infos[i] = info;
+      }
+
+      return infos;
+   }
+
+   // Constructors --------------------------------------------------
+
+   private SubscriptionInfo(final String queueName,
+                            final String clientID,
+                            final String name,
+                            final boolean durable,
+                            final String selector,
+                            final int messageCount,
+                            final int deliveringCount)
+   {
+      this.queueName = queueName;
+      this.clientID = clientID;
+      this.name = name;
+      this.durable = durable;
+      this.selector = selector;
+      this.messageCount = messageCount;
+      this.deliveringCount = deliveringCount;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Returns the name of the HornetQ core queue corresponding to this subscription.
+    */
+   public String getQueueName()
+   {
+      return queueName;
+   }
+
+   /**
+    * Returns the client ID of this subscription or {@code null}.
+    */
+   public String getClientID()
+   {
+      return clientID;
+   }
+
+   /**
+    * Returns the name of this subscription or {@code null}.
+    */
+   public String getName()
+   {
+      return name;
+   }
+
+   /**
+    * Returns whether this subscription is durable.
+    */
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   /**
+    * Returns the JMS message selector associated with this subscription or {@code null}.
+    */
+   public String getSelector()
+   {
+      return selector;
+   }
+
+   /**
+    * Returns the number of messages currently held by this subscription.
+    */
+   public int getMessageCount()
+   {
+      return messageCount;
+   }
+
+   /**
+    * Returns the number of messages currently delivered to this subscription.
+    */
+   public int getDeliveringCount()
+   {
+      return deliveringCount;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java
new file mode 100644
index 0000000..1ef88c8
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.api.jms.management;
+
+import java.util.Map;
+
+import javax.management.MBeanOperationInfo;
+
+import org.apache.activemq6.api.core.management.Operation;
+import org.apache.activemq6.api.core.management.Parameter;
+
+/**
+ * A TopicControl is used to manage a JMS Topic.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public interface TopicControl extends DestinationControl
+{
+
+   /**
+    * Returns the number of (durable and non-durable) subscribers for this topic.
+    */
+   int getSubscriptionCount();
+
+   /**
+    * Returns the number of <em>durable</em> subscribers for this topic.
+    */
+   int getDurableSubscriptionCount();
+
+   /**
+    * Returns the number of <em>non-durable</em> subscribers for this topic.
+    */
+   int getNonDurableSubscriptionCount();
+
+   /**
+    * Returns the number of messages for all <em>durable</em> subscribers for this topic.
+    */
+   int getDurableMessageCount();
+
+   /**
+    * Returns the number of messages for all <em>non-durable</em> subscribers for this topic.
+    */
+   int getNonDurableMessageCount();
+
+   /**
+    * Returns the JNDI bindings associated with this topic.
+    */
+   @Operation(desc = "Returns the list of JNDI bindings associated")
+   String[] getJNDIBindings();
+
+   /**
+    * Adds the given JNDI binding to this destination. NOTE(review): the operation description says "queue" although this controls a topic.
+    */
+   @Operation(desc = "Adds the queue to another JNDI binding")
+   void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;
+
+
+
+   // Operations ----------------------------------------------------
+
+   /**
+    * Lists all the subscriptions for this topic (both durable and non-durable).
+    */
+   @Operation(desc = "List all subscriptions")
+   Object[] listAllSubscriptions() throws Exception;
+
+   /**
+    * Lists all the subscriptions for this topic (both durable and non-durable) using JSON serialization.
+    * <br>
+    * Java objects can be recreated from JSON serialization using {@link SubscriptionInfo#from(String)}.
+    */
+   @Operation(desc = "List all subscriptions")
+   String listAllSubscriptionsAsJSON() throws Exception;
+
+   /**
+    * Lists all the <em>durable</em> subscriptions for this topic.
+    */
+   @Operation(desc = "List only the durable subscriptions")
+   Object[] listDurableSubscriptions() throws Exception;
+
+   /**
+    * Lists all the <em>durable</em> subscriptions for this topic using JSON serialization.
+    * <br>
+    * Java objects can be recreated from JSON serialization using {@link SubscriptionInfo#from(String)}.
+    */
+   @Operation(desc = "List only the durable subscriptions")
+   String listDurableSubscriptionsAsJSON() throws Exception;
+
+   /**
+    * Lists all the <em>non-durable</em> subscriptions for this topic.
+    */
+   @Operation(desc = "List only the non durable subscriptions")
+   Object[] listNonDurableSubscriptions() throws Exception;
+
+   /**
+    * Lists all the <em>non-durable</em> subscriptions for this topic using JSON serialization.
+    * <br>
+    * Java objects can be recreated from JSON serialization using {@link SubscriptionInfo#from(String)}.
+    */
+   @Operation(desc = "List only the non durable subscriptions")
+   String listNonDurableSubscriptionsAsJSON() throws Exception;
+
+   /**
+    * Lists all the messages held in the queue with the given name, where that queue represents a subscription.
+    * <br>
+    * Each Map represents one message: keys are the message's properties and headers, values are the corresponding values.
+    */
+   @Operation(desc = "List all the message for the given subscription")
+   Map<String, Object>[] listMessagesForSubscription(@Parameter(name = "queueName", desc = "the name of the queue representing a subscription") String queueName) throws Exception;
+
+   /**
+    * Lists all the messages held in the queue with the given name, where that queue represents a subscription, using JSON serialization.
+    */
+   @Operation(desc = "List all the message for the given subscription")
+   String listMessagesForSubscriptionAsJSON(@Parameter(name = "queueName", desc = "the name of the queue representing a subscription") String queueName) throws Exception;
+
+   /**
+    * Counts the messages in the subscription identified by the given client ID and subscription name. Only messages matching the filter are counted.
+    * <br>
+    * Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
+    */
+   @Operation(desc = "Count the number of messages matching the filter for the given subscription")
+   int countMessagesForSubscription(@Parameter(name = "clientID", desc = "the client ID") String clientID,
+                                           @Parameter(name = "subscriptionName", desc = "the name of the durable subscription") String subscriptionName,
+                                           @Parameter(name = "filter", desc = "a JMS filter (can be empty)") String filter) throws Exception;
+
+   /**
+    * Drops the durable subscription identified by the given client ID and subscription name.
+    */
+   @Operation(desc = "Drop a durable subscription", impact = MBeanOperationInfo.ACTION)
+   void dropDurableSubscription(@Parameter(name = "clientID", desc = "the client ID") String clientID,
+                                @Parameter(name = "subscriptionName", desc = "the name of the durable subscription") String subscriptionName) throws Exception;
+
+   /**
+    * Drops all subscriptions from this topic.
+    */
+   @Operation(desc = "Drop all subscriptions from this topic", impact = MBeanOperationInfo.ACTION)
+   void dropAllSubscriptions() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java
new file mode 100644
index 0000000..1e75322
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * Management API for HornetQ JMS resources.
+ * <br>
+ * HornetQ JMS resources can be managed either using JMX or by sending JMS management messages to the
+ * server's special management address. Please refer to the user manual for more information.
+ */
+package org.apache.activemq6.api.jms.management;
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java
new file mode 100644
index 0000000..ef279c2
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * API to create HornetQ JMS resources.
+ * <br>
+ * This package contains classes to create
+ * HornetQ JMS managed resources (ConnectionFactory, Queue and Topic).
+ *
+ */
+package org.apache.activemq6.api.jms;
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java
new file mode 100644
index 0000000..c916af2
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java
@@ -0,0 +1,436 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.client.ClientMessage;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.core.message.impl.MessageImpl;
+
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesMessageReset;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBoolean;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadByte;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBytes;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadChar;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadDouble;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadFloat;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadInt;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadLong;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadShort;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUTF;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedByte;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedShort;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBoolean;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteByte;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBytes;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteChar;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteDouble;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteFloat;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteInt;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteLong;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteObject;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteShort;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteUTF;
+
+/**
+ * HornetQ implementation of a JMS {@link BytesMessage}.
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ */
+public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
+{
+   // Static -------------------------------------------------------
+   public static final byte TYPE = Message.BYTES_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   private int bodyLength; // body size as recorded by reset() and doBeforeReceive()
+
+   // Constructor ---------------------------------------------------
+
+   /**
+    * This constructor is used to construct messages prior to sending
+    */
+   protected HornetQBytesMessage(final ClientSession session)
+   {
+      super(HornetQBytesMessage.TYPE, session);
+   }
+
+   /**
+    * Constructor on receipt at client side
+    */
+   protected HornetQBytesMessage(final ClientMessage message, final ClientSession session)
+   {
+      super(message, session);
+   }
+
+   /**
+    * Foreign message constructor: calls reset() on the foreign message, then copies its body into this message, up to 1024 bytes per read.
+    */
+   public HornetQBytesMessage(final BytesMessage foreign, final ClientSession session) throws JMSException
+   {
+      super(foreign, HornetQBytesMessage.TYPE, session);
+
+      foreign.reset();
+
+      byte[] buffer = new byte[1024];
+      int n = foreign.readBytes(buffer);
+      while (n != -1) // BytesMessage.readBytes returns -1 once the foreign body is exhausted
+      {
+         writeBytes(buffer, 0, n);
+         n = foreign.readBytes(buffer);
+      }
+   }
+
+   // BytesMessage implementation -----------------------------------
+
+   public boolean readBoolean() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadBoolean(message);
+      }
+      catch (IndexOutOfBoundsException e) // same mapping in every readXxx below: rethrown as MessageEOFException (empty text, cause not attached)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public byte readByte() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadByte(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public int readUnsignedByte() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadUnsignedByte(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public short readShort() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadShort(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public int readUnsignedShort() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadUnsignedShort(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public char readChar() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadChar(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public int readInt() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadInt(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public long readLong() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadLong(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public float readFloat() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadFloat(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public double readDouble() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadDouble(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public String readUTF() throws JMSException
+   {
+      checkRead();
+      try
+      {
+         return bytesReadUTF(message);
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+      catch (Exception e) // any other failure is wrapped; the original is kept both as linked exception and as cause
+      {
+         JMSException je = new JMSException("Failed to get UTF");
+         je.setLinkedException(e);
+         je.initCause(e);
+         throw je;
+      }
+   }
+
+   public int readBytes(final byte[] value) throws JMSException
+   {
+      checkRead();
+      return bytesReadBytes(message, value);
+   }
+
+   public int readBytes(final byte[] value, final int length) throws JMSException
+   {
+      checkRead();
+      return bytesReadBytes(message, value, length);
+
+   }
+
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteBoolean(message, value);
+   }
+
+   public void writeByte(final byte value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteByte(message, value);
+   }
+
+   public void writeShort(final short value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteShort(message, value);
+   }
+
+   public void writeChar(final char value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteChar(message, value);
+   }
+
+   public void writeInt(final int value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteInt(message, value);
+   }
+
+   public void writeLong(final long value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteLong(message, value);
+   }
+
+   public void writeFloat(final float value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteFloat(message, value);
+   }
+
+   public void writeDouble(final double value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteDouble(message, value);
+   }
+
+   public void writeUTF(final String value) throws JMSException
+   {
+      checkWrite();
+      try
+      {
+         bytesWriteUTF(message, value);
+      }
+      catch (Exception e) // wrapped; the original is kept both as linked exception and as cause
+      {
+         JMSException je = new JMSException("Failed to write UTF");
+         je.setLinkedException(e);
+         je.initCause(e);
+         throw je;
+      }
+
+   }
+
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+      checkWrite();
+      bytesWriteBytes(message, value);
+   }
+
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+      checkWrite();
+      bytesWriteBytes(message, value, offset, length);
+   }
+
+   public void writeObject(final Object value) throws JMSException
+   {
+      checkWrite();
+      if (!bytesWriteObject(message, value)) // a false return from the helper is reported as a format error
+      {
+         throw new MessageFormatException("Invalid object for properties");
+      }
+   }
+
+   public void reset() throws JMSException
+   {
+      if (!readOnly) // only on the first reset while still writable: mark read-only and record the body size
+      {
+         readOnly = true;
+
+         bodyLength = message.getBodySize();
+      }
+
+      bytesMessageReset(message);
+   }
+
+   @Override
+   public void doBeforeReceive() throws HornetQException
+   {
+      bodyLength = message.getBodySize(); // received messages take their length straight from the underlying message
+   }
+
+   // HornetQRAMessage overrides ----------------------------------------
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      try
+      {
+         getBuffer().clear();
+      }
+      catch (RuntimeException e) // rethrown as JMSException with the same text; unlike readUTF/writeUTF, setLinkedException is not called
+      {
+         JMSException e2 = new JMSException(e.getMessage());
+         e2.initCause(e);
+         throw e2;
+      }
+   }
+
+   public long getBodyLength() throws JMSException
+   {
+      checkRead();
+
+      return bodyLength;
+   }
+
+   @Override
+   public void doBeforeSend() throws Exception
+   {
+      reset(); // see reset(): marks the message read-only and records bodyLength if that has not happened yet
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public byte getType()
+   {
+      return HornetQBytesMessage.TYPE;
+   }
+
+   private HornetQBuffer getBuffer()
+   {
+      return message.getBodyBuffer();
+   }
+
+   @Override
+   public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes")
+                                     Class c)
+   {
+      return c.isAssignableFrom(byte[].class); // true when a byte[] value can be assigned to a variable of type c
+   }
+
+   @Override
+   protected <T> T getBodyInternal(Class<T> c)
+   {
+      if (bodyLength == 0) // an empty body is reported as null, not as an empty array
+         return null;
+      byte[] dst = new byte[bodyLength];
+      message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst);
+      return (T)dst; // NOTE(review): unchecked cast; presumably callers only pass types accepted by isBodyAssignableTo - confirm
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java
new file mode 100644
index 0000000..bd59895
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java
@@ -0,0 +1,862 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import java.lang.ref.WeakReference;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.api.core.client.FailoverEventListener;
+import org.apache.activemq6.api.core.client.FailoverEventType;
+import org.apache.activemq6.api.core.client.SessionFailureListener;
+import org.apache.activemq6.api.jms.HornetQJMSConstants;
+import org.apache.activemq6.core.client.impl.ClientSessionInternal;
+import org.apache.activemq6.core.version.Version;
+import org.apache.activemq6.reader.MessageUtil;
+import org.apache.activemq6.utils.ConcurrentHashSet;
+import org.apache.activemq6.utils.UUIDGenerator;
+import org.apache.activemq6.utils.VersionLoader;
+
+/**
+ * HornetQ implementation of a JMS Connection.
+ * <p>
+ * The flat implementation of {@link TopicConnection} and {@link QueueConnection} is per design,
+ * following the common usage of these as one flat API in JMS 1.1.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ */
+public class HornetQConnection extends HornetQConnectionForContextImpl implements TopicConnection, QueueConnection
+{
+   // Constants ------------------------------------------------------------------------------------
+   // Connection type codes; the value is stored in connectionType by the constructor.
+   public static final int TYPE_GENERIC_CONNECTION = 0;
+
+   public static final int TYPE_QUEUE_CONNECTION = 1;
+
+   public static final int TYPE_TOPIC_CONNECTION = 2;
+
+   // Error codes placed on the JMSException handed to the ExceptionListener: FAILOVER when the
+   // failure notification reports failedOver == true, DISCONNECT otherwise.
+   public static final String EXCEPTION_FAILOVER = "FAILOVER";
+
+   public static final String EXCEPTION_DISCONNECT = "DISCONNECT";
+
+   // Alias of MessageUtil.CONNECTION_ID_PROPERTY_NAME.
+   public static final SimpleString CONNECTION_ID_PROPERTY_NAME = MessageUtil.CONNECTION_ID_PROPERTY_NAME;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // One of the TYPE_*_CONNECTION codes; consulted by createDurableConnectionConsumer.
+   private final int connectionType;
+
+   // JMS sessions created through createSessionInternal and not yet removed via removeSession.
+   private final Set<HornetQSession> sessions = new org.apache.activemq6.utils.ConcurrentHashSet<HornetQSession>();
+
+   // Addresses of temporary queues registered on this connection; close() tries to delete them.
+   private final Set<SimpleString> tempQueues = new org.apache.activemq6.utils.ConcurrentHashSet<SimpleString>();
+
+   // Addresses recorded through addKnownDestination / addTemporaryQueue.
+   private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<SimpleString>();
+
+   // One-way flag: set by setHasNoLocal(), never cleared in this class.
+   private volatile boolean hasNoLocal;
+
+   private volatile ExceptionListener exceptionListener;
+
+   private volatile FailoverEventListener failoverEventListener;
+
+   // True until the first "real" use of the connection; setClientID is refused once it is false.
+   private volatile boolean justCreated = true;
+
+   // Created lazily by getMetaData().
+   private volatile ConnectionMetaData metaData;
+
+   private volatile boolean closed;
+
+   private volatile boolean started;
+
+   private String clientID;
+
+   private final ClientSessionFactory sessionFactory;
+
+   // Generated once in the constructor by UUIDGenerator.
+   private final SimpleString uid;
+
+   private final String username;
+
+   private final String password;
+
+   // Listeners registered on every core session; they hold this connection through a WeakReference.
+   private final SessionFailureListener listener = new JMSFailureListener(this);
+
+   private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this);
+
+   private final Version thisVersion;
+
+   // Batch size passed to the core session for DUPS_OK_ACKNOWLEDGE sessions.
+   private final int dupsOKBatchSize;
+
+   // Batch size passed to the core session for transacted, client-ack, individual-ack and pre-ack sessions.
+   private final int transactionBatchSize;
+
+   // Assigned by authorize(); null until then.
+   private ClientSession initialSession;
+
+   // Captured at construction time and logged by finalize() when the connection was left open.
+   private final Exception creationStack;
+
+   // Only written (setReference) and never read in this class.
+   // NOTE(review): presumably kept to hold a strong reference to the factory - confirm.
+   private HornetQConnectionFactory factoryReference;
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   /**
+    * Builds a connection on top of the given core session factory.
+    * <p>
+    * Only fields are initialised here: the arguments are stored, a UID is generated, the client
+    * version is loaded and the creation stack is captured. No core session is opened.
+    *
+    * @param username             user name passed to every core session that gets created
+    * @param password             password passed to every core session that gets created
+    * @param connectionType       one of the {@code TYPE_*_CONNECTION} codes
+    * @param clientID             initial client identifier, may be {@code null}
+    * @param dupsOKBatchSize      batch size used for {@code DUPS_OK_ACKNOWLEDGE} sessions
+    * @param transactionBatchSize batch size used for the other non-auto-ack session kinds
+    * @param sessionFactory       factory used to create the core sessions
+    */
+   public HornetQConnection(final String username, final String password, final int connectionType,
+                            final String clientID, final int dupsOKBatchSize, final int transactionBatchSize,
+                            final ClientSessionFactory sessionFactory)
+   {
+      // State handed in by the caller
+      this.connectionType = connectionType;
+      this.clientID = clientID;
+      this.username = username;
+      this.password = password;
+      this.sessionFactory = sessionFactory;
+      this.dupsOKBatchSize = dupsOKBatchSize;
+      this.transactionBatchSize = transactionBatchSize;
+
+      // State derived at construction time
+      this.uid = UUIDGenerator.getInstance().generateSimpleStringUUID();
+      this.thisVersion = VersionLoader.getVersion();
+      this.creationStack = new Exception();
+   }
+
+   /**
+    * Internal entry point used mainly by the Resource Adapter, which works with both an XA and a
+    * non-XA session.
+    * <p>
+    * Without an enlisted transaction the EE specification requires commits to behave as on a
+    * plain (session-transacted) session, while the JMS Javadoc requires {@code createSession} to
+    * hand out an XASession. This method therefore always requests a non-XA session of the
+    * generic type.
+    *
+    * @param transacted      whether the session is transacted
+    * @param acknowledgeMode the acknowledge mode, passed on unchanged
+    * @return the new non-XA session
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      checkClosed();
+
+      final int sessionType = HornetQConnection.TYPE_GENERIC_CONNECTION;
+      return createSessionInternal(false, transacted, acknowledgeMode, sessionType);
+   }
+
+   /**
+    * Topic flavour of the Resource Adapter helper: always requests a non-XA session, here with
+    * the topic type code.
+    * <p>
+    * Needed because, without an enlisted transaction, the EE specification requires commits to
+    * behave as on a plain session while {@code createSession} must hand out an XASession.
+    *
+    * @param transacted      whether the session is transacted
+    * @param acknowledgeMode the acknowledge mode, passed on unchanged
+    * @return the new non-XA session
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      checkClosed();
+
+      final int sessionType = HornetQConnection.TYPE_TOPIC_CONNECTION;
+      return createSessionInternal(false, transacted, acknowledgeMode, sessionType);
+   }
+
+   /**
+    * Queue flavour of the Resource Adapter helper: always requests a non-XA session, here with
+    * the queue type code.
+    * <p>
+    * Needed because, without an enlisted transaction, the EE specification requires commits to
+    * behave as on a plain session while {@code createSession} must hand out an XASession.
+    *
+    * @param transacted      whether the session is transacted
+    * @param acknowledgeMode the acknowledge mode, passed on unchanged
+    * @return the new non-XA session
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      checkClosed();
+
+      final int sessionType = HornetQConnection.TYPE_QUEUE_CONNECTION;
+      return createSessionInternal(false, transacted, acknowledgeMode, sessionType);
+   }
+
+
+   // Connection implementation --------------------------------------------------------------------
+
+   /**
+    * Creates a non-XA session of the generic type. The requested acknowledge mode is first
+    * normalised through {@link #checkAck(boolean, int)}.
+    *
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   public synchronized Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      checkClosed();
+
+      final int effectiveAckMode = checkAck(transacted, acknowledgeMode);
+
+      return createSessionInternal(false, transacted, effectiveAckMode, HornetQConnection.TYPE_GENERIC_CONNECTION);
+   }
+
+   /**
+    * @return the client identifier currently held by this connection, possibly {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   public String getClientID() throws JMSException
+   {
+      checkClosed();
+
+      return this.clientID;
+   }
+
+   /**
+    * Assigns the JMS client identifier of this connection.
+    * <p>
+    * The identifier is first registered as unique metadata ({@code jms-client-id}) on the initial
+    * session, then stored locally and re-published through the regular session metadata.
+    *
+    * @param clientID the identifier to assign
+    * @throws IllegalStateException    if the connection is closed, an identifier is already set,
+    *                                  or the connection has already been used
+    * @throws InvalidClientIDException if the identifier is already registered by another connection
+    * @throws JMSException             if registering or publishing the metadata fails for any
+    *                                  other reason
+    */
+   public void setClientID(final String clientID) throws JMSException
+   {
+      checkClosed();
+
+      if (this.clientID != null)
+      {
+         throw new IllegalStateException("Client id has already been set");
+      }
+
+      if (!justCreated)
+      {
+         throw new IllegalStateException("setClientID can only be called directly after the connection is created");
+      }
+
+      try
+      {
+         initialSession.addUniqueMetaData("jms-client-id", clientID);
+      }
+      catch (HornetQException e)
+      {
+         if (e.getType() == HornetQExceptionType.DUPLICATE_METADATA)
+         {
+            // Keep the core exception attached so the root cause is not lost
+            InvalidClientIDException duplicate =
+               new InvalidClientIDException("clientID=" + clientID + " was already set into another connection");
+            duplicate.setLinkedException(e);
+            duplicate.initCause(e);
+            throw duplicate;
+         }
+
+         // Any other failure used to be dropped silently, leaving a client id that had never
+         // been registered as unique; report it to the caller instead.
+         throw JMSExceptionHelper.convertFromHornetQException(e);
+      }
+
+      this.clientID = clientID;
+      try
+      {
+         this.addSessionMetaData(initialSession);
+      }
+      catch (HornetQException e)
+      {
+         JMSException ex = new JMSException("Internal error setting metadata jms-client-id");
+         ex.setLinkedException(e);
+         ex.initCause(e);
+         throw ex;
+      }
+
+      justCreated = false;
+   }
+
+   /**
+    * Returns the connection metadata, creating it from {@code thisVersion} on first use.
+    * Calling this method marks the connection as used.
+    *
+    * @throws JMSException if the connection is closed
+    */
+   public ConnectionMetaData getMetaData() throws JMSException
+   {
+      checkClosed();
+
+      justCreated = false;
+
+      final boolean notYetCreated = metaData == null;
+      if (notYetCreated)
+      {
+         metaData = new HornetQConnectionMetaData(thisVersion);
+      }
+
+      return metaData;
+   }
+
+   /**
+    * Returns the registered exception listener and marks the connection as used.
+    *
+    * @return the current listener, possibly {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   public ExceptionListener getExceptionListener() throws JMSException
+   {
+      checkClosed();
+
+      justCreated = false;
+
+      return this.exceptionListener;
+   }
+
+   /**
+    * Registers the exception listener and marks the connection as used.
+    *
+    * @param listener the listener to register, may be {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   public void setExceptionListener(final ExceptionListener listener) throws JMSException
+   {
+      checkClosed();
+
+      this.exceptionListener = listener;
+      this.justCreated = false;
+   }
+
+   /**
+    * Starts every session currently registered on this connection, then records the connection
+    * as used and started.
+    *
+    * @throws JMSException if the connection is closed or a session fails to start
+    */
+   public synchronized void start() throws JMSException
+   {
+      checkClosed();
+
+      for (final HornetQSession registered : sessions)
+      {
+         registered.start();
+      }
+
+      this.justCreated = false;
+      this.started = true;
+   }
+
+   /**
+    * Raises the stop signal on the core session of every registered JMS session. Core sessions
+    * that are not a {@link ClientSessionInternal} are left alone.
+    */
+   public synchronized void signalStopToAllSessions()
+   {
+      for (final HornetQSession registered : sessions)
+      {
+         final ClientSession core = registered.getCoreSession();
+         if (!(core instanceof ClientSessionInternal))
+         {
+            continue;
+         }
+
+         ((ClientSessionInternal) core).setStopSignal();
+      }
+   }
+
+   /**
+    * Stops every session currently registered on this connection, then records the connection
+    * as used and no longer started.
+    * <p>
+    * The thread context check runs before the closed check.
+    *
+    * @throws JMSException if the connection is closed or a session fails to stop
+    */
+   public synchronized void stop() throws JMSException
+   {
+      threadAwareContext.assertNotMessageListenerThread();
+
+      checkClosed();
+
+      for (final HornetQSession registered : sessions)
+      {
+         registered.stop();
+      }
+
+      this.justCreated = false;
+      this.started = false;
+   }
+
+   /**
+    * Closes this connection. Calling it on an already closed connection is a no-op.
+    * <p>
+    * Order of operations: the session factory is closed, every registered JMS session is closed
+    * (iterating over a snapshot of the set), the temporary queues registered on this connection
+    * are deleted on a best-effort basis, and finally the initial session is closed. The
+    * connection is flagged as closed only when all of this completed without a
+    * {@link HornetQException}.
+    *
+    * @throws JMSException if closing a session fails
+    */
+   public final synchronized void close() throws JMSException
+   {
+      threadAwareContext.assertNotCompletionListenerThread();
+      threadAwareContext.assertNotMessageListenerThread();
+
+      if (closed)
+      {
+         return;
+      }
+
+      sessionFactory.close();
+
+      try
+      {
+         for (HornetQSession session : new HashSet<HornetQSession>(sessions))
+         {
+            session.close();
+         }
+
+         try
+         {
+            // initialSession is only assigned by authorize(); guard it here exactly as the
+            // finally block below does, so a never-authorized connection cannot fail with an NPE.
+            if (initialSession != null && !tempQueues.isEmpty())
+            {
+               // Remove any temporary queues
+
+               for (SimpleString queueName : tempQueues)
+               {
+                  if (!initialSession.isClosed())
+                  {
+                     try
+                     {
+                        initialSession.deleteQueue(queueName);
+                     }
+                     catch (HornetQException ignore)
+                     {
+                        // Exception on deleting queue shouldn't prevent close from completing
+                     }
+                  }
+               }
+            }
+         }
+         finally
+         {
+            if (initialSession != null)
+            {
+               initialSession.close();
+            }
+         }
+
+         closed = true;
+      }
+      catch (HornetQException e)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(e);
+      }
+   }
+
+   /**
+    * Validates the call (connection open, temporary destination owned by this connection) and
+    * then returns {@code null}: connection consumers are not implemented because a Resource
+    * Adapter is offered for MDBs instead.
+    *
+    * @return always {@code null}
+    * @throws JMSException if the connection is closed or the temporary destination check fails
+    */
+   public ConnectionConsumer
+   createConnectionConsumer(final Destination destination, final String messageSelector,
+                            final ServerSessionPool sessionPool, final int maxMessages) throws JMSException
+   {
+      checkClosed();
+      checkTempQueues(destination);
+
+      // Not implemented on purpose: the RA covers the MDB use case
+      return null;
+   }
+
+   /**
+    * Verifies that a temporary destination is only consumed from the connection that registered it.
+    *
+    * @param destination the destination a consumer is being created for
+    * @throws javax.jms.InvalidDestinationException if {@code destination} is {@code null} or is
+    *                                               not a {@link HornetQDestination}
+    * @throws JMSException                          if the destination is temporary and its address
+    *                                               is not registered on this connection
+    */
+   private void checkTempQueues(Destination destination) throws JMSException
+   {
+      // Reject null and foreign destinations explicitly instead of failing with a
+      // NullPointerException / ClassCastException on the cast below.
+      if (destination == null)
+      {
+         throw new javax.jms.InvalidDestinationException("Cannot create a consumer with a null destination");
+      }
+
+      if (!(destination instanceof HornetQDestination))
+      {
+         throw new javax.jms.InvalidDestinationException("Not a HornetQDestination:" + destination);
+      }
+
+      HornetQDestination jbdest = (HornetQDestination) destination;
+
+      if (jbdest.isTemporary() && !containsTemporaryQueue(jbdest.getSimpleAddress()))
+      {
+         throw new JMSException("Can not create consumer for temporary destination " + destination +
+                                   " from another JMS connection");
+      }
+   }
+
+   public ConnectionConsumer
+   createDurableConnectionConsumer(final Topic topic, final String subscriptionName,
+                                   final String messageSelector, final ServerSessionPool sessionPool,
+                                   final int maxMessages) throws JMSException
+   {
+      checkClosed();
+      // As spec. section 4.11
+      if (connectionType == HornetQConnection.TYPE_QUEUE_CONNECTION)
+      {
+         String msg = "Cannot create a durable connection consumer on a QueueConnection";
+         throw new javax.jms.IllegalStateException(msg);
+      }
+      checkTempQueues(topic);
+      // We offer RA, so no need for this
+      return null;
+   }
+
+   /**
+    * Creates a non-XA generic session for the given session mode. The session is transacted
+    * exactly when the mode is {@link Session#SESSION_TRANSACTED}.
+    *
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   @Override
+   public Session createSession(int sessionMode) throws JMSException
+   {
+      checkClosed();
+
+      final boolean transacted = sessionMode == Session.SESSION_TRANSACTED;
+      return createSessionInternal(false, transacted, sessionMode, HornetQSession.TYPE_GENERIC_SESSION);
+   }
+
+   /**
+    * Creates a non-XA, non-transacted generic session in {@link Session#AUTO_ACKNOWLEDGE} mode.
+    *
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   @Override
+   public Session createSession() throws JMSException
+   {
+      checkClosed();
+
+      final int ackMode = Session.AUTO_ACKNOWLEDGE;
+      return createSessionInternal(false, false, ackMode, HornetQSession.TYPE_GENERIC_SESSION);
+   }
+
+   // QueueConnection implementation ---------------------------------------------------------------
+
+   /**
+    * Creates a non-XA queue session; the acknowledge mode is normalised through
+    * {@link #checkAck(boolean, int)} first.
+    *
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException
+   {
+      checkClosed();
+
+      final int effectiveAckMode = checkAck(transacted, acknowledgeMode);
+      return createSessionInternal(false, transacted, effectiveAckMode, HornetQSession.TYPE_QUEUE_SESSION);
+   }
+
+   /**
+    * Normalises the acknowledge mode requested for a session.
+    * <p>
+    * Kept static because the Resource Adapter performs the same check. It exists for TCK
+    * conformance: with {@code transacted == false} the mode must be returned exactly as given,
+    * except that {@link Session#SESSION_TRANSACTED} is replaced by {@link Session#AUTO_ACKNOWLEDGE}.
+    *
+    * @param transacted      whether the session is requested as transacted
+    * @param acknowledgeMode the requested acknowledge mode
+    * @return the acknowledge mode to use
+    */
+   public static int checkAck(boolean transacted, int acknowledgeMode)
+   {
+      final boolean transactedModeOnPlainSession = !transacted && acknowledgeMode == Session.SESSION_TRANSACTED;
+
+      return transactedModeOnPlainSession ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode;
+   }
+
+   /**
+    * Queue variant: validates the call and returns {@code null}, as connection consumers are
+    * not implemented.
+    *
+    * @return always {@code null}
+    * @throws JMSException if the connection is closed or the temporary destination check fails
+    */
+   public ConnectionConsumer
+   createConnectionConsumer(final Queue queue, final String messageSelector,
+                            final ServerSessionPool sessionPool, final int maxMessages) throws JMSException
+   {
+      checkClosed();
+
+      checkTempQueues(queue);
+
+      return null;
+   }
+
+   // TopicConnection implementation ---------------------------------------------------------------
+
+   /**
+    * Creates a non-XA topic session; the acknowledge mode is normalised through
+    * {@link #checkAck(boolean, int)} first.
+    *
+    * @throws JMSException if the connection is closed or the session cannot be created
+    */
+   public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      checkClosed();
+
+      final int effectiveAckMode = checkAck(transacted, acknowledgeMode);
+      return createSessionInternal(false, transacted, effectiveAckMode, HornetQSession.TYPE_TOPIC_SESSION);
+   }
+
+   /**
+    * Topic variant: validates the call and returns {@code null}, as connection consumers are
+    * not implemented.
+    *
+    * @return always {@code null}
+    * @throws JMSException if the connection is closed or the temporary destination check fails
+    */
+   public ConnectionConsumer
+   createConnectionConsumer(final Topic topic, final String messageSelector,
+                            final ServerSessionPool sessionPool, final int maxMessages) throws JMSException
+   {
+      checkClosed();
+
+      checkTempQueues(topic);
+
+      return null;
+   }
+
+   /**
+    * Not implemented because a Resource Adapter is offered instead.
+    *
+    * @return always {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   @Override
+   public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
+   {
+      // Same closed-connection contract as the other connection consumer factories of this class
+      checkClosed();
+
+      return null; // we offer RA
+   }
+
+   /**
+    * Not implemented because a Resource Adapter is offered instead.
+    *
+    * @return always {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   @Override
+   public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
+   {
+      // Same closed-connection contract as the other connection consumer factories of this class
+      checkClosed();
+
+      return null; // we offer RA
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /**
+    * Registers the {@link FailoverEventListener} of this connection and marks the connection
+    * as used.
+    *
+    * @param listener the listener to register, may be {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   public void setFailoverListener(final FailoverEventListener listener) throws JMSException
+   {
+      checkClosed();
+
+      this.justCreated = false;
+      this.failoverEventListener = listener;
+   }
+
+   /**
+    * Returns the failover event listener of this connection and marks the connection as used.
+    *
+    * @return the current {@link FailoverEventListener}, possibly {@code null}
+    * @throws JMSException if the connection is closed
+    */
+   public FailoverEventListener getFailoverListener() throws JMSException
+   {
+      checkClosed();
+
+      this.justCreated = false;
+
+      return this.failoverEventListener;
+   }
+
+   /**
+    * Registers a temporary queue address on this connection; the address is also recorded as a
+    * known destination.
+    *
+    * @param queueAddress address of the temporary queue
+    */
+   public void addTemporaryQueue(final SimpleString queueAddress)
+   {
+      this.tempQueues.add(queueAddress);
+      this.knownDestinations.add(queueAddress);
+   }
+
+   /**
+    * Unregisters a temporary queue address. The known-destination set is left untouched.
+    *
+    * @param queueAddress address of the temporary queue
+    */
+   public void removeTemporaryQueue(final SimpleString queueAddress)
+   {
+      this.tempQueues.remove(queueAddress);
+   }
+
+   /**
+    * Records an address in the set of known destinations.
+    *
+    * @param address the destination address
+    */
+   public void addKnownDestination(final SimpleString address)
+   {
+      this.knownDestinations.add(address);
+   }
+
+   /**
+    * @param address the destination address to look up
+    * @return {@code true} if the address was recorded as a known destination
+    */
+   public boolean containsKnownDestination(final SimpleString address)
+   {
+      final boolean known = knownDestinations.contains(address);
+      return known;
+   }
+
+   /**
+    * @param queueAddress the temporary queue address to look up
+    * @return {@code true} if the address is registered as a temporary queue of this connection
+    */
+   public boolean containsTemporaryQueue(final SimpleString queueAddress)
+   {
+      final boolean registered = tempQueues.contains(queueAddress);
+      return registered;
+   }
+
+   /**
+    * @return {@code true} once {@link #setHasNoLocal()} has been called
+    */
+   public boolean hasNoLocal()
+   {
+      return this.hasNoLocal;
+   }
+
+   /**
+    * Raises the no-local flag; there is no way to clear it again in this class.
+    */
+   public void setHasNoLocal()
+   {
+      this.hasNoLocal = true;
+   }
+
+   /**
+    * @return the identifier generated for this connection at construction time
+    */
+   public SimpleString getUID()
+   {
+      return this.uid;
+   }
+
+   /**
+    * Drops a session from the set of sessions tracked by this connection.
+    *
+    * @param session the session to forget
+    */
+   public void removeSession(final HornetQSession session)
+   {
+      this.sessions.remove(session);
+   }
+
+   /**
+    * @return the core session created by {@link #authorize()}, or {@code null} before that call
+    */
+   public ClientSession getInitialSession()
+   {
+      return this.initialSession;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   /**
+    * Safety net in case the user forgets to close the connection manually: a connection that is
+    * still open is reported (together with the stack captured at creation time) and closed.
+    * <p>
+    * The superclass finalizer is always chained, even when logging or closing fails.
+    */
+   @Override
+   protected final void finalize() throws Throwable
+   {
+      try
+      {
+         if (!closed)
+         {
+            HornetQJMSClientLogger.LOGGER.connectionLeftOpen(creationStack);
+
+            close();
+         }
+      }
+      finally
+      {
+         super.finalize();
+      }
+   }
+
+   protected boolean isXA()
+   {
+      return false;
+   }
+
+   protected final HornetQSession
+   createSessionInternal(final boolean isXA, final boolean transacted, int acknowledgeMode, final int type) throws JMSException
+   {
+      if (transacted)
+      {
+         acknowledgeMode = Session.SESSION_TRANSACTED;
+      }
+
+      try
+      {
+         ClientSession session;
+
+         if (acknowledgeMode == Session.SESSION_TRANSACTED)
+         {
+            session =
+               sessionFactory.createSession(username, password, isXA, false, false,
+                                            sessionFactory.getServerLocator().isPreAcknowledge(),
+                                            transactionBatchSize);
+         }
+         else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
+         {
+            session =
+               sessionFactory.createSession(username, password, isXA, true, true,
+                                            sessionFactory.getServerLocator().isPreAcknowledge(), 0);
+         }
+         else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {
+            session =
+               sessionFactory.createSession(username, password, isXA, true, true,
+                                            sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize);
+         }
+         else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+         {
+            session =
+               sessionFactory.createSession(username, password, isXA, true, false,
+                                            sessionFactory.getServerLocator().isPreAcknowledge(),
+                                            transactionBatchSize);
+         }
+         else if (acknowledgeMode == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE)
+         {
+            session =
+               sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
+         }
+         else if (acknowledgeMode == HornetQJMSConstants.PRE_ACKNOWLEDGE)
+         {
+            session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize);
+         }
+         else
+         {
+            throw new JMSRuntimeException("Invalid ackmode: " + acknowledgeMode);
+         }
+
+         justCreated = false;
+
+         // Setting multiple times on different sessions doesn't matter since RemotingConnection
+         // maintains
+         // a set (no duplicates)
+         session.addFailureListener(listener);
+         session.addFailoverListener(failoverListener);
+
+         HornetQSession jbs = createHQSession(isXA, transacted, acknowledgeMode, session, type);
+
+         sessions.add(jbs);
+
+         if (started)
+         {
+            session.start();
+         }
+
+         this.addSessionMetaData(session);
+
+         return jbs;
+      }
+      catch (HornetQException e)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(e);
+      }
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   /**
+    * Wraps a core session into the matching JMS session object.
+    *
+    * @param isXA            {@code true} to build a {@link HornetQXASession}, {@code false} for a
+    *                        plain {@link HornetQSession}
+    * @param transacted      whether the session is transacted
+    * @param acknowledgeMode acknowledge mode of the session
+    * @param session         the core session to wrap
+    * @param type            session type code
+    * @return the new JMS session wrapper
+    */
+   protected HornetQSession createHQSession(boolean isXA, boolean transacted, int acknowledgeMode, ClientSession session, int type)
+   {
+      if (!isXA)
+      {
+         return new HornetQSession(this, transacted, false, acknowledgeMode, session, type);
+      }
+
+      return new HornetQXASession(this, transacted, true, acknowledgeMode, session, type);
+   }
+
+   /**
+    * Guard used by the public API: refuses the call once the connection has been closed.
+    *
+    * @throws IllegalStateException if the connection is closed
+    */
+   protected final void checkClosed() throws JMSException
+   {
+      if (!closed)
+      {
+         return;
+      }
+
+      throw new IllegalStateException("Connection is closed");
+   }
+
+   /**
+    * Opens the initial core session with the stored credentials, publishes the session metadata
+    * on it and attaches the failure and failover listeners.
+    *
+    * @throws JMSException converted from any {@link HornetQException} raised on the way
+    */
+   public void authorize() throws JMSException
+   {
+      try
+      {
+         final ClientSession bootstrap = sessionFactory.createSession(username, password, false, false, false, false, 0);
+
+         // Publish the field first so it stays set even if a later step fails
+         initialSession = bootstrap;
+
+         addSessionMetaData(bootstrap);
+
+         bootstrap.addFailureListener(listener);
+         bootstrap.addFailoverListener(failoverListener);
+      }
+      catch (HornetQException failure)
+      {
+         throw JMSExceptionHelper.convertFromHornetQException(failure);
+      }
+   }
+
+   /**
+    * Publishes the JMS metadata on a core session: always the {@code jms-session} marker, plus
+    * {@code jms-client-id} when a client identifier is set.
+    *
+    * @param session the core session to tag
+    * @throws HornetQException if adding the metadata fails
+    */
+   private void addSessionMetaData(ClientSession session) throws HornetQException
+   {
+      session.addMetaData("jms-session", "");
+
+      if (clientID == null)
+      {
+         return;
+      }
+
+      session.addMetaData("jms-client-id", clientID);
+   }
+
+   /**
+    * Stores a reference to the given connection factory. The reference is not read anywhere in
+    * this class.
+    *
+    * @param factory the factory to keep a reference to
+    */
+   public void setReference(HornetQConnectionFactory factory)
+   {
+      factoryReference = factory;
+   }
+
+   /**
+    * @return {@code true} between a successful {@link #start()} and the next successful {@link #stop()}
+    */
+   public boolean isStarted()
+   {
+      return this.started;
+   }
+
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   private static class JMSFailureListener implements SessionFailureListener
+   {
+      private final WeakReference<HornetQConnection> connectionRef;
+
+      JMSFailureListener(final HornetQConnection connection)
+      {
+         connectionRef = new WeakReference<HornetQConnection>(connection);
+      }
+
+      @Override
+      public synchronized void connectionFailed(final HornetQException me, boolean failedOver)
+      {
+         if (me == null)
+         {
+            return;
+         }
+
+         HornetQConnection conn = connectionRef.get();
+
+         if (conn != null)
+         {
+            try
+            {
+               final ExceptionListener exceptionListener = conn.getExceptionListener();
+
+               if (exceptionListener != null)
+               {
+                  final JMSException je =
+                     new JMSException(me.toString(), failedOver ? EXCEPTION_FAILOVER : EXCEPTION_DISCONNECT);
+
+                  je.initCause(me);
+
+                  new Thread(new Runnable()
+                  {
+                     public void run()
+                     {
+                        exceptionListener.onException(je);
+                     }
+                  }).start();
+               }
+            }
+            catch (JMSException e)
+            {
+               if (!conn.closed)
+               {
+                  HornetQJMSClientLogger.LOGGER.errorCallingExcListener(e);
+               }
+            }
+         }
+      }
+
+      @Override
+      public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID)
+      {
+         connectionFailed(me, failedOver);
+      }
+
+      public void beforeReconnect(final HornetQException me)
+      {
+
+      }
+
+   }
+
+   /**
+    * Relays core failover events to the {@link FailoverEventListener} registered on a connection.
+    * <p>
+    * The connection is only held through a {@link WeakReference}.
+    */
+   private static class FailoverEventListenerImpl implements FailoverEventListener
+   {
+      private final WeakReference<HornetQConnection> connectionRef;
+
+      FailoverEventListenerImpl(final HornetQConnection connection)
+      {
+         connectionRef = new WeakReference<HornetQConnection>(connection);
+      }
+
+      /**
+       * Hands the event to the connection's failover listener, if any, on a new thread.
+       */
+      @Override
+      public void failoverEvent(final FailoverEventType eventType)
+      {
+         final HornetQConnection connection = connectionRef.get();
+         if (connection == null)
+         {
+            return;
+         }
+
+         final FailoverEventListener target;
+         try
+         {
+            target = connection.getFailoverListener();
+         }
+         catch (JMSException lookupFailure)
+         {
+            // The lookup fails on a closed connection; only report unexpected failures
+            if (!connection.closed)
+            {
+               HornetQJMSClientLogger.LOGGER.errorCallingFailoverListener(lookupFailure);
+            }
+            return;
+         }
+
+         if (target == null)
+         {
+            return;
+         }
+
+         final Runnable delivery = new Runnable()
+         {
+            public void run()
+            {
+               target.failoverEvent(eventType);
+            }
+         };
+         new Thread(delivery).start();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java
new file mode 100644
index 0000000..9fe7aae
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java
@@ -0,0 +1,821 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.JMSSecurityException;
+import javax.jms.JMSSecurityRuntimeException;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XAJMSContext;
+import javax.jms.XAQueueConnection;
+import javax.jms.XATopicConnection;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import java.io.Serializable;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.jms.JMSFactoryType;
+import org.apache.activemq6.jms.referenceable.ConnectionFactoryObjectFactory;
+import org.apache.activemq6.jms.referenceable.SerializableObjectRefAddr;
+
+/**
+ * HornetQ implementation of a JMS ConnectionFactory.
+ * <p>
+ * Most configuration accessors simply delegate to the wrapped {@link ServerLocator}. As soon as a
+ * connection or context has been requested through this factory it is flagged read-only, and every
+ * setter that calls {@code checkWrite()} throws {@link IllegalStateException} from then on.
+ * <p>
+ * A factory created with the no-argument constructor has a {@code null} server locator; only
+ * {@link #close()}, {@link #getServerLocator()} and the fields kept on the factory itself
+ * (client ID and batch sizes) are usable on such an instance.
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class HornetQConnectionFactory implements Serializable, Referenceable, ConnectionFactory, XAConnectionFactory
+{
+   private static final long serialVersionUID = -2810634789345348326L;
+
+   private final ServerLocator serverLocator;
+
+   private String clientID;
+
+   private int dupsOKBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+   private int transactionBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+   // set by createConnectionInternal; checked by checkWrite()
+   private boolean readOnly;
+
+   /**
+    * Creates a factory without a server locator.
+    */
+   public HornetQConnectionFactory()
+   {
+      serverLocator = null;
+   }
+
+   /**
+    * Creates a factory around an existing locator and disables the locator's finalize check.
+    *
+    * @param serverLocator the locator used to create session factories
+    */
+   public HornetQConnectionFactory(final ServerLocator serverLocator)
+   {
+      this.serverLocator = serverLocator;
+
+      serverLocator.disableFinalizeCheck();
+   }
+
+   /**
+    * Creates a factory whose locator is built from a discovery group.
+    *
+    * @param ha                 whether to create the locator with HA
+    * @param groupConfiguration the discovery group used by the locator
+    */
+   public HornetQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      if (ha)
+      {
+         serverLocator = HornetQClient.createServerLocatorWithHA(groupConfiguration);
+      }
+      else
+      {
+         serverLocator = HornetQClient.createServerLocatorWithoutHA(groupConfiguration);
+      }
+
+      serverLocator.disableFinalizeCheck();
+   }
+
+   /**
+    * Creates a factory whose locator is built from a static list of connectors.
+    *
+    * @param ha                whether to create the locator with HA
+    * @param initialConnectors the connectors handed to the locator
+    */
+   public HornetQConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
+   {
+      if (ha)
+      {
+         serverLocator = HornetQClient.createServerLocatorWithHA(initialConnectors);
+      }
+      else
+      {
+         serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors);
+      }
+
+      serverLocator.disableFinalizeCheck();
+   }
+
+   // ConnectionFactory implementation -------------------------------------------------------------
+
+   public Connection createConnection() throws JMSException
+   {
+      return createConnection(null, null);
+   }
+
+   public Connection createConnection(final String username, final String password) throws JMSException
+   {
+      return createConnectionInternal(username, password, false, HornetQConnection.TYPE_GENERIC_CONNECTION);
+   }
+
+   @Override
+   public JMSContext createContext()
+   {
+      return createContext(null, null);
+   }
+
+   @Override
+   public JMSContext createContext(final int sessionMode)
+   {
+      return createContext(null, null, sessionMode);
+   }
+
+   @Override
+   public JMSContext createContext(final String userName, final String password)
+   {
+      return createContext(userName, password, JMSContext.AUTO_ACKNOWLEDGE);
+   }
+
+   /**
+    * Creates a generic, non-XA connection and returns a context created from it.
+    * Checked JMS exceptions are rethrown as their runtime counterparts.
+    *
+    * @throws JMSRuntimeException         if the session mode is not accepted or the connection cannot be created
+    * @throws JMSSecurityRuntimeException if creating the connection fails with a {@link JMSSecurityException}
+    */
+   @Override
+   public JMSContext createContext(String userName, String password, int sessionMode)
+   {
+      validateSessionMode(sessionMode);
+      try
+      {
+         HornetQConnection connection =
+            createConnectionInternal(userName, password, false, HornetQConnection.TYPE_GENERIC_CONNECTION);
+         return connection.createContext(sessionMode);
+      }
+      catch (JMSSecurityException e)
+      {
+         throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
+      }
+      catch (JMSException e)
+      {
+         throw JmsExceptionUtils.convertToRuntimeException(e);
+      }
+   }
+
+   /**
+    * Accepts only the four session modes defined on {@link JMSContext}.
+    *
+    * @param mode the session mode to check
+    * @throws JMSRuntimeException for any other value
+    */
+   private static void validateSessionMode(int mode)
+   {
+      switch (mode)
+      {
+         case JMSContext.AUTO_ACKNOWLEDGE:
+         case JMSContext.CLIENT_ACKNOWLEDGE:
+         case JMSContext.DUPS_OK_ACKNOWLEDGE:
+         case JMSContext.SESSION_TRANSACTED:
+         {
+            return;
+         }
+         default:
+            throw new JMSRuntimeException("Invalid Session Mode: " + mode);
+      }
+   }
+
+   // QueueConnectionFactory implementation --------------------------------------------------------
+
+   public QueueConnection createQueueConnection() throws JMSException
+   {
+      return createQueueConnection(null, null);
+   }
+
+   public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
+   {
+      return createConnectionInternal(username, password, false, HornetQConnection.TYPE_QUEUE_CONNECTION);
+   }
+
+   // TopicConnectionFactory implementation --------------------------------------------------------
+
+   public TopicConnection createTopicConnection() throws JMSException
+   {
+      return createTopicConnection(null, null);
+   }
+
+   public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
+   {
+      return createConnectionInternal(username, password, false, HornetQConnection.TYPE_TOPIC_CONNECTION);
+   }
+
+   // XAConnectionFactory implementation -----------------------------------------------------------
+
+   public XAConnection createXAConnection() throws JMSException
+   {
+      return createXAConnection(null, null);
+   }
+
+   public XAConnection createXAConnection(final String username, final String password) throws JMSException
+   {
+      return (XAConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_GENERIC_CONNECTION);
+   }
+
+   @Override
+   public XAJMSContext createXAContext()
+   {
+      return createXAContext(null, null);
+   }
+
+   /**
+    * Creates a generic XA connection and returns an XA context created from it.
+    * Checked JMS exceptions are rethrown as their runtime counterparts.
+    */
+   @Override
+   public XAJMSContext createXAContext(String userName, String password)
+   {
+      try
+      {
+         HornetQConnection connection =
+            createConnectionInternal(userName, password, true, HornetQConnection.TYPE_GENERIC_CONNECTION);
+         return connection.createXAContext();
+      }
+      catch (JMSSecurityException e)
+      {
+         throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
+      }
+      catch (JMSException e)
+      {
+         throw JmsExceptionUtils.convertToRuntimeException(e);
+      }
+   }
+
+   // XAQueueConnectionFactory implementation ------------------------------------------------------
+
+   public XAQueueConnection createXAQueueConnection() throws JMSException
+   {
+      return createXAQueueConnection(null, null);
+   }
+
+   public XAQueueConnection createXAQueueConnection(final String username, final String password) throws JMSException
+   {
+      return (XAQueueConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_QUEUE_CONNECTION);
+   }
+
+   // XATopicConnectionFactory implementation ------------------------------------------------------
+
+   public XATopicConnection createXATopicConnection() throws JMSException
+   {
+      return createXATopicConnection(null, null);
+   }
+
+   public XATopicConnection createXATopicConnection(final String username, final String password) throws JMSException
+   {
+      return (XATopicConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_TOPIC_CONNECTION);
+   }
+
+   /**
+    * Builds a JNDI reference that carries this factory as a serialized address and names
+    * {@link ConnectionFactoryObjectFactory} as the object factory.
+    */
+   @Override
+   public Reference getReference() throws NamingException
+   {
+      return new Reference(this.getClass().getCanonicalName(),
+                           new SerializableObjectRefAddr("HornetQ-CF", this),
+                           ConnectionFactoryObjectFactory.class.getCanonicalName(),
+                           null);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // Unless noted otherwise, the getters below read straight from the server locator and the
+   // setters call checkWrite() before writing to it, so they fail once the factory has been used.
+
+   public boolean isHA()
+   {
+      return serverLocator.isHA();
+   }
+
+   public synchronized String getConnectionLoadBalancingPolicyClassName()
+   {
+      return serverLocator.getConnectionLoadBalancingPolicyClassName();
+   }
+
+   public synchronized void setConnectionLoadBalancingPolicyClassName(final String connectionLoadBalancingPolicyClassName)
+   {
+      checkWrite();
+      serverLocator.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
+   }
+
+   public synchronized TransportConfiguration[] getStaticConnectors()
+   {
+      return serverLocator.getStaticTransportConfigurations();
+   }
+
+   public synchronized DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+   {
+      return serverLocator.getDiscoveryGroupConfiguration();
+   }
+
+   public synchronized String getClientID()
+   {
+      return clientID;
+   }
+
+   public synchronized void setClientID(final String clientID)
+   {
+      checkWrite();
+      this.clientID = clientID;
+   }
+
+   public synchronized int getDupsOKBatchSize()
+   {
+      return dupsOKBatchSize;
+   }
+
+   public synchronized void setDupsOKBatchSize(final int dupsOKBatchSize)
+   {
+      checkWrite();
+      this.dupsOKBatchSize = dupsOKBatchSize;
+   }
+
+   public synchronized int getTransactionBatchSize()
+   {
+      return transactionBatchSize;
+   }
+
+   public synchronized void setTransactionBatchSize(final int transactionBatchSize)
+   {
+      checkWrite();
+      this.transactionBatchSize = transactionBatchSize;
+   }
+
+   public synchronized long getClientFailureCheckPeriod()
+   {
+      return serverLocator.getClientFailureCheckPeriod();
+   }
+
+   public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+   {
+      checkWrite();
+      serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+   }
+
+   public synchronized long getConnectionTTL()
+   {
+      return serverLocator.getConnectionTTL();
+   }
+
+   public synchronized void setConnectionTTL(final long connectionTTL)
+   {
+      checkWrite();
+      serverLocator.setConnectionTTL(connectionTTL);
+   }
+
+   public synchronized long getCallTimeout()
+   {
+      return serverLocator.getCallTimeout();
+   }
+
+   public synchronized void setCallTimeout(final long callTimeout)
+   {
+      checkWrite();
+      serverLocator.setCallTimeout(callTimeout);
+   }
+
+   public synchronized long getCallFailoverTimeout()
+   {
+      return serverLocator.getCallFailoverTimeout();
+   }
+
+   public synchronized void setCallFailoverTimeout(final long callTimeout)
+   {
+      checkWrite();
+      serverLocator.setCallFailoverTimeout(callTimeout);
+   }
+
+   public synchronized int getConsumerWindowSize()
+   {
+      return serverLocator.getConsumerWindowSize();
+   }
+
+   public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+   {
+      checkWrite();
+      serverLocator.setConsumerWindowSize(consumerWindowSize);
+   }
+
+   public synchronized int getConsumerMaxRate()
+   {
+      return serverLocator.getConsumerMaxRate();
+   }
+
+   public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+   {
+      checkWrite();
+      serverLocator.setConsumerMaxRate(consumerMaxRate);
+   }
+
+   public synchronized int getConfirmationWindowSize()
+   {
+      return serverLocator.getConfirmationWindowSize();
+   }
+
+   public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+   {
+      checkWrite();
+      serverLocator.setConfirmationWindowSize(confirmationWindowSize);
+   }
+
+   public synchronized int getProducerMaxRate()
+   {
+      return serverLocator.getProducerMaxRate();
+   }
+
+   public synchronized void setProducerMaxRate(final int producerMaxRate)
+   {
+      checkWrite();
+      serverLocator.setProducerMaxRate(producerMaxRate);
+   }
+
+   public synchronized int getProducerWindowSize()
+   {
+      return serverLocator.getProducerWindowSize();
+   }
+
+   public synchronized void setProducerWindowSize(final int producerWindowSize)
+   {
+      checkWrite();
+      serverLocator.setProducerWindowSize(producerWindowSize);
+   }
+
+   /**
+    * @param cacheLargeMessagesClient value forwarded to the server locator
+    */
+   public synchronized void setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient)
+   {
+      checkWrite();
+      serverLocator.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+   }
+
+   public synchronized boolean isCacheLargeMessagesClient()
+   {
+      return serverLocator.isCacheLargeMessagesClient();
+   }
+
+   public synchronized int getMinLargeMessageSize()
+   {
+      return serverLocator.getMinLargeMessageSize();
+   }
+
+   public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+   {
+      checkWrite();
+      serverLocator.setMinLargeMessageSize(minLargeMessageSize);
+   }
+
+   public synchronized boolean isBlockOnAcknowledge()
+   {
+      return serverLocator.isBlockOnAcknowledge();
+   }
+
+   public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+   {
+      checkWrite();
+      serverLocator.setBlockOnAcknowledge(blockOnAcknowledge);
+   }
+
+   public synchronized boolean isBlockOnNonDurableSend()
+   {
+      return serverLocator.isBlockOnNonDurableSend();
+   }
+
+   public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+   {
+      checkWrite();
+      serverLocator.setBlockOnNonDurableSend(blockOnNonDurableSend);
+   }
+
+   public synchronized boolean isBlockOnDurableSend()
+   {
+      return serverLocator.isBlockOnDurableSend();
+   }
+
+   public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+   {
+      checkWrite();
+      serverLocator.setBlockOnDurableSend(blockOnDurableSend);
+   }
+
+   public synchronized boolean isAutoGroup()
+   {
+      return serverLocator.isAutoGroup();
+   }
+
+   public synchronized void setAutoGroup(final boolean autoGroup)
+   {
+      checkWrite();
+      serverLocator.setAutoGroup(autoGroup);
+   }
+
+   public synchronized boolean isPreAcknowledge()
+   {
+      return serverLocator.isPreAcknowledge();
+   }
+
+   public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+   {
+      checkWrite();
+      serverLocator.setPreAcknowledge(preAcknowledge);
+   }
+
+   public synchronized long getRetryInterval()
+   {
+      return serverLocator.getRetryInterval();
+   }
+
+   public synchronized void setRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      serverLocator.setRetryInterval(retryInterval);
+   }
+
+   public synchronized long getMaxRetryInterval()
+   {
+      return serverLocator.getMaxRetryInterval();
+   }
+
+   public synchronized void setMaxRetryInterval(final long retryInterval)
+   {
+      checkWrite();
+      serverLocator.setMaxRetryInterval(retryInterval);
+   }
+
+   public synchronized double getRetryIntervalMultiplier()
+   {
+      return serverLocator.getRetryIntervalMultiplier();
+   }
+
+   public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+   {
+      checkWrite();
+      serverLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+   }
+
+   public synchronized int getReconnectAttempts()
+   {
+      return serverLocator.getReconnectAttempts();
+   }
+
+   public synchronized void setReconnectAttempts(final int reconnectAttempts)
+   {
+      checkWrite();
+      serverLocator.setReconnectAttempts(reconnectAttempts);
+   }
+
+   public synchronized void setInitialConnectAttempts(final int reconnectAttempts)
+   {
+      checkWrite();
+      serverLocator.setInitialConnectAttempts(reconnectAttempts);
+   }
+
+   /**
+    * Returns the locator's initial connect attempts. Like every other getter this is a pure
+    * read, so it stays callable after the factory has been used.
+    */
+   public synchronized int getInitialConnectAttempts()
+   {
+      return serverLocator.getInitialConnectAttempts();
+   }
+
+   public synchronized boolean isFailoverOnInitialConnection()
+   {
+      return serverLocator.isFailoverOnInitialConnection();
+   }
+
+   public synchronized void setFailoverOnInitialConnection(final boolean failover)
+   {
+      checkWrite();
+      serverLocator.setFailoverOnInitialConnection(failover);
+   }
+
+   public synchronized boolean isUseGlobalPools()
+   {
+      return serverLocator.isUseGlobalPools();
+   }
+
+   public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+   {
+      checkWrite();
+      serverLocator.setUseGlobalPools(useGlobalPools);
+   }
+
+   public synchronized int getScheduledThreadPoolMaxSize()
+   {
+      return serverLocator.getScheduledThreadPoolMaxSize();
+   }
+
+   public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+   {
+      checkWrite();
+      serverLocator.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+   }
+
+   public synchronized int getThreadPoolMaxSize()
+   {
+      return serverLocator.getThreadPoolMaxSize();
+   }
+
+   public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+   {
+      checkWrite();
+      serverLocator.setThreadPoolMaxSize(threadPoolMaxSize);
+   }
+
+   public synchronized int getInitialMessagePacketSize()
+   {
+      return serverLocator.getInitialMessagePacketSize();
+   }
+
+   public synchronized void setInitialMessagePacketSize(final int size)
+   {
+      checkWrite();
+      serverLocator.setInitialMessagePacketSize(size);
+   }
+
+   // NOTE(review): the group ID and compress-large-message accessors below are neither
+   // synchronized nor guarded by checkWrite(), unlike the setters above - confirm this is intended.
+
+   public void setGroupID(final String groupID)
+   {
+      serverLocator.setGroupID(groupID);
+   }
+
+   public String getGroupID()
+   {
+      return serverLocator.getGroupID();
+   }
+
+   public boolean isCompressLargeMessage()
+   {
+      return serverLocator.isCompressLargeMessage();
+   }
+
+   public void setCompressLargeMessage(boolean avoidLargeMessages)
+   {
+      serverLocator.setCompressLargeMessage(avoidLargeMessages);
+   }
+
+   /**
+    * Closes the server locator, if this factory has one.
+    */
+   public void close()
+   {
+      if (serverLocator != null)
+      {
+         serverLocator.close();
+      }
+   }
+
+   public ServerLocator getServerLocator()
+   {
+      return serverLocator;
+   }
+
+   public int getFactoryType()
+   {
+      return JMSFactoryType.CF.intValue();
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   /**
+    * Creates and authorizes a connection, marking this factory read-only as a side effect.
+    *
+    * @param username user name handed to the connection, may be {@code null}
+    * @param password password handed to the connection, may be {@code null}
+    * @param isXA     {@code true} to create a {@link HornetQXAConnection}, {@code false} for a
+    *                 plain {@link HornetQConnection}
+    * @param type     one of the {@code HornetQConnection.TYPE_*_CONNECTION} constants
+    * @return the authorized connection
+    * @throws JMSException if the type is unknown, the session factory cannot be created, or
+    *                      authorization fails (the connection is closed before rethrowing)
+    */
+   protected synchronized HornetQConnection createConnectionInternal(final String username,
+                                                                     final String password,
+                                                                     final boolean isXA,
+                                                                     final int type) throws JMSException
+   {
+      readOnly = true;
+
+      // Reject unknown types up front so that no session factory is created and then abandoned.
+      if (type != HornetQConnection.TYPE_GENERIC_CONNECTION &&
+         type != HornetQConnection.TYPE_QUEUE_CONNECTION &&
+         type != HornetQConnection.TYPE_TOPIC_CONNECTION)
+      {
+         throw new JMSException("Failed to create connection: invalid type " + type);
+      }
+
+      ClientSessionFactory factory;
+
+      try
+      {
+         factory = serverLocator.createSessionFactory();
+      }
+      catch (Exception e)
+      {
+         JMSException jmse = new JMSException("Failed to create session factory");
+
+         jmse.initCause(e);
+         jmse.setLinkedException(e);
+
+         throw jmse;
+      }
+
+      // The connection class depends only on the XA flag; the type is passed through as-is.
+      final HornetQConnection connection;
+
+      if (isXA)
+      {
+         connection = new HornetQXAConnection(username,
+                                              password,
+                                              type,
+                                              clientID,
+                                              dupsOKBatchSize,
+                                              transactionBatchSize,
+                                              factory);
+      }
+      else
+      {
+         connection = new HornetQConnection(username,
+                                            password,
+                                            type,
+                                            clientID,
+                                            dupsOKBatchSize,
+                                            transactionBatchSize,
+                                            factory);
+      }
+
+      connection.setReference(this);
+
+      try
+      {
+         connection.authorize();
+      }
+      catch (JMSException e)
+      {
+         try
+         {
+            connection.close();
+         }
+         catch (JMSException ignored)
+         {
+            // best-effort cleanup: the authorization failure below is the error worth reporting
+         }
+         throw e;
+      }
+
+      return connection;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "HornetQConnectionFactory [serverLocator=" + serverLocator +
+         ", clientID=" +
+         clientID +
+         ", consumerWindowSize = " +
+         getConsumerWindowSize() +
+         ", dupsOKBatchSize=" +
+         dupsOKBatchSize +
+         ", transactionBatchSize=" +
+         transactionBatchSize +
+         ", readOnly=" +
+         readOnly +
+         "]";
+   }
+
+
+   // Private --------------------------------------------------------------------------------------
+
+   /**
+    * Guards setters: configuration may not change once a connection has been requested.
+    *
+    * @throws IllegalStateException if the factory has already been used
+    */
+   private void checkWrite()
+   {
+      if (readOnly)
+      {
+         throw new IllegalStateException("Cannot set attribute on HornetQConnectionFactory after it has been used");
+      }
+   }
+
+   /**
+    * Closes the server locator when the factory is garbage collected.
+    */
+   @Override
+   protected void finalize() throws Throwable
+   {
+      try
+      {
+         // the no-argument constructor leaves the locator null
+         if (serverLocator != null)
+         {
+            serverLocator.close();
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         //not much we can do here
+      }
+      finally
+      {
+         super.finalize();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java
new file mode 100644
index 0000000..80528b4
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.JMSContext;
+import javax.jms.XAJMSContext;
+
+/**
+ * A JMS connection that can hand out contexts and be told when a context no longer needs it,
+ * so that all contexts sharing the connection can be reference counted.
+ * <p>
+ * Necessary to support {@code JMSContext.close()} conditions.
+ *
+ * @see JMSContext
+ */
+public interface HornetQConnectionForContext extends javax.jms.Connection
+{
+   /**
+    * Creates a context backed by this connection.
+    *
+    * @param sessionMode the session mode for the new context
+    * @return the new context
+    */
+   JMSContext createContext(int sessionMode);
+
+   /**
+    * Creates an XA context backed by this connection.
+    *
+    * @return the new XA context
+    */
+   XAJMSContext createXAContext();
+
+   /**
+    * Releases one context's hold on this connection.
+    */
+   void closeFromContext();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java
new file mode 100644
index 0000000..b77d08d
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.activemq6.jms.client;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Session;
+import javax.jms.XAJMSContext;
+
+import org.apache.activemq6.api.jms.HornetQJMSConstants;
+import org.apache.activemq6.utils.ReferenceCounter;
+import org.apache.activemq6.utils.ReferenceCounterUtil;
+
+/**
+ * Base implementation of {@link HornetQConnectionForContext} that counts the contexts created
+ * from this connection.
+ * <p>
+ * Every context creation increments a {@link ReferenceCounter}; {@link #closeFromContext()}
+ * decrements it. The counter is constructed with a task that closes this connection.
+ */
+public abstract class HornetQConnectionForContextImpl implements HornetQConnectionForContext
+{
+
+   /** Task handed to the reference counter: closes this connection, rethrowing failures unchecked. */
+   final Runnable closeRunnable = new Runnable()
+   {
+      public void run()
+      {
+         try
+         {
+            close();
+         }
+         catch (JMSException e)
+         {
+            throw JmsExceptionUtils.convertToRuntimeException(e);
+         }
+      }
+   };
+
+   final ReferenceCounter refCounter = new ReferenceCounterUtil(closeRunnable);
+
+   /** Shared by every context created from this connection. */
+   protected final ThreadAwareContext threadAwareContext = new ThreadAwareContext();
+
+   /**
+    * Creates a context on this connection and counts it.
+    *
+    * @param sessionMode a standard {@link Session} acknowledge mode or one of the HornetQ
+    *                    specific modes from {@link HornetQJMSConstants}
+    * @return the new context
+    * @throws JMSRuntimeException if {@code sessionMode} is none of the accepted values
+    */
+   @Override
+   public JMSContext createContext(int sessionMode)
+   {
+      if (!isAcceptedSessionMode(sessionMode))
+      {
+         throw new JMSRuntimeException("Invalid ackmode: " + sessionMode);
+      }
+
+      refCounter.increment();
+
+      return new HornetQJMSContext(this, sessionMode, threadAwareContext);
+   }
+
+   /**
+    * Creates an XA context on this connection and counts it.
+    *
+    * @return the new XA context
+    */
+   @Override
+   public XAJMSContext createXAContext()
+   {
+      refCounter.increment();
+
+      return new HornetQXAJMSContext(this, threadAwareContext);
+   }
+
+   /**
+    * Drops one reference from the counter.
+    */
+   @Override
+   public void closeFromContext()
+   {
+      refCounter.decrement();
+   }
+
+   /**
+    * Adds one reference to the counter.
+    */
+   protected void incrementRefCounter()
+   {
+      refCounter.increment();
+   }
+
+   public ThreadAwareContext getThreadAwareContext()
+   {
+      return threadAwareContext;
+   }
+
+   /**
+    * Tells whether {@code sessionMode} is one of the modes a context may be created with.
+    */
+   private static boolean isAcceptedSessionMode(final int sessionMode)
+   {
+      return sessionMode == Session.AUTO_ACKNOWLEDGE ||
+         sessionMode == Session.CLIENT_ACKNOWLEDGE ||
+         sessionMode == Session.DUPS_OK_ACKNOWLEDGE ||
+         sessionMode == Session.SESSION_TRANSACTED ||
+         sessionMode == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||
+         sessionMode == HornetQJMSConstants.PRE_ACKNOWLEDGE;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java
new file mode 100644
index 0000000..453cb9e
--- /dev/null
+++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.client;
+
+import java.util.Enumeration;
+import java.util.Vector;
+
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+import org.apache.activemq6.core.version.Version;
+
+/**
+ * HornetQ implementation of a JMS ConnectionMetaData.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ *
+ */
+public class HornetQConnectionMetaData implements ConnectionMetaData
+{
+   // Constants -----------------------------------------------------
+
+   /** Provider name reported by {@link #getJMSProviderName()}. */
+   private static final String HORNETQ = "HornetQ";
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Source of the values returned by the three provider-version getters. */
+   private final Version serverVersion;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Create a new HornetQConnectionMetaData object.
+    *
+    * @param serverVersion version information used by the provider-version getters; it is
+    *                      stored as given without validation, and those getters dereference it
+    */
+   public HornetQConnectionMetaData(final Version serverVersion)
+   {
+      this.serverVersion = serverVersion;
+   }
+
+   // ConnectionMetaData implementation -----------------------------
+
+   /**
+    * Reports the JMS API version as a string.
+    *
+    * @return the fixed value {@code "2.0"}
+    */
+   @Override
+   public String getJMSVersion() throws JMSException
+   {
+      return "2.0";
+   }
+
+   /**
+    * Reports the major part of the JMS API version.
+    *
+    * @return the fixed value {@code 2}
+    */
+   @Override
+   public int getJMSMajorVersion() throws JMSException
+   {
+      return 2;
+   }
+
+   /**
+    * Reports the minor part of the JMS API version.
+    *
+    * @return the fixed value {@code 0}
+    */
+   @Override
+   public int getJMSMinorVersion() throws JMSException
+   {
+      return 0;
+   }
+
+   /**
+    * Reports the provider name.
+    *
+    * @return the fixed value {@code "HornetQ"}
+    */
+   @Override
+   public String getJMSProviderName() throws JMSException
+   {
+      return HornetQConnectionMetaData.HORNETQ;
+   }
+
+   /**
+    * Reports the provider version string.
+    *
+    * @return the result of {@code getFullVersion()} on the version passed to the constructor
+    */
+   @Override
+   public String getProviderVersion() throws JMSException
+   {
+      return serverVersion.getFullVersion();
+   }
+
+   /**
+    * Reports the provider major version.
+    *
+    * @return the result of {@code getMajorVersion()} on the version passed to the constructor
+    */
+   @Override
+   public int getProviderMajorVersion() throws JMSException
+   {
+      return serverVersion.getMajorVersion();
+   }
+
+   /**
+    * Reports the provider minor version.
+    *
+    * @return the result of {@code getMinorVersion()} on the version passed to the constructor
+    */
+   @Override
+   public int getProviderMinorVersion() throws JMSException
+   {
+      return serverVersion.getMinorVersion();
+   }
+
+   /**
+    * Lists the JMSX property names reported by this implementation.
+    * <p>
+    * A fresh list is built on every call, so each caller gets an independent enumeration.
+    * The return type stays raw because that is how this method has always been declared.
+    *
+    * @return an enumeration over {@code "JMSXGroupID"}, {@code "JMSXGroupSeq"} and
+    *         {@code "JMSXDeliveryCount"}, in that order
+    */
+   @Override
+   public Enumeration getJMSXPropertyNames() throws JMSException
+   {
+      // Only strings are ever stored, so the element type is narrowed from Object.
+      final Vector<String> names = new Vector<String>();
+      names.add("JMSXGroupID");
+      names.add("JMSXGroupSeq");
+      names.add("JMSXDeliveryCount");
+      return names.elements();
+   }
+}


Mime
View raw message