activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [34/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:42:04 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java
new file mode 100644
index 0000000..b186fe9
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java
@@ -0,0 +1,705 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.Interceptor;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.protocol.core.Channel;
+import org.apache.activemq6.core.protocol.core.ChannelHandler;
+import org.apache.activemq6.core.protocol.core.CommandConfirmationHandler;
+import org.apache.activemq6.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+/**
+ * A ChannelImpl
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public final class ChannelImpl implements Channel
+{
+   public enum CHANNEL_ID
+   {
+      /**
+       * Used for core protocol management.
+       */
+      PING(0),
+      /**
+       * Session creation and attachment.
+       */
+      SESSION(1),
+      /**
+       * Replication, i.e. for backups that do not share the journal.
+       */
+      REPLICATION(2),
+      /**
+       * cluster used for controlling nodes in a cluster remotely
+       */
+      CLUSTER(3),
+      /**
+       * Channels [0-9] are reserved for the system, user channels must be greater than that.
+       */
+      USER(10);
+
+      public final long id;
+
+      CHANNEL_ID(long id)
+      {
+         this.id = id;
+      }
+
+      protected static String idToString(long code)
+      {
+         for (CHANNEL_ID channel : EnumSet.allOf(CHANNEL_ID.class))
+         {
+            if (channel.id == code) return channel.toString();
+         }
+         return Long.toString(code);
+      }
+   }
+
+   private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
+
+   private volatile long id;
+
+   private ChannelHandler handler;
+
+   private Packet response;
+
+   private final java.util.Queue<Packet> resendCache;
+
+   private volatile int firstStoredCommandID;
+
+   private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
+
+   private volatile CoreRemotingConnection connection;
+
+   private volatile boolean closed;
+
+   private final Lock lock = new ReentrantLock();
+
+   private final Condition sendCondition = lock.newCondition();
+
+   private final Condition failoverCondition = lock.newCondition();
+
+   private final Object sendLock = new Object();
+
+   private final Object sendBlockingLock = new Object();
+
+   private boolean failingOver;
+
+   private final int confWindowSize;
+
+   private int receivedBytes;
+
+   private CommandConfirmationHandler commandConfirmationHandler;
+
+   private volatile boolean transferring;
+
+   private final List<Interceptor> interceptors;
+
+   public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize, final List<Interceptor> interceptors)
+   {
+      this.connection = connection;
+
+      this.id = id;
+
+      this.confWindowSize = confWindowSize;
+
+      if (confWindowSize != -1)
+      {
+         resendCache = new ConcurrentLinkedQueue<Packet>();
+      }
+      else
+      {
+         resendCache = null;
+      }
+
+      this.interceptors = interceptors;
+   }
+
+   public boolean supports(final byte packetType)
+   {
+      int version = connection.getClientVersion();
+
+      switch (packetType)
+      {
+         case PacketImpl.CLUSTER_TOPOLOGY_V2:
+            return version >= 122;
+         case PacketImpl.DISCONNECT_CONSUMER:
+            return version >= 124;
+         case PacketImpl.CLUSTER_TOPOLOGY_V3:
+            return version >= 125;
+         case PacketImpl.DISCONNECT_V2:
+            return version >= 125;
+         default:
+            return true;
+      }
+   }
+
+   public long getID()
+   {
+      return id;
+   }
+
+   public int getLastConfirmedCommandID()
+   {
+      return lastConfirmedCommandID.get();
+   }
+
+   public Lock getLock()
+   {
+      return lock;
+   }
+
+   public int getConfirmationWindowSize()
+   {
+      return confWindowSize;
+   }
+
+   public void returnBlocking()
+   {
+      returnBlocking(null);
+   }
+
+   public void returnBlocking(Throwable cause)
+   {
+      lock.lock();
+
+      try
+      {
+         response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall(cause));
+
+         sendCondition.signal();
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   public boolean sendAndFlush(final Packet packet)
+   {
+      return send(packet, true, false);
+   }
+
+   public boolean send(final Packet packet)
+   {
+      return send(packet, false, false);
+   }
+
+   public boolean sendBatched(final Packet packet)
+   {
+      return send(packet, false, true);
+   }
+
+   public void setTransferring(boolean transferring)
+   {
+      this.transferring = transferring;
+   }
+
+   // This must never called by more than one thread concurrently
+   public boolean send(final Packet packet, final boolean flush, final boolean batch)
+   {
+      if (invokeInterceptors(packet, interceptors, connection) != null)
+      {
+         return false;
+      }
+
+      synchronized (sendLock)
+      {
+         packet.setChannelID(id);
+
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
+         }
+
+         HornetQBuffer buffer = packet.encode(connection);
+
+         lock.lock();
+
+         try
+         {
+            if (failingOver)
+            {
+               // TODO - don't hardcode this timeout
+               try
+               {
+                  failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+               }
+               catch (InterruptedException e)
+               {
+                  throw new HornetQInterruptedException(e);
+               }
+            }
+
+            // Sanity check
+            if (transferring)
+            {
+               throw new IllegalStateException("Cannot send a packet while channel is doing failover");
+            }
+
+            if (resendCache != null && packet.isRequiresConfirmations())
+            {
+               resendCache.add(packet);
+            }
+         }
+         finally
+         {
+            lock.unlock();
+         }
+
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
+         }
+
+
+         // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
+         // buffer is full, preventing any incoming buffers being handled and blocking failover
+         connection.getTransportConnection().write(buffer, flush, batch);
+
+         return true;
+      }
+   }
+
+   /**
+    * Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception
+    * and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception.
+    * The expectedPacket will be used to filter out undesirable packets that would belong to previous calls.
+    */
+   public Packet sendBlocking(final Packet packet, byte expectedPacket) throws HornetQException
+   {
+      String interceptionResult = invokeInterceptors(packet, interceptors, connection);
+
+      if (interceptionResult != null)
+      {
+         // if we don't throw an exception here the client might not unblock
+         throw HornetQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
+      }
+
+      if (closed)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.connectionDestroyed();
+      }
+
+      if (connection.getBlockingCallTimeout() == -1)
+      {
+         throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
+      }
+
+      // Synchronized since can't be called concurrently by more than one thread and this can occur
+      // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
+      synchronized (sendBlockingLock)
+      {
+         packet.setChannelID(id);
+
+         final HornetQBuffer buffer = packet.encode(connection);
+
+         lock.lock();
+
+         try
+         {
+            if (failingOver)
+            {
+               try
+               {
+                  if (connection.getBlockingCallFailoverTimeout() < 0)
+                  {
+                     while (failingOver)
+                     {
+                        failoverCondition.await();
+                     }
+                  }
+                  else
+                  {
+                     if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS))
+                     {
+                        HornetQClientLogger.LOGGER.debug("timed-out waiting for failover condition");
+                     }
+                  }
+               }
+               catch (InterruptedException e)
+               {
+                  throw new HornetQInterruptedException(e);
+               }
+            }
+
+            response = null;
+
+            if (resendCache != null && packet.isRequiresConfirmations())
+            {
+               resendCache.add(packet);
+            }
+
+            connection.getTransportConnection().write(buffer, false, false);
+
+            long toWait = connection.getBlockingCallTimeout();
+
+            long start = System.currentTimeMillis();
+
+            while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION &&
+               response.getType() != expectedPacket)) && toWait > 0)
+            {
+               try
+               {
+                  sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+               }
+               catch (InterruptedException e)
+               {
+                  throw new HornetQInterruptedException(e);
+               }
+
+               if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)
+               {
+                  HornetQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
+               }
+
+               if (closed)
+               {
+                  break;
+               }
+
+               final long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (response == null)
+            {
+               throw HornetQClientMessageBundle.BUNDLE.timedOutSendingPacket(packet.getType());
+            }
+
+            if (response.getType() == PacketImpl.EXCEPTION)
+            {
+               final HornetQExceptionMessage mem = (HornetQExceptionMessage) response;
+
+               HornetQException e = mem.getException();
+
+               e.fillInStackTrace();
+
+               throw e;
+            }
+         }
+         finally
+         {
+            lock.unlock();
+         }
+
+         return response;
+      }
+   }
+
+   /**
+    * @param packet the packet to intercept
+    * @return the name of the interceptor that returned <code>false</code> or <code>null</code> if no interceptors
+    * returned <code>false</code>.
+    */
+   public static String invokeInterceptors(final Packet packet, final List<Interceptor> interceptors, final RemotingConnection connection)
+   {
+      if (interceptors != null)
+      {
+         for (final Interceptor interceptor : interceptors)
+         {
+            try
+            {
+               boolean callNext = interceptor.intercept(packet, connection);
+
+               if (HornetQClientLogger.LOGGER.isDebugEnabled())
+               {
+                  // use a StringBuilder for speed since this may be executed a lot
+                  StringBuilder msg = new StringBuilder();
+                  msg.append("Invocation of interceptor ").append(interceptor.getClass().getName()).append(" on ").
+                     append(packet).append(" returned ").append(callNext);
+                  HornetQClientLogger.LOGGER.debug(msg.toString());
+               }
+
+               if (!callNext)
+               {
+                  return interceptor.getClass().getName();
+               }
+            }
+            catch (final Throwable e)
+            {
+               HornetQClientLogger.LOGGER.errorCallingInterceptor(e, interceptor);
+            }
+         }
+      }
+
+      return null;
+   }
+
+   public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
+   {
+      if (confWindowSize < 0)
+      {
+         final String msg =
+            "You can't set confirmationHandler on a connection with confirmation-window-size < 0."
+               + " Look at the documentation for more information.";
+         throw new IllegalStateException(msg);
+      }
+      commandConfirmationHandler = handler;
+   }
+
+   public void setHandler(final ChannelHandler handler)
+   {
+      this.handler = handler;
+   }
+
+   public ChannelHandler getHandler()
+   {
+      return handler;
+   }
+
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      if (!connection.isDestroyed() && !connection.removeChannel(id))
+      {
+         throw HornetQClientMessageBundle.BUNDLE.noChannelToClose(id);
+      }
+
+      if (failingOver)
+      {
+         unlock();
+      }
+      closed = true;
+   }
+
+   public void transferConnection(final CoreRemotingConnection newConnection)
+   {
+      // Needs to synchronize on the connection to make sure no packets from
+      // the old connection get processed after transfer has occurred
+      synchronized (connection.getTransferLock())
+      {
+         connection.removeChannel(id);
+
+         // And switch it
+
+         final CoreRemotingConnection rnewConnection = newConnection;
+
+         rnewConnection.putChannel(id, this);
+
+         connection = rnewConnection;
+
+         transferring = true;
+      }
+   }
+
+   public void replayCommands(final int otherLastConfirmedCommandID)
+   {
+      if (resendCache != null)
+      {
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id);
+         }
+         clearUpTo(otherLastConfirmedCommandID);
+
+         for (final Packet packet : resendCache)
+         {
+            doWrite(packet);
+         }
+      }
+   }
+
+   public void lock()
+   {
+      lock.lock();
+
+      failingOver = true;
+
+      lock.unlock();
+   }
+
+   public void unlock()
+   {
+      lock.lock();
+
+      failingOver = false;
+
+      failoverCondition.signalAll();
+
+      lock.unlock();
+   }
+
+   public CoreRemotingConnection getConnection()
+   {
+      return connection;
+   }
+
+   // Needs to be synchronized since can be called by remoting service timer thread too for timeout flush
+   public synchronized void flushConfirmations()
+   {
+      if (resendCache != null && receivedBytes != 0)
+      {
+         receivedBytes = 0;
+
+         final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get());
+
+         confirmed.setChannelID(id);
+
+         doWrite(confirmed);
+      }
+   }
+
+   public void confirm(final Packet packet)
+   {
+      if (resendCache != null && packet.isRequiresConfirmations())
+      {
+         lastConfirmedCommandID.incrementAndGet();
+
+         receivedBytes += packet.getPacketSize();
+
+         if (receivedBytes >= confWindowSize)
+         {
+            receivedBytes = 0;
+
+            final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get());
+
+            confirmed.setChannelID(id);
+
+            doWrite(confirmed);
+         }
+      }
+   }
+
+   public void clearCommands()
+   {
+      if (resendCache != null)
+      {
+         lastConfirmedCommandID.set(-1);
+
+         firstStoredCommandID = 0;
+
+         resendCache.clear();
+      }
+   }
+
+   public void handlePacket(final Packet packet)
+   {
+      if (packet.getType() == PacketImpl.PACKETS_CONFIRMED)
+      {
+         if (resendCache != null)
+         {
+            final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet;
+
+            clearUpTo(msg.getCommandID());
+         }
+
+         if (!connection.isClient())
+         {
+            handler.handlePacket(packet);
+         }
+
+         return;
+      }
+      else
+      {
+         if (packet.isResponse())
+         {
+            confirm(packet);
+
+            lock.lock();
+
+            try
+            {
+               response = packet;
+               sendCondition.signal();
+            }
+            finally
+            {
+               lock.unlock();
+            }
+         }
+         else if (handler != null)
+         {
+            handler.handlePacket(packet);
+         }
+      }
+   }
+
+   private void doWrite(final Packet packet)
+   {
+      final HornetQBuffer buffer = packet.encode(connection);
+
+      connection.getTransportConnection().write(buffer, false, false);
+   }
+
+   private void clearUpTo(final int lastReceivedCommandID)
+   {
+      final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+      if (numberToClear == -1)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID);
+      }
+
+      int sizeToFree = 0;
+
+      for (int i = 0; i < numberToClear; i++)
+      {
+         final Packet packet = resendCache.poll();
+
+         if (packet == null)
+         {
+            if (lastReceivedCommandID > 0)
+            {
+               HornetQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
+            }
+            firstStoredCommandID = lastReceivedCommandID + 1;
+            return;
+         }
+
+         if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
+         {
+            sizeToFree += packet.getPacketSize();
+         }
+
+         if (commandConfirmationHandler != null)
+         {
+            commandConfirmationHandler.commandConfirmed(packet);
+         }
+      }
+
+      firstStoredCommandID += numberToClear;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler=" + handler + "]";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java
new file mode 100644
index 0000000..19b48a8
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java
@@ -0,0 +1,613 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.core.impl;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.Interceptor;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq6.core.protocol.ClientPacketDecoder;
+import org.apache.activemq6.core.protocol.core.Channel;
+import org.apache.activemq6.core.protocol.core.ChannelHandler;
+import org.apache.activemq6.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.Ping;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
+import org.apache.activemq6.core.remoting.impl.netty.HornetQFrameDecoder2;
+import org.apache.activemq6.core.version.Version;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.spi.core.remoting.TopologyResponseHandler;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+import org.apache.activemq6.utils.VersionLoader;
+
+/**
+ * This class will return specific packets for different types of actions happening on a messaging protocol.
+ * <p/>
+ * This is trying to unify the Core client into multiple protocols.
+ * <p/>
+ * Returning null in certain packets means no action is taken on this specific protocol.
+ * <p/>
+ * Semantic properties could also be added to this implementation.
+ * <p/>
+ * Implementations of this class need to be stateless.
+ *
+ * @author Clebert Suconic
+ */
+
+public class HornetQClientProtocolManager implements ClientProtocolManager
+{
+   private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
+
+   private ClientSessionFactoryInternal factoryInternal;
+
+   /**
+    * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch}
+    */
+   private final Object inCreateSessionGuard = new Object();
+
+   /**
+    * Flag that tells whether we are trying to create a session.
+    */
+   private boolean inCreateSession;
+
+   /**
+    * Used to wait for the creation of a session.
+    */
+   private CountDownLatch inCreateSessionLatch;
+
+   protected volatile RemotingConnectionImpl connection;
+
+   protected TopologyResponseHandler topologyResponseHandler;
+
+   /**
+    * Flag that signals that the communication is closing. Causes many processes to exit.
+    */
+   private volatile boolean alive = true;
+
+   private final CountDownLatch waitLatch = new CountDownLatch(1);
+
+
+   public HornetQClientProtocolManager()
+   {
+   }
+
+   public String getName()
+   {
+      return HornetQClient.DEFAULT_CORE_PROTOCOL;
+   }
+
+   public void setSessionFactory(ClientSessionFactory factory)
+   {
+      this.factoryInternal = (ClientSessionFactoryInternal)factory;
+   }
+
+   public ClientSessionFactory getSessionFactory()
+   {
+      return this.factoryInternal;
+   }
+
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline)
+   {
+      pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2());
+   }
+
+   public boolean waitOnLatch(long milliseconds) throws InterruptedException
+   {
+      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
+   }
+
+   public Channel getChannel0()
+   {
+      if (connection == null)
+      {
+         return null;
+      }
+      else
+      {
+         return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
+      }
+   }
+
+   public RemotingConnection getCurrentConnection()
+   {
+      return connection;
+   }
+
+
+   public Channel getChannel1()
+   {
+      if (connection == null)
+      {
+         return null;
+      }
+      else
+      {
+         return connection.getChannel(1, -1);
+      }
+   }
+
+   public Lock lockSessionCreation()
+   {
+      try
+      {
+         Lock localFailoverLock = factoryInternal.lockFailover();
+         try
+         {
+            if (connection == null)
+            {
+               return null;
+            }
+
+            Lock lock = getChannel1().getLock();
+
+            // Lock it - this must be done while the failoverLock is held
+            while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS))
+            {
+            }
+
+            return lock;
+         }
+         finally
+         {
+            localFailoverLock.unlock();
+         }
+         // We can now release the failoverLock
+      }
+      catch (InterruptedException e)
+      {
+         Thread.currentThread().interrupt();
+         return null;
+      }
+   }
+
+
+   public void stop()
+   {
+      alive = false;
+
+
+      synchronized (inCreateSessionGuard)
+      {
+         if (inCreateSessionLatch != null)
+            inCreateSessionLatch.countDown();
+      }
+
+
+      Channel channel1 = getChannel1();
+      if (channel1 != null)
+      {
+         channel1.returnBlocking();
+      }
+
+      waitLatch.countDown();
+
+   }
+
+   public boolean isAlive()
+   {
+      return alive;
+   }
+
+
+   @Override
+   public void ping(long connectionTTL)
+   {
+      Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
+
+      Ping ping = new Ping(connectionTTL);
+
+      channel.send(ping);
+
+      connection.flush();
+   }
+
+   @Override
+   public void sendSubscribeTopology(final boolean isServer)
+   {
+      getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer,
+                                                                      VersionLoader.getVersion()
+                                                                         .getIncrementingVersion()));
+   }
+
+   @Override
+   public SessionContext createSessionContext(String name, String username, String password,
+                                              boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
+                                              boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws HornetQException
+   {
+      for (Version clientVersion : VersionLoader.getClientVersions())
+      {
+         try
+         {
+            return createSessionContext(clientVersion,
+                                        name,
+                                        username,
+                                        password,
+                                        xa,
+                                        autoCommitSends,
+                                        autoCommitAcks,
+                                        preAcknowledge,
+                                        minLargeMessageSize,
+                                        confirmationWindowSize);
+         }
+         catch (HornetQException e)
+         {
+            if (e.getType() != HornetQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
+            {
+               throw e;
+            }
+         }
+      }
+      connection.destroy();
+      throw new HornetQException(HornetQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS);
+   }
+
+   public SessionContext createSessionContext(Version clientVersion, String name, String username, String password,
+                                              boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
+                                              boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws HornetQException
+   {
+      if (!isAlive())
+         throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed();
+
+      Channel sessionChannel = null;
+      CreateSessionResponseMessage response = null;
+
+      boolean retry;
+      do
+      {
+         retry = false;
+
+         Lock lock = null;
+
+         try
+         {
+
+            lock = lockSessionCreation();
+
+            // We now set a flag saying createSession is executing
+            synchronized (inCreateSessionGuard)
+            {
+               if (!isAlive())
+                  throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed();
+               inCreateSession = true;
+               inCreateSessionLatch = new CountDownLatch(1);
+            }
+
+            long sessionChannelID = connection.generateChannelID();
+
+            Packet request = new CreateSessionMessage(name,
+                                                      sessionChannelID,
+                                                      clientVersion.getIncrementingVersion(),
+                                                      username,
+                                                      password,
+                                                      minLargeMessageSize,
+                                                      xa,
+                                                      autoCommitSends,
+                                                      autoCommitAcks,
+                                                      preAcknowledge,
+                                                      confirmationWindowSize,
+                                                      null);
+
+
+            try
+            {
+               // channel1 reference here has to go away
+               response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP);
+            }
+            catch (HornetQException cause)
+            {
+               if (!isAlive())
+                  throw cause;
+
+               if (cause.getType() == HornetQExceptionType.UNBLOCKED)
+               {
+                  // This means the thread was blocked on create session and failover unblocked it
+                  // so failover could occur
+
+                  retry = true;
+
+                  continue;
+               }
+               else
+               {
+                  throw cause;
+               }
+            }
+
+            sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize);
+
+
+         }
+         catch (Throwable t)
+         {
+            if (lock != null)
+            {
+               lock.unlock();
+               lock = null;
+            }
+
+            if (t instanceof HornetQException)
+            {
+               throw (HornetQException) t;
+            }
+            else
+            {
+               throw HornetQClientMessageBundle.BUNDLE.failedToCreateSession(t);
+            }
+         }
+         finally
+         {
+            if (lock != null)
+            {
+               lock.unlock();
+            }
+
+            // Execution has finished so notify any failover thread that may be waiting for us to be done
+            inCreateSession = false;
+            inCreateSessionLatch.countDown();
+         }
+      }
+      while (retry);
+
+
+      // these objects won't be null, otherwise it would keep retrying on the previous loop
+      return new HornetQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
+
+   }
+
+   public boolean cleanupBeforeFailover(HornetQException cause)
+   {
+
+      boolean needToInterrupt;
+
+      CountDownLatch exitLockLatch;
+      Lock lock = lockSessionCreation();
+
+      if (lock == null)
+      {
+         return false;
+      }
+
+      try
+      {
+         synchronized (inCreateSessionGuard)
+         {
+            needToInterrupt = inCreateSession;
+            exitLockLatch = inCreateSessionLatch;
+         }
+      }
+      finally
+      {
+         lock.unlock();
+      }
+
+      if (needToInterrupt)
+      {
+         forceReturnChannel1(cause);
+
+         // Now we need to make sure that the thread has actually exited and returned it's
+         // connections
+         // before failover occurs
+
+         while (inCreateSession && isAlive())
+         {
+            try
+            {
+               if (exitLockLatch != null)
+               {
+                  exitLockLatch.await(500, TimeUnit.MILLISECONDS);
+               }
+            }
+            catch (InterruptedException e1)
+            {
+               throw new HornetQInterruptedException(e1);
+            }
+         }
+      }
+
+      return true;
+   }
+
+   @Override
+   public boolean checkForFailover(String liveNodeID) throws HornetQException
+   {
+      CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID);
+      CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet,
+                                                                                                 PacketImpl.CHECK_FOR_FAILOVER_REPLY);
+      return message.isOkToFailover();
+   }
+
+
+   public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout,
+                                     List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors,
+                                     TopologyResponseHandler topologyResponseHandler)
+   {
+      this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection,
+                                                                             callTimeout, callFailoverTimeout,
+                                                                             incomingInterceptors, outgoingInterceptors);
+
+      this.topologyResponseHandler = topologyResponseHandler;
+
+      getChannel0().setHandler(new Channel0Handler(connection));
+
+
+      sendHandshake(transportConnection);
+
+      return connection;
+   }
+
+   private void sendHandshake(Connection transportConnection)
+   {
+      if (transportConnection.isUsingProtocolHandling())
+      {
+         // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
+         String handshake = "HORNETQ";
+         HornetQBuffer hqbuffer = connection.createBuffer(handshake.length());
+         hqbuffer.writeBytes(handshake.getBytes());
+         transportConnection.write(hqbuffer);
+      }
+   }
+
+
+   private class Channel0Handler implements ChannelHandler
+   {
+      private final CoreRemotingConnection conn;
+
+      private Channel0Handler(final CoreRemotingConnection conn)
+      {
+         this.conn = conn;
+      }
+
+      public void handlePacket(final Packet packet)
+      {
+         final byte type = packet.getType();
+
+         if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2)
+         {
+            final DisconnectMessage msg = (DisconnectMessage) packet;
+            String scaleDownTargetNodeID = null;
+
+            SimpleString nodeID = msg.getNodeID();
+
+            if (packet instanceof DisconnectMessage_V2)
+            {
+               final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet;
+               scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString();
+            }
+
+            if (topologyResponseHandler != null)
+               topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID);
+         }
+         else if (type == PacketImpl.CLUSTER_TOPOLOGY)
+         {
+            ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
+            notifyTopologyChange(topMessage);
+         }
+         else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
+         {
+            ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet;
+            notifyTopologyChange(topMessage);
+         }
+         else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3)
+         {
+            ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
+            notifyTopologyChange(topMessage);
+         }
+         else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY)
+         {
+            System.out.println("Channel0Handler.handlePacket");
+         }
+      }
+
+      /**
+       * @param topMessage
+       */
+      private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage)
+      {
+         final long eventUID;
+         final String backupGroupName;
+         final String scaleDownGroupName;
+         if (topMessage instanceof ClusterTopologyChangeMessage_V3)
+         {
+            eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID();
+            backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName();
+            scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName();
+         }
+         else if (topMessage instanceof ClusterTopologyChangeMessage_V2)
+         {
+            eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID();
+            backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName();
+            scaleDownGroupName = null;
+         }
+         else
+         {
+            eventUID = System.currentTimeMillis();
+            backupGroupName = null;
+            scaleDownGroupName = null;
+         }
+
+         if (topMessage.isExit())
+         {
+            if (HornetQClientLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down");
+            }
+
+            if (topologyResponseHandler != null)
+            {
+               topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
+            }
+         }
+         else
+         {
+            Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair();
+            if (transportConfig.getA() == null && transportConfig.getB() == null)
+            {
+               transportConfig = new Pair<>(conn.getTransportConnection()
+                                               .getConnectorConfig(),
+                                            null);
+            }
+
+            if (topologyResponseHandler != null)
+            {
+               topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast());
+            }
+         }
+      }
+   }
+
+   protected PacketDecoder getPacketDecoder()
+   {
+      return ClientPacketDecoder.INSTANCE;
+   }
+
+   private void forceReturnChannel1(HornetQException cause)
+   {
+      if (connection != null)
+      {
+         Channel channel1 = connection.getChannel(1, -1);
+
+         if (channel1 != null)
+         {
+            channel1.returnBlocking(cause);
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
new file mode 100644
index 0000000..09b91c3
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.core.impl;
+
+import org.apache.activemq6.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq6.spi.core.remoting.ClientProtocolManagerFactory;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory
+{
+   private static final long serialVersionUID = 1;
+
+   private static final HornetQClientProtocolManagerFactory INSTANCE = new HornetQClientProtocolManagerFactory();
+
+   private HornetQClientProtocolManagerFactory()
+   {
+   }
+
+   public static final HornetQClientProtocolManagerFactory getInstance()
+   {
+      return INSTANCE;
+   }
+
+   public ClientProtocolManager newProtocolManager()
+   {
+      return new HornetQClientProtocolManager();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java
new file mode 100644
index 0000000..c4967f1
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.core.impl;
+
+import org.apache.activemq6.spi.core.remoting.ConsumerContext;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQConsumerContext extends ConsumerContext
+{
+   private long id;
+
+   public HornetQConsumerContext(long id)
+   {
+      this.id = id;
+   }
+
+   public long getId()
+   {
+      return id;
+   }
+
+   @Override
+   public boolean equals(Object o)
+   {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      HornetQConsumerContext that = (HornetQConsumerContext) o;
+
+      if (id != that.id) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      return (int) (id ^ (id >>> 32));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java
new file mode 100644
index 0000000..2ec884d
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java
@@ -0,0 +1,940 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.core.impl;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientConsumer;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.client.impl.AddressQueryImpl;
+import org.apache.activemq6.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq6.core.client.impl.ClientConsumerInternal;
+import org.apache.activemq6.core.client.impl.ClientLargeMessageInternal;
+import org.apache.activemq6.core.client.impl.ClientMessageInternal;
+import org.apache.activemq6.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq6.core.client.impl.ClientSessionImpl;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+import org.apache.activemq6.core.protocol.core.Channel;
+import org.apache.activemq6.core.protocol.core.ChannelHandler;
+import org.apache.activemq6.core.protocol.core.CommandConfirmationHandler;
+import org.apache.activemq6.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateQueueMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.RollbackMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCloseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionExpireMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXACommitMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAEndMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+import org.apache.activemq6.utils.TokenBucketLimiterImpl;
+
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQSessionContext extends SessionContext
+{
+   private final Channel sessionChannel;
+   private final int serverVersion;
+   private int confirmationWindow;
+   private final String name;
+
+
+   public HornetQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow)
+   {
+      super(remotingConnection);
+
+      this.name = name;
+      this.sessionChannel = sessionChannel;
+      this.serverVersion = serverVersion;
+      this.confirmationWindow = confirmationWindow;
+
+      ChannelHandler handler = new ClientSessionPacketHandler();
+      sessionChannel.setHandler(handler);
+
+
+      if (confirmationWindow >= 0)
+      {
+         sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+      }
+   }
+
+
+   private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler()
+   {
+      public void commandConfirmed(final Packet packet)
+      {
+         if (packet.getType() == PacketImpl.SESS_SEND)
+         {
+            SessionSendMessage ssm = (SessionSendMessage) packet;
+            callSendAck(ssm.getHandler(), ssm.getMessage());
+         }
+         else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+         {
+            SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
+            if (!scm.isContinues())
+            {
+               callSendAck(scm.getHandler(), scm.getMessage());
+            }
+         }
+      }
+
+      private void callSendAck(SendAcknowledgementHandler handler, final Message message)
+      {
+         if (handler != null)
+         {
+            handler.sendAcknowledged(message);
+         }
+         else if (sendAckHandler != null)
+         {
+            sendAckHandler.sendAcknowledged(message);
+         }
+      }
+
+   };
+
+
+   // Failover utility methods
+
+   @Override
+   public void returnBlocking(HornetQException cause)
+   {
+      sessionChannel.returnBlocking(cause);
+   }
+
+   @Override
+   public void lockCommunications()
+   {
+      sessionChannel.lock();
+   }
+
+   @Override
+   public void releaseCommunications()
+   {
+      sessionChannel.setTransferring(false);
+      sessionChannel.unlock();
+   }
+
+   public void cleanup()
+   {
+      sessionChannel.close();
+
+      // if the server is sending a disconnect
+      // any pending blocked operation could hang without this
+      sessionChannel.returnBlocking();
+   }
+
+   @Override
+   public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits)
+   {
+      // nothing to be done here... Flow control here is done on the core side
+   }
+
+
+   public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
+   {
+      sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+      this.sendAckHandler = handler;
+   }
+
+   public void createSharedQueue(SimpleString address,
+                                 SimpleString queueName,
+                                 SimpleString filterString,
+                                 boolean durable) throws HornetQException
+   {
+      sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void deleteQueue(final SimpleString queueName) throws HornetQException
+   {
+      sessionChannel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE);
+   }
+
+   public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws HornetQException
+   {
+      SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
+      SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
+
+      return response.toQueueQuery();
+
+   }
+
+
+   public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString,
+                                                int windowSize, int maxRate, int ackBatchSize, boolean browseOnly,
+                                                Executor executor, Executor flowControlExecutor) throws HornetQException
+   {
+      long consumerID = idGenerator.generateID();
+
+      HornetQConsumerContext consumerContext = new HornetQConsumerContext(consumerID);
+
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID,
+                                                                              queueName,
+                                                                              filterString,
+                                                                              browseOnly,
+                                                                              true);
+
+      SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
+
+      // The actual windows size that gets used is determined by the user since
+      // could be overridden on the queue settings
+      // The value we send is just a hint
+
+      return new ClientConsumerImpl(session,
+                                    consumerContext,
+                                    queueName,
+                                    filterString,
+                                    browseOnly,
+                                    calcWindowSize(windowSize),
+                                    ackBatchSize,
+                                    maxRate > 0 ? new TokenBucketLimiterImpl(maxRate,
+                                                                             false)
+                                       : null,
+                                    executor,
+                                    flowControlExecutor,
+                                    this,
+                                    queueInfo.toQueueQuery(),
+                                    lookupTCCL());
+   }
+
+
+   public int getServerVersion()
+   {
+      return serverVersion;
+   }
+
+   public ClientSession.AddressQuery addressQuery(final SimpleString address) throws HornetQException
+   {
+      SessionBindingQueryResponseMessage response =
+         (SessionBindingQueryResponseMessage) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
+
+      return new AddressQueryImpl(response.isExists(), response.getQueueNames());
+   }
+
+
+   @Override
+   public void closeConsumer(final ClientConsumer consumer) throws HornetQException
+   {
+      sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void sendConsumerCredits(final ClientConsumer consumer, final int credits)
+   {
+      sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits));
+   }
+
+   public void forceDelivery(final ClientConsumer consumer, final long sequence) throws HornetQException
+   {
+      SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence);
+      sessionChannel.send(request);
+   }
+
+   public void simpleCommit() throws HornetQException
+   {
+      sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void simpleRollback(boolean lastMessageAsDelivered) throws HornetQException
+   {
+      sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void sessionStart() throws HornetQException
+   {
+      sessionChannel.send(new PacketImpl(PacketImpl.SESS_START));
+   }
+
+   public void sessionStop() throws HornetQException
+   {
+      sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void addSessionMetadata(String key, String data) throws HornetQException
+   {
+      sessionChannel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE);
+   }
+
+
+   public void addUniqueMetaData(String key, String data) throws HornetQException
+   {
+      sessionChannel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void xaCommit(Xid xid, boolean onePhase) throws XAException, HornetQException
+   {
+      SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase);
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+
+      if (HornetQClientLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response);
+      }
+   }
+
+   public void xaEnd(Xid xid, int flags) throws XAException, HornetQException
+   {
+      Packet packet;
+      if (flags == XAResource.TMSUSPEND)
+      {
+         packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+      }
+      else if (flags == XAResource.TMSUCCESS)
+      {
+         packet = new SessionXAEndMessage(xid, false);
+      }
+      else if (flags == XAResource.TMFAIL)
+      {
+         packet = new SessionXAEndMessage(xid, true);
+      }
+      else
+      {
+         throw new XAException(XAException.XAER_INVAL);
+      }
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+   }
+
+
+   public void sendProducerCreditsMessage(final int credits, final SimpleString address)
+   {
+      sessionChannel.send(new SessionRequestProducerCreditsMessage(credits, address));
+   }
+
+   /**
+    * HornetQ does support large messages
+    *
+    * @return
+    */
+   public boolean supportsLargeMessage()
+   {
+      return true;
+   }
+
+   @Override
+   public int getCreditsOnSendingFull(MessageInternal msgI)
+   {
+      return msgI.getEncodeSize();
+   }
+
+   public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException
+   {
+      SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
+
+      if (sendBlocking)
+      {
+         sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
+      }
+      else
+      {
+         sessionChannel.sendBatched(packet);
+      }
+   }
+
+   @Override
+   public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws HornetQException
+   {
+      SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI);
+
+      sessionChannel.send(initialChunk);
+
+      return msgI.getHeadersAndPropertiesEncodeSize();
+   }
+
+   @Override
+   public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws HornetQException
+   {
+      final boolean requiresResponse = lastChunk && sendBlocking;
+      final SessionSendContinuationMessage chunkPacket =
+         new SessionSendContinuationMessage(msgI, chunk, !lastChunk,
+                                            requiresResponse, messageBodySize, messageHandler);
+
+      if (requiresResponse)
+      {
+         // When sending it blocking, only the last chunk will be blocking.
+         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+      }
+      else
+      {
+         sessionChannel.send(chunkPacket);
+      }
+
+      return chunkPacket.getPacketSize();
+   }
+
+   public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws HornetQException
+   {
+      PacketImpl messagePacket;
+      if (individual)
+      {
+         messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
+      }
+      else
+      {
+         messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
+      }
+
+      if (block)
+      {
+         sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
+      }
+      else
+      {
+         sessionChannel.sendBatched(messagePacket);
+      }
+   }
+
+   public void expireMessage(final ClientConsumer consumer, Message message) throws HornetQException
+   {
+      SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID());
+
+      sessionChannel.send(messagePacket);
+   }
+
+
+   public void sessionClose() throws HornetQException
+   {
+      sessionChannel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE);
+   }
+
+   public void xaForget(Xid xid) throws XAException, HornetQException
+   {
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+   }
+
+   public int xaPrepare(Xid xid) throws XAException, HornetQException
+   {
+      SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+      else
+      {
+         return response.getResponseCode();
+      }
+   }
+
+   public Xid[] xaScan() throws HornetQException
+   {
+      SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP);
+
+      List<Xid> xids = response.getXids();
+
+      Xid[] xidArray = xids.toArray(new Xid[xids.size()]);
+
+      return xidArray;
+   }
+
+   public void xaRollback(Xid xid, boolean wasStarted) throws HornetQException, XAException
+   {
+      SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+   }
+
+   public void xaStart(Xid xid, int flags) throws XAException, HornetQException
+   {
+      Packet packet;
+      if (flags == XAResource.TMJOIN)
+      {
+         packet = new SessionXAJoinMessage(xid);
+      }
+      else if (flags == XAResource.TMRESUME)
+      {
+         packet = new SessionXAResumeMessage(xid);
+      }
+      else if (flags == XAResource.TMNOFLAGS)
+      {
+         // Don't need to flush since the previous end will have done this
+         packet = new SessionXAStartMessage(xid);
+      }
+      else
+      {
+         throw new XAException(XAException.XAER_INVAL);
+      }
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         HornetQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode());
+         throw new XAException(response.getResponseCode());
+      }
+   }
+
+   public boolean configureTransactionTimeout(int seconds) throws HornetQException
+   {
+      SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) sessionChannel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP);
+
+      return response.isOK();
+   }
+
+   public int recoverSessionTimeout() throws HornetQException
+   {
+      SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP);
+
+      return response.getTimeoutSeconds();
+   }
+
+   public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws HornetQException
+   {
+      CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true);
+      sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+   }
+
+   @Override
+   public boolean reattachOnNewConnection(RemotingConnection newConnection) throws HornetQException
+   {
+
+      this.remotingConnection = newConnection;
+
+      sessionChannel.transferConnection((CoreRemotingConnection) newConnection);
+
+      Packet request = new ReattachSessionMessage(name, sessionChannel.getLastConfirmedCommandID());
+
+      Channel channel1 = getCoreConnection().getChannel(1, -1);
+
+      ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP);
+
+      if (response.isReattached())
+      {
+         if (HornetQClientLogger.LOGGER.isDebugEnabled())
+         {
+            HornetQClientLogger.LOGGER.debug("ClientSession reattached fine, replaying commands");
+         }
+         // The session was found on the server - we reattached transparently ok
+
+         sessionChannel.replayCommands(response.getLastConfirmedCommandID());
+
+         return true;
+      }
+      else
+      {
+
+         sessionChannel.clearCommands();
+
+         return false;
+      }
+
+   }
+
+   public void recreateSession(final String username,
+                               final String password,
+                               final int minLargeMessageSize,
+                               final boolean xa,
+                               final boolean autoCommitSends,
+                               final boolean autoCommitAcks,
+                               final boolean preAcknowledge,
+                               final SimpleString defaultAddress) throws HornetQException
+   {
+      Packet createRequest = new CreateSessionMessage(name,
+                                                      sessionChannel.getID(),
+                                                      getServerVersion(),
+                                                      username,
+                                                      password,
+                                                      minLargeMessageSize,
+                                                      xa,
+                                                      autoCommitSends,
+                                                      autoCommitAcks,
+                                                      preAcknowledge,
+                                                      confirmationWindow,
+                                                      defaultAddress == null ? null
+                                                         : defaultAddress.toString());
+      boolean retry;
+      do
+      {
+         try
+         {
+            getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP);
+            retry = false;
+         }
+         catch (HornetQException e)
+         {
+            // the session was created while its server was starting, retry it:
+            if (e.getType() == HornetQExceptionType.SESSION_CREATION_REJECTED)
+            {
+               HornetQClientLogger.LOGGER.retryCreateSessionSeverStarting(name);
+               retry = true;
+               // sleep a little bit to avoid spinning too much
+               try
+               {
+                  Thread.sleep(10);
+               }
+               catch (InterruptedException ie)
+               {
+                  Thread.currentThread().interrupt();
+                  throw e;
+               }
+            }
+            else
+            {
+               throw e;
+            }
+         }
+      }
+      while (retry && !session.isClosing());
+   }
+
+   @Override
+   public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws HornetQException
+   {
+      ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
+
+      // We try and recreate any non durable queues, since they probably won't be there unless
+      // they are defined in hornetq-configuration.xml
+      // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
+      if (!queueInfo.isDurable())
+      {
+         CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
+                                                                        queueInfo.getName(),
+                                                                        queueInfo.getFilterString(),
+                                                                        false,
+                                                                        queueInfo.isTemporary(),
+                                                                        false);
+
+         sendPacketWithoutLock(sessionChannel, createQueueRequest);
+      }
+
+      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal),
+                                                                                            consumerInternal.getQueueName(),
+                                                                                            consumerInternal.getFilterString(),
+                                                                                            consumerInternal.isBrowseOnly(),
+                                                                                            false);
+
+      sendPacketWithoutLock(sessionChannel, createConsumerRequest);
+
+      int clientWindowSize = consumerInternal.getClientWindowSize();
+
+      if (clientWindowSize != 0)
+      {
+         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
+                                                                                        clientWindowSize);
+
+         sendPacketWithoutLock(sessionChannel, packet);
+      }
+      else
+      {
+         // https://jira.jboss.org/browse/HORNETQ-522
+         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
+                                                                                        1);
+         sendPacketWithoutLock(sessionChannel, packet);
+      }
+   }
+
+   public void xaFailed(Xid xid) throws HornetQException
+   {
+      sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid));
+   }
+
+   public void restartSession() throws HornetQException
+   {
+      sendPacketWithoutLock(sessionChannel, new PacketImpl(PacketImpl.SESS_START));
+   }
+
+   @Override
+   public void resetMetadata(HashMap<String, String> metaDataToSend)
+   {
+      // Resetting the metadata after failover
+      for (Map.Entry<String, String> entries : metaDataToSend.entrySet())
+      {
+         sendPacketWithoutLock(sessionChannel, new SessionAddMetaDataMessageV2(entries.getKey(), entries.getValue(), false));
+      }
+   }
+
+
+   private Channel getCreateChannel()
+   {
+      return getCoreConnection().getChannel(1, -1);
+   }
+
+   private CoreRemotingConnection getCoreConnection()
+   {
+      return (CoreRemotingConnection) remotingConnection;
+   }
+
+
+   /**
+    * This doesn't apply to other protocols probably, so it will be a hornetq exclusive feature
+    *
+    * @throws HornetQException
+    */
+   private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws HornetQException
+   {
+      DisconnectConsumerMessage message = packet;
+
+      session.handleConsumerDisconnect(new HornetQConsumerContext(message.getConsumerId()));
+   }
+
+   private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception
+   {
+      ClientMessageInternal msgi = (ClientMessageInternal) messagePacket.getMessage();
+
+      msgi.setDeliveryCount(messagePacket.getDeliveryCount());
+
+      msgi.setFlowControlSize(messagePacket.getPacketSize());
+
+      handleReceiveMessage(new HornetQConsumerContext(messagePacket.getConsumerID()), msgi);
+   }
+
+   private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception
+   {
+      ClientLargeMessageInternal clientLargeMessage = (ClientLargeMessageInternal) serverPacket.getLargeMessage();
+
+      clientLargeMessage.setFlowControlSize(serverPacket.getPacketSize());
+
+      clientLargeMessage.setDeliveryCount(serverPacket.getDeliveryCount());
+
+      handleReceiveLargeMessage(new HornetQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize());
+   }
+
+
+   private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception
+   {
+      handleReceiveContinuation(new HornetQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(),
+                                continuationPacket.isContinues());
+   }
+
+
+   protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message)
+   {
+      handleReceiveProducerCredits(message.getAddress(), message.getCredits());
+   }
+
+
+   protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message)
+   {
+      handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
+   }
+
+   class ClientSessionPacketHandler implements ChannelHandler
+   {
+
+      public void handlePacket(final Packet packet)
+      {
+         byte type = packet.getType();
+
+         try
+         {
+            switch (type)
+            {
+               case DISCONNECT_CONSUMER:
+               {
+                  handleConsumerDisconnected((DisconnectConsumerMessage) packet);
+                  break;
+               }
+               case SESS_RECEIVE_CONTINUATION:
+               {
+                  handleReceiveContinuation((SessionReceiveContinuationMessage) packet);
+
+                  break;
+               }
+               case SESS_RECEIVE_MSG:
+               {
+                  handleReceivedMessagePacket((SessionReceiveMessage) packet);
+
+                  break;
+               }
+               case SESS_RECEIVE_LARGE_MSG:
+               {
+                  handleReceiveLargeMessage((SessionReceiveLargeMessage) packet);
+
+                  break;
+               }
+               case PacketImpl.SESS_PRODUCER_CREDITS:
+               {
+                  handleReceiveProducerCredits((SessionProducerCreditsMessage) packet);
+
+                  break;
+               }
+               case PacketImpl.SESS_PRODUCER_FAIL_CREDITS:
+               {
+                  handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet);
+
+                  break;
+               }
+               case EXCEPTION:
+               {
+                  // We can only log these exceptions
+                  // maybe we should cache it on SessionContext and throw an exception on any next calls
+                  HornetQExceptionMessage mem = (HornetQExceptionMessage) packet;
+
+                  HornetQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException());
+
+                  break;
+               }
+               default:
+               {
+                  throw new IllegalStateException("Invalid packet: " + type);
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            HornetQClientLogger.LOGGER.failedToHandlePacket(e);
+         }
+
+         sessionChannel.confirm(packet);
+      }
+   }
+
+   private long getConsumerID(ClientConsumer consumer)
+   {
+      return ((HornetQConsumerContext)consumer.getConsumerContext()).getId();
+   }
+
+   private ClassLoader lookupTCCL()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return Thread.currentThread().getContextClassLoader();
+         }
+      });
+
+   }
+
+   private int calcWindowSize(final int windowSize)
+   {
+      int clientWindowSize;
+      if (windowSize == -1)
+      {
+         // No flow control - buffer can increase without bound! Only use with
+         // caution for very fast consumers
+         clientWindowSize = -1;
+      }
+      else if (windowSize == 0)
+      {
+         // Slow consumer - no buffering
+         clientWindowSize = 0;
+      }
+      else if (windowSize == 1)
+      {
+         // Slow consumer = buffer 1
+         clientWindowSize = 1;
+      }
+      else if (windowSize > 1)
+      {
+         // Client window size is half server window size
+         clientWindowSize = windowSize >> 1;
+      }
+      else
+      {
+         throw HornetQClientMessageBundle.BUNDLE.invalidWindowSize(windowSize);
+      }
+
+      return clientWindowSize;
+   }
+
+
+   private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet)
+   {
+      packet.setChannelID(parameterChannel.getID());
+
+      Connection conn = parameterChannel.getConnection().getTransportConnection();
+
+      HornetQBuffer buffer = packet.encode(this.getCoreConnection());
+
+      conn.write(buffer, false, false);
+   }
+
+
+}


Mime
View raw message