activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [41/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:11 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
index d5bfc7e..b2bad9a 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
@@ -18,6 +18,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
+import io.netty.channel.ChannelPipeline;
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.HornetQExceptionType;
 import org.hornetq.api.core.HornetQInterruptedException;
@@ -25,6 +27,8 @@ import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -44,11 +48,12 @@ import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
+import org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.ClientProtocolManager;
 import org.hornetq.spi.core.remoting.Connection;
-import org.hornetq.spi.core.remoting.ProtocolResponseHandler;
+import org.hornetq.spi.core.remoting.TopologyResponseHandler;
 import org.hornetq.spi.core.remoting.SessionContext;
 import org.hornetq.utils.VersionLoader;
 
@@ -70,7 +75,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
 {
    private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
 
-   private final ClientSessionFactoryInternal factoryInternal;
+   private ClientSessionFactoryInternal factoryInternal;
 
    /**
     * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch}
@@ -87,12 +92,9 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
     */
    private CountDownLatch inCreateSessionLatch;
 
-
-   protected PacketDecoder packetDecoder = ClientPacketDecoder.INSTANCE;
-
    protected volatile RemotingConnectionImpl connection;
 
-   protected ProtocolResponseHandler callbackHandler;
+   protected TopologyResponseHandler topologyResponseHandler;
 
    /**
     * Flag that signals that the communication is closing. Causes many processes to exit.
@@ -102,15 +104,29 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
    private final CountDownLatch waitLatch = new CountDownLatch(1);
 
 
-   public HornetQClientProtocolManager(ClientSessionFactoryInternal factory)
+   public HornetQClientProtocolManager()
    {
-      this.factoryInternal = factory;
    }
 
+   public String getName()
+   {
+      return HornetQClient.DEFAULT_CORE_PROTOCOL;
+   }
+
+   public void setSessionFactory(ClientSessionFactory factory)
+   {
+      this.factoryInternal = (ClientSessionFactoryInternal)factory;
+   }
 
-   public void replacePacketDecoder(PacketDecoder decoder)
+   public ClientSessionFactory getSessionFactory()
    {
-      this.packetDecoder = decoder;
+      return this.factoryInternal;
+   }
+
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline)
+   {
+      pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2());
    }
 
    public boolean waitOnLatch(long milliseconds) throws InterruptedException
@@ -193,7 +209,6 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
          if (inCreateSessionLatch != null)
             inCreateSessionLatch.countDown();
       }
-      forceReturnChannel1();
 
 
       Channel channel1 = getChannel1();
@@ -212,16 +227,6 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
    }
 
 
-   public void setConnection(RemotingConnection connection)
-   {
-      this.connection = (RemotingConnectionImpl) connection;
-   }
-
-   @Override
-   public void shakeHands()
-   {
-   }
-
    @Override
    public void ping(long connectionTTL)
    {
@@ -234,14 +239,6 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
       connection.flush();
    }
 
-   public void setResponseHandler(ProtocolResponseHandler handler)
-   {
-      this.callbackHandler = handler;
-
-      getChannel0().setHandler(new Channel0Handler(connection));
-   }
-
-
    @Override
    public void sendSubscribeTopology(final boolean isServer)
    {
@@ -395,7 +392,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
 
    }
 
-   public boolean cleanupBeforeFailover()
+   public boolean cleanupBeforeFailover(HornetQException cause)
    {
 
       boolean needToInterrupt;
@@ -423,7 +420,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
 
       if (needToInterrupt)
       {
-         forceReturnChannel1();
+         forceReturnChannel1(cause);
 
          // Now we need to make sure that the thread has actually exited and returned it's
          // connections
@@ -460,15 +457,32 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
 
    public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout,
                                      List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors,
-                                     ProtocolResponseHandler protocolResponseHandler)
+                                     TopologyResponseHandler topologyResponseHandler)
    {
-      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(packetDecoder, transportConnection,
+      this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection,
                                                                              callTimeout, callFailoverTimeout,
                                                                              incomingInterceptors, outgoingInterceptors);
-      setConnection(remotingConnection);
-      this.setResponseHandler(protocolResponseHandler);
 
-      return remotingConnection;
+      this.topologyResponseHandler = topologyResponseHandler;
+
+      getChannel0().setHandler(new Channel0Handler(connection));
+
+
+      sendHandshake(transportConnection);
+
+      return connection;
+   }
+
+   private void sendHandshake(Connection transportConnection)
+   {
+      if (transportConnection.isUsingProtocolHandling())
+      {
+         // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
+         String handshake = "HORNETQ";
+         HornetQBuffer hqbuffer = connection.createBuffer(handshake.length());
+         hqbuffer.writeBytes(handshake.getBytes());
+         transportConnection.write(hqbuffer);
+      }
    }
 
 
@@ -498,8 +512,18 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
                scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString();
             }
 
-            if (callbackHandler != null)
-               callbackHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID);
+            if (topologyResponseHandler != null)
+               topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID);
+         }
+         else if (type == PacketImpl.CLUSTER_TOPOLOGY)
+         {
+            ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
+            notifyTopologyChange(topMessage);
+         }
+         else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
+         {
+            ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet;
+            notifyTopologyChange(topMessage);
          }
          else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3)
          {
@@ -546,8 +570,10 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
                HornetQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down");
             }
 
-            if (callbackHandler != null)
-               callbackHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
+            if (topologyResponseHandler != null)
+            {
+               topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
+            }
          }
          else
          {
@@ -559,13 +585,20 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
                                             null);
             }
 
-            if (callbackHandler != null)
-               callbackHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast());
+            if (topologyResponseHandler != null)
+            {
+               topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast());
+            }
          }
       }
    }
 
-   private void forceReturnChannel1()
+   protected PacketDecoder getPacketDecoder()
+   {
+      return ClientPacketDecoder.INSTANCE;
+   }
+
+   private void forceReturnChannel1(HornetQException cause)
    {
       if (connection != null)
       {
@@ -573,7 +606,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager
 
          if (channel1 != null)
          {
-            channel1.returnBlocking();
+            channel1.returnBlocking(cause);
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
index d6b7f08..0a066f2 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.protocol.core.impl;
 
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.spi.core.remoting.ClientProtocolManager;
 import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory;
 
@@ -23,8 +22,21 @@ import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory;
 
 public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory
 {
-   public ClientProtocolManager newProtocolManager(ClientSessionFactoryInternal factoryInternal)
+   private static final long serialVersionUID = 1;
+
+   private static final HornetQClientProtocolManagerFactory INSTANCE = new HornetQClientProtocolManagerFactory();
+
+   private HornetQClientProtocolManagerFactory()
+   {
+   }
+
+   public static final HornetQClientProtocolManagerFactory getInstance()
+   {
+      return INSTANCE;
+   }
+
+   public ClientProtocolManager newProtocolManager()
    {
-      return new HornetQClientProtocolManager(factoryInternal);
+      return new HornetQClientProtocolManager();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java
new file mode 100644
index 0000000..b63f372
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl;
+
+import org.hornetq.spi.core.remoting.ConsumerContext;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQConsumerContext extends ConsumerContext
+{
+   private long id;
+
+   public HornetQConsumerContext(long id)
+   {
+      this.id = id;
+   }
+
+   public long getId()
+   {
+      return id;
+   }
+
+   @Override
+   public boolean equals(Object o)
+   {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      HornetQConsumerContext that = (HornetQConsumerContext) o;
+
+      if (id != that.id) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      return (int) (id ^ (id >>> 32));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java
index 96a8181..311e937 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java
@@ -38,6 +38,7 @@ import org.hornetq.core.client.impl.ClientConsumerImpl;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientLargeMessageInternal;
 import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientProducerCreditsImpl;
 import org.hornetq.core.client.impl.ClientSessionImpl;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.Channel;
@@ -111,12 +112,14 @@ public class HornetQSessionContext extends SessionContext
    private final Channel sessionChannel;
    private final int serverVersion;
    private int confirmationWindow;
+   private final String name;
 
 
    public HornetQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow)
    {
-      super(name, remotingConnection);
+      super(remotingConnection);
 
+      this.name = name;
       this.sessionChannel = sessionChannel;
       this.serverVersion = serverVersion;
       this.confirmationWindow = confirmationWindow;
@@ -169,9 +172,9 @@ public class HornetQSessionContext extends SessionContext
    // Failover utility methods
 
    @Override
-   public void returnBlocking()
+   public void returnBlocking(HornetQException cause)
    {
-      sessionChannel.returnBlocking();
+      sessionChannel.returnBlocking(cause);
    }
 
    @Override
@@ -196,6 +199,12 @@ public class HornetQSessionContext extends SessionContext
       sessionChannel.returnBlocking();
    }
 
+   @Override
+   public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits)
+   {
+      // nothing to be done here... Flow control here is done on the core side
+   }
+
 
    public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
    {
@@ -232,6 +241,8 @@ public class HornetQSessionContext extends SessionContext
    {
       long consumerID = idGenerator.generateID();
 
+      HornetQConsumerContext consumerContext = new HornetQConsumerContext(consumerID);
+
       SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID,
                                                                               queueName,
                                                                               filterString,
@@ -245,7 +256,7 @@ public class HornetQSessionContext extends SessionContext
       // The value we send is just a hint
 
       return new ClientConsumerImpl(session,
-                                    consumerID,
+                                    consumerContext,
                                     queueName,
                                     filterString,
                                     browseOnly,
@@ -279,17 +290,17 @@ public class HornetQSessionContext extends SessionContext
    @Override
    public void closeConsumer(final ClientConsumer consumer) throws HornetQException
    {
-      sessionChannel.sendBlocking(new SessionConsumerCloseMessage((long) consumer.getId()), PacketImpl.NULL_RESPONSE);
+      sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE);
    }
 
    public void sendConsumerCredits(final ClientConsumer consumer, final int credits)
    {
-      sessionChannel.send(new SessionConsumerFlowCreditMessage((long) consumer.getId(), credits));
+      sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits));
    }
 
    public void forceDelivery(final ClientConsumer consumer, final long sequence) throws HornetQException
    {
-      SessionForceConsumerDelivery request = new SessionForceConsumerDelivery((long) consumer.getId(), sequence);
+      SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence);
       sessionChannel.send(request);
    }
 
@@ -390,7 +401,7 @@ public class HornetQSessionContext extends SessionContext
       return msgI.getEncodeSize();
    }
 
-   public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler) throws HornetQException
+   public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException
    {
       SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
 
@@ -440,11 +451,11 @@ public class HornetQSessionContext extends SessionContext
       PacketImpl messagePacket;
       if (individual)
       {
-         messagePacket = new SessionIndividualAcknowledgeMessage((long) consumer.getId(), message.getMessageID(), block);
+         messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
       }
       else
       {
-         messagePacket = new SessionAcknowledgeMessage((long) consumer.getId(), message.getMessageID(), block);
+         messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
       }
 
       if (block)
@@ -459,7 +470,7 @@ public class HornetQSessionContext extends SessionContext
 
    public void expireMessage(final ClientConsumer consumer, Message message) throws HornetQException
    {
-      SessionExpireMessage messagePacket = new SessionExpireMessage((long) consumer.getId(), message.getMessageID());
+      SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID());
 
       sessionChannel.send(messagePacket);
    }
@@ -682,7 +693,7 @@ public class HornetQSessionContext extends SessionContext
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }
 
-      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(consumerInternal.getID(),
+      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal),
                                                                                             consumerInternal.getQueueName(),
                                                                                             consumerInternal.getFilterString(),
                                                                                             consumerInternal.isBrowseOnly(),
@@ -694,7 +705,7 @@ public class HornetQSessionContext extends SessionContext
 
       if (clientWindowSize != 0)
       {
-         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage((long) consumerInternal.getId(),
+         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
                                                                                         clientWindowSize);
 
          sendPacketWithoutLock(sessionChannel, packet);
@@ -702,7 +713,7 @@ public class HornetQSessionContext extends SessionContext
       else
       {
          // https://jira.jboss.org/browse/HORNETQ-522
-         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage((long) consumerInternal.getId(),
+         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
                                                                                         1);
          sendPacketWithoutLock(sessionChannel, packet);
       }
@@ -748,7 +759,8 @@ public class HornetQSessionContext extends SessionContext
    private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws HornetQException
    {
       DisconnectConsumerMessage message = packet;
-      session.handleConsumerDisconnect(message.getConsumerId());
+
+      session.handleConsumerDisconnect(new HornetQConsumerContext(message.getConsumerId()));
    }
 
    private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception
@@ -759,7 +771,7 @@ public class HornetQSessionContext extends SessionContext
 
       msgi.setFlowControlSize(messagePacket.getPacketSize());
 
-      handleReceiveMessage(messagePacket.getConsumerID(), msgi);
+      handleReceiveMessage(new HornetQConsumerContext(messagePacket.getConsumerID()), msgi);
    }
 
    private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception
@@ -770,13 +782,13 @@ public class HornetQSessionContext extends SessionContext
 
       clientLargeMessage.setDeliveryCount(serverPacket.getDeliveryCount());
 
-      handleReceiveLargeMessage(serverPacket.getConsumerID(), clientLargeMessage, serverPacket.getLargeMessageSize());
+      handleReceiveLargeMessage(new HornetQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize());
    }
 
 
    private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception
    {
-      handleReceiveContinuation(continuationPacket.getConsumerID(), continuationPacket.getBody(), continuationPacket.getPacketSize(),
+      handleReceiveContinuation(new HornetQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(),
                                 continuationPacket.isContinues());
    }
 
@@ -863,6 +875,11 @@ public class HornetQSessionContext extends SessionContext
       }
    }
 
+   private long getConsumerID(ClientConsumer consumer)
+   {
+      return ((HornetQConsumerContext)consumer.getConsumerContext()).getId();
+   }
+
    private ClassLoader lookupTCCL()
    {
       return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
index 0f161df..3afd32e 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -12,31 +12,26 @@
  */
 package org.hornetq.core.protocol.core.impl;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.HornetQInterruptedException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.client.HornetQClientLogger;
-import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
-import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.security.HornetQPrincipal;
+import org.hornetq.spi.core.protocol.AbstractRemotingConnection;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.utils.SimpleIDGenerator;
 
@@ -44,7 +39,7 @@ import org.hornetq.utils.SimpleIDGenerator;
  * @author <a href="tim.fox@jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
  */
-public class RemotingConnectionImpl implements CoreRemotingConnection
+public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection
 {
    // Constants
    // ------------------------------------------------------------------------------------
@@ -58,14 +53,8 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
    // -----------------------------------------------------------------------------------
    private final PacketDecoder packetDecoder;
 
-   private final Connection transportConnection;
-
    private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
 
-   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
-
-   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
-
    private final long blockingCallTimeout;
 
    private final long blockingCallFailoverTimeout;
@@ -88,16 +77,10 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
 
    private final Object failLock = new Object();
 
-   private volatile boolean dataReceived;
-
-   private final Executor executor;
-
    private volatile boolean executing;
 
    private final SimpleString nodeID;
 
-   private final long creationTime;
-
    private String clientID;
 
    // Constructors
@@ -141,9 +124,9 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
                                   final SimpleString nodeID)
 
    {
-      this.packetDecoder = packetDecoder;
+      super(transportConnection, executor);
 
-      this.transportConnection = transportConnection;
+      this.packetDecoder = packetDecoder;
 
       this.blockingCallTimeout = blockingCallTimeout;
 
@@ -155,11 +138,9 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
 
       this.client = client;
 
-      this.executor = executor;
-
       this.nodeID = nodeID;
 
-      this.creationTime = System.currentTimeMillis();
+      transportConnection.setProtocolConnection(this);
    }
 
 
@@ -173,27 +154,10 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
          ", nodeID=" +
          nodeID +
          ", transportConnection=" +
-         transportConnection +
+         getTransportConnection() +
          "]";
    }
 
-   public Connection getTransportConnection()
-   {
-      return transportConnection;
-   }
-
-   public List<FailureListener> getFailureListeners()
-   {
-      return new ArrayList<FailureListener>(failureListeners);
-   }
-
-   public void setFailureListeners(final List<FailureListener> listeners)
-   {
-      failureListeners.clear();
-
-      failureListeners.addAll(listeners);
-   }
-
    /**
     * @return the clientVersion
     */
@@ -210,21 +174,6 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
       this.clientVersion = clientVersion;
    }
 
-   public Object getID()
-   {
-      return transportConnection.getID();
-   }
-
-   public String getRemoteAddress()
-   {
-      return transportConnection.getRemoteAddress();
-   }
-
-   public long getCreationTime()
-   {
-      return creationTime;
-   }
-
    public synchronized Channel getChannel(final long channelID, final int confWindowSize)
    {
       Channel channel = channels.get(channelID);
@@ -249,83 +198,6 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
       channels.put(channelID, channel);
    }
 
-   public void addFailureListener(final FailureListener listener)
-   {
-      if (listener == null)
-      {
-         throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull();
-      }
-      failureListeners.add(listener);
-   }
-
-   public boolean removeFailureListener(final FailureListener listener)
-   {
-      if (listener == null)
-      {
-         throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull();
-      }
-
-      return failureListeners.remove(listener);
-   }
-
-   public void addCloseListener(final CloseListener listener)
-   {
-      if (listener == null)
-      {
-         throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull();
-      }
-
-      closeListeners.add(listener);
-   }
-
-   public boolean removeCloseListener(final CloseListener listener)
-   {
-      if (listener == null)
-      {
-         throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull();
-      }
-
-      return closeListeners.remove(listener);
-   }
-
-   public List<CloseListener> removeCloseListeners()
-   {
-      List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-
-      closeListeners.clear();
-
-      return ret;
-   }
-
-   public List<FailureListener> removeFailureListeners()
-   {
-      List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-
-      failureListeners.clear();
-
-      return ret;
-   }
-
-   public void setCloseListeners(List<CloseListener> listeners)
-   {
-      closeListeners.clear();
-
-      closeListeners.addAll(listeners);
-   }
-
-   public HornetQBuffer createBuffer(final int size)
-   {
-      return transportConnection.createBuffer(size);
-   }
-
-   /*
-    * This can be called concurrently by more than one thread so needs to be locked
-    */
-   public void fail(final HornetQException me)
-   {
-      fail(me, null);
-   }
-
    public void fail(final HornetQException me, String scaleDownTargetNodeID)
    {
       synchronized (failLock)
@@ -340,6 +212,16 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
 
       HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
 
+
+      try
+      {
+         transportConnection.forceClose();
+      }
+      catch (Throwable e)
+      {
+         HornetQClientLogger.LOGGER.warn(e.getMessage(), e);
+      }
+
       // Then call the listeners
       callFailureListeners(me, scaleDownTargetNodeID);
 
@@ -349,7 +231,7 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
 
       for (Channel channel : channels.values())
       {
-         channel.returnBlocking();
+         channel.returnBlocking(me);
       }
    }
 
@@ -464,15 +346,6 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
       return blockingCallFailoverTimeout;
    }
 
-   public boolean checkDataReceived()
-   {
-      boolean res = dataReceived;
-
-      dataReceived = false;
-
-      return res;
-   }
-
    //We flush any confirmations on the connection - this prevents idle bridges for example
    //sitting there with many unacked messages
    public void flush()
@@ -488,12 +361,11 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
 
    public HornetQPrincipal getDefaultHornetQPrincipal()
    {
-      return transportConnection.getDefaultHornetQPrincipal();
+      return getTransportConnection().getDefaultHornetQPrincipal();
    }
 
    // Buffer Handler implementation
    // ----------------------------------------------------
-
    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
    {
       try
@@ -539,7 +411,7 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
             doBufferReceived(packet);
          }
 
-         dataReceived = true;
+         super.bufferReceived(connectionID, buffer);
       }
       catch (Exception e)
       {
@@ -565,7 +437,7 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
       }
    }
 
-   private void removeAllChannels()
+   protected void removeAllChannels()
    {
       // We get the transfer lock first - this ensures no packets are being processed AND
       // it's guaranteed no more packets will be processed once this method is complete
@@ -575,55 +447,10 @@ public class RemotingConnectionImpl implements CoreRemotingConnection
       }
    }
 
-   private void callFailureListeners(final HornetQException me, String scaleDownTargetNodeID)
-   {
-      final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
-      for (final FailureListener listener : listenersClone)
-      {
-         try
-         {
-            listener.connectionFailed(me, false, scaleDownTargetNodeID);
-         }
-         catch (HornetQInterruptedException interrupted)
-         {
-            // this is an expected behaviour.. no warn or error here
-            HornetQClientLogger.LOGGER.debug("thread interrupted", interrupted);
-         }
-         catch (final Throwable t)
-         {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            HornetQClientLogger.LOGGER.errorCallingFailureListener(t);
-         }
-      }
-   }
-
-   private void callClosingListeners()
-   {
-      final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
-
-      for (final CloseListener listener : listenersClone)
-      {
-         try
-         {
-            listener.connectionClosed();
-         }
-         catch (final Throwable t)
-         {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            HornetQClientLogger.LOGGER.errorCallingFailureListener(t);
-         }
-      }
-   }
-
    private void internalClose()
    {
       // We close the underlying transport connection
-      transportConnection.close();
+      getTransportConnection().close();
 
       for (Channel channel : channels.values())
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
index 50e799b..77811bb 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
@@ -16,9 +16,9 @@ import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Ping is sent on the client side at {@link ClientSessionFactoryImpl}. At the server's side it is
- * treated at {@link RemotingServiceImpl}
- * @see RemotingConnection#checkDataReceived()
+ * Ping is sent on the client side by {@link org.hornetq.core.client.impl.ClientSessionFactoryImpl}. At the server's
+ * side it is handled by {@link org.hornetq.core.remoting.server.impl.RemotingServiceImpl}
+ * @see org.hornetq.spi.core.protocol.RemotingConnection#checkDataReceived()
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  */
 public final class Ping extends PacketImpl

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 1a54ed2..fac4698 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -27,12 +27,12 @@ public class SessionSendMessage extends MessagePacket
    private boolean requiresResponse;
 
    /**
-    * In case, we are using a different handler than the one set on the {@link ClientSession}
+    * In case, we are using a different handler than the one set on the {@link org.hornetq.api.core.client.ClientSession}
     * <p/>
     * This field is only used at the client side.
     *
-    * @see ClientSession#setSendAcknowledgementHandler(SendAcknowledgementHandler)
-    * @see ClientProducer#send(SimpleString, Message, SendAcknowledgementHandler)
+    * @see org.hornetq.api.core.client.ClientSession#setSendAcknowledgementHandler(SendAcknowledgementHandler)
+    * @see org.hornetq.api.core.client.ClientProducer#send(org.hornetq.api.core.SimpleString, org.hornetq.api.core.Message, SendAcknowledgementHandler)
     */
    private final transient SendAcknowledgementHandler handler;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java
new file mode 100644
index 0000000..58666e0
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfigurationHelper;
+import org.hornetq.utils.ClassloadingUtil;
+
+/**
+ * Stores static mappings of class names to ConnectorFactory instances to act as a central repo for ConnectorFactory
+ * objects.
+ *
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+
+public class TransportConfigurationUtil
+{
+   private static final Map<String, Map<String, Object>> DEFAULTS = new HashMap<>();
+
+   private static final HashMap<String, Object> EMPTY_HELPER = new HashMap<>();
+
+   public static Map<String, Object> getDefaults(String className)
+   {
+      if (className == null)
+      {
+         /* Returns a new clone of the empty helper.  This allows any parent objects to update the map key/values
+            without polluting the EMPTY_HELPER map. */
+         return (Map<String, Object>) EMPTY_HELPER.clone();
+      }
+
+      if (!DEFAULTS.containsKey(className))
+      {
+         Object object = instantiateObject(className);
+         if (object != null && object instanceof TransportConfigurationHelper)
+         {
+
+            DEFAULTS.put(className, ((TransportConfigurationHelper) object).getDefaults());
+         }
+         else
+         {
+            DEFAULTS.put(className, EMPTY_HELPER);
+         }
+      }
+
+      /* We need to return a copy of the default Map.  This means the defaults parent is able to update the map without
+      modifying the original */
+      return cloneDefaults(DEFAULTS.get(className));
+   }
+
+   private static Object instantiateObject(final String className)
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            try
+            {
+               return ClassloadingUtil.newInstanceFromClassLoader(className);
+            }
+            catch (IllegalStateException e)
+            {
+               return null;
+            }
+         }
+      });
+   }
+
+   private static Map<String, Object> cloneDefaults(Map<String, Object> defaults)
+   {
+      Map<String, Object> cloned = new HashMap<String, Object>();
+      for (Map.Entry entry : defaults.entrySet())
+      {
+         cloned.put((String) entry.getKey(), entry.getValue());
+      }
+      return cloned;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java
new file mode 100644
index 0000000..68fc6e1
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * A Netty decoder specially optimised to to decode messages on the core protocol only
+ *
+ * @author <a href="tlee@redhat.com">Trustin Lee</a>
+ * @author <a href="nmaurer@redhat.com">Norman Maurer</a>
+ * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
+ */
+public class HornetQAMQPFrameDecoder extends LengthFieldBasedFrameDecoder
+{
+   public HornetQAMQPFrameDecoder()
+   {
+      // The interface itself is part of the buffer (hence the -4)
+      super(Integer.MAX_VALUE, 0, 4, -4 , 0);
+   }
+
+
+   @Override
+   protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length)
+   {
+      return super.extractFrame(ctx, buffer, index, length);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
index 474c452..a848df2 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Semaphore;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
 import io.netty.handler.ssl.SslHandler;
@@ -30,6 +31,7 @@ import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.security.HornetQPrincipal;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.spi.core.remoting.ReadyListener;
@@ -66,7 +68,9 @@ public class NettyConnection implements Connection
 
    private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
 
-   // Static --------------------------------------------------------
+   private RemotingConnection protocolConnection;
+
+// Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
@@ -89,9 +93,29 @@ public class NettyConnection implements Connection
 
    // Public --------------------------------------------------------
 
+   public Channel getNettyChannel()
+   {
+      return channel;
+   }
    // Connection implementation ----------------------------
 
 
+   public void forceClose()
+   {
+      if (channel != null)
+      {
+         try
+         {
+            channel.close();
+         }
+         catch (Throwable e)
+         {
+            HornetQClientLogger.LOGGER.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+
    /**
     * This is exposed so users would have the option to look at any data through interceptors
     *
@@ -102,6 +126,16 @@ public class NettyConnection implements Connection
       return channel;
    }
 
+   public RemotingConnection getProtocolConnection()
+   {
+      return protocolConnection;
+   }
+
+   public void setProtocolConnection(RemotingConnection protocolConnection)
+   {
+      this.protocolConnection = protocolConnection;
+   }
+
    public void close()
    {
       if (closed)
@@ -178,6 +212,11 @@ public class NettyConnection implements Connection
 
    public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
    {
+      write(buffer, flush, batched, null);
+   }
+
+   public void write(HornetQBuffer buffer, final boolean flush, final boolean batched, final ChannelFutureListener futureListener)
+   {
 
       try
       {
@@ -223,7 +262,7 @@ public class NettyConnection implements Connection
             // use a normal promise
             final ByteBuf buf = buffer.byteBuf();
             final ChannelPromise promise;
-            if (flush)
+            if (flush || futureListener != null)
             {
                promise = channel.newPromise();
             }
@@ -236,7 +275,14 @@ public class NettyConnection implements Connection
             boolean inEventLoop = eventLoop.inEventLoop();
             if (!inEventLoop)
             {
-               channel.writeAndFlush(buf, promise);
+               if (futureListener != null)
+               {
+                  channel.writeAndFlush(buf, promise).addListener(futureListener);
+               }
+               else
+               {
+                  channel.writeAndFlush(buf, promise);
+               }
             }
             else
             {
@@ -248,7 +294,14 @@ public class NettyConnection implements Connection
                   @Override
                   public void run()
                   {
-                     channel.writeAndFlush(buf, promise);
+                     if (futureListener != null)
+                     {
+                        channel.writeAndFlush(buf, promise).addListener(futureListener);
+                     }
+                     else
+                     {
+                        channel.writeAndFlush(buf, promise);
+                     }
                   }
                };
                // execute the task on the eventloop
@@ -343,6 +396,12 @@ public class NettyConnection implements Connection
       }
    }
 
+   @Override
+   public boolean isUsingProtocolHandling()
+   {
+      return true;
+   }
+
 
    // Public --------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
index e053cac..f442c91 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
@@ -41,6 +41,8 @@ import java.security.AccessController;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -94,16 +96,16 @@ import io.netty.util.ResourceLeakDetector;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.hornetq.api.config.HornetQDefaultConfiguration;
-import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.spi.core.remoting.AbstractConnector;
 import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.utils.ConfigurationHelper;
@@ -117,6 +119,7 @@ import static org.hornetq.utils.Base64.encodeBytes;
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  * @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
  * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class NettyConnector extends AbstractConnector
 {
@@ -142,10 +145,19 @@ public class NettyConnector extends AbstractConnector
 
    private static final AttributeKey<String> REMOTING_KEY = AttributeKey.valueOf(SEC_HORNETQ_REMOTING_KEY);
 
+   // Default Configuration
+   public static final Map<String, Object> DEFAULT_CONFIG;
+
    static
    {
       // Disable resource leak detection for performance reasons by default
       ResourceLeakDetector.setEnabled(false);
+
+      // Set default Configuration
+      Map<String, Object> config = new HashMap<String , Object>();
+      config.put(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST);
+      config.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+      DEFAULT_CONFIG = Collections.unmodifiableMap(config);
    }
 
    // Attributes ----------------------------------------------------
@@ -229,12 +241,13 @@ public class NettyConnector extends AbstractConnector
 
    private int connectTimeoutMillis;
 
+   private final ClientProtocolManager protocolManager;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-
    public NettyConnector(final Map<String, Object> configuration,
                          final BufferHandler handler,
                          final ConnectionLifeCycleListener listener,
@@ -242,7 +255,22 @@ public class NettyConnector extends AbstractConnector
                          final Executor threadPool,
                          final ScheduledExecutorService scheduledThreadPool)
    {
+      this(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, new HornetQClientProtocolManager());
+   }
+
+
+   public NettyConnector(final Map<String, Object> configuration,
+                         final BufferHandler handler,
+                         final ConnectionLifeCycleListener listener,
+                         final Executor closeExecutor,
+                         final Executor threadPool,
+                         final ScheduledExecutorService scheduledThreadPool,
+                         final ClientProtocolManager protocolManager)
+   {
       super(configuration);
+
+      this.protocolManager = protocolManager;
+
       if (listener == null)
       {
          throw HornetQClientMessageBundle.BUNDLE.nullListener();
@@ -437,10 +465,7 @@ public class NettyConnector extends AbstractConnector
          group = new NioEventLoopGroup(threadsToUse);
       }
       // if we are a servlet wrap the socketChannelFactory
-      if (useServlet)
-      {
-         // TODO: This will be replaced by allow upgrade HTTP connection from Undertow.;
-      }
+
       bootstrap = new Bootstrap();
       bootstrap.channel(channelClazz);
       bootstrap.group(group);
@@ -611,7 +636,8 @@ public class NettyConnector extends AbstractConnector
                pipeline.addLast(httpClientCodec);
                pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec));
             }
-            pipeline.addLast(new HornetQFrameDecoder2());
+
+            protocolManager.addChannelHandlers(pipeline);
 
             pipeline.addLast(new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
          }
@@ -761,8 +787,8 @@ public class NettyConnector extends AbstractConnector
                request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE);
 
                final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME,
-                       null,
-                       configuration);
+                                                                             null,
+                                                                             configuration);
                if (endpoint != null)
                {
                   request.headers().set(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, endpoint);
@@ -800,7 +826,7 @@ public class NettyConnector extends AbstractConnector
          // No acceptor on a client connection
          Listener connectionListener = new Listener();
          NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false);
-         connectionListener.connectionCreated(null, conn, HornetQClient.DEFAULT_CORE_PROTOCOL);
+         connectionListener.connectionCreated(null, conn, protocolManager.getName());
          return conn;
       }
       else
@@ -876,6 +902,11 @@ public class NettyConnector extends AbstractConnector
                   ctx.close();
                }
             }
+            else if (response.getStatus().code() == HttpResponseStatus.FORBIDDEN.code())
+            {
+               HornetQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor();
+               ctx.close();
+            }
             latch.countDown();
          }
       }
@@ -1062,10 +1093,6 @@ public class NettyConnector extends AbstractConnector
          {
             throw HornetQClientMessageBundle.BUNDLE.connectionExists(connection.getID());
          }
-         String handshake = "HORNETQ";
-         HornetQBuffer buffer = connection.createBuffer(handshake.length());
-         buffer.writeBytes(handshake.getBytes());
-         connection.write(buffer);
       }
 
       public void connectionDestroyed(final Object connectionID)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java
index 1b9f9e9..f66b58a 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java
@@ -18,6 +18,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.spi.core.remoting.Connector;
 import org.hornetq.spi.core.remoting.ConnectorFactory;
@@ -26,6 +27,7 @@ import org.hornetq.spi.core.remoting.ConnectorFactory;
  * A NettyConnectorFactory
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class NettyConnectorFactory implements ConnectorFactory
 {
@@ -34,7 +36,8 @@ public class NettyConnectorFactory implements ConnectorFactory
                                     final ConnectionLifeCycleListener listener,
                                     final Executor closeExecutor,
                                     final Executor threadPool,
-                                    final ScheduledExecutorService scheduledThreadPool)
+                                    final ScheduledExecutorService scheduledThreadPool,
+                                    final ClientProtocolManager protocolManager)
    {
       return new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
    }
@@ -49,4 +52,10 @@ public class NettyConnectorFactory implements ConnectorFactory
    {
       return false;
    }
+
+   @Override
+   public Map<String, Object> getDefaults()
+   {
+      return NettyConnector.DEFAULT_CONFIG;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java
index cbe93f9..97bd565 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java
@@ -32,19 +32,15 @@ import org.hornetq.utils.ClassloadingUtil;
 
 /**
  * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Justin Bertram
  *
- *
+ * Please note, this class supports PKCS#11 keystores, but there are no specific tests in the HornetQ test-suite to
+ * validate/verify this works because this requires a functioning PKCS#11 provider which is not available by default
+ * (see java.security.Security#getProviders()).  The main thing to keep in mind is that PKCS#11 keystores will have a
+ * null keystore path.
  */
 public class SSLSupport
 {
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
    // Public --------------------------------------------------------
 
    public static SSLContext createContext(final String keystoreProvider, final String keystorePath, final String keystorePassword,
@@ -81,17 +77,13 @@ public class SSLSupport
       return supportedSuites.delete(supportedSuites.length() - 2, supportedSuites.length()).toString();
    }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
    // Private -------------------------------------------------------
 
    private static TrustManager[] loadTrustManager(final String trustStoreProvider,
                                                   final String trustStorePath,
                                                   final String trustStorePassword) throws Exception
    {
-      if (trustStorePath == null && ("JKS".equals(trustStoreProvider) || trustStoreProvider == null))
+      if (trustStorePath == null && (trustStoreProvider == null || (trustStoreProvider != null && !"PKCS11".equals(trustStoreProvider.toUpperCase()))))
       {
          return null;
       }
@@ -107,14 +99,11 @@ public class SSLSupport
 
    private static KeyStore loadKeystore(final String keystoreProvider, final String keystorePath, final String keystorePassword) throws Exception
    {
-      assert keystorePath != null || "JKS".equals(keystoreProvider) == false;
-      assert keystorePassword != null;
-
       KeyStore ks = KeyStore.getInstance(keystoreProvider);
       InputStream in = null;
       try
       {
-         if ("JKS".equals(keystoreProvider))
+         if (keystorePath != null)
          {
             URL keystoreURL = SSLSupport.validateStoreURL(keystorePath);
             in = keystoreURL.openStream();
@@ -139,7 +128,7 @@ public class SSLSupport
 
    private static KeyManager[] loadKeyManagers(final String keyStoreProvider, final String keystorePath, final String keystorePassword) throws Exception
    {
-      if (keystorePath == null && ("JKS".equals(keyStoreProvider) || keyStoreProvider == null))
+      if (keystorePath == null && (keyStoreProvider == null || (keyStoreProvider != null && !"PKCS11".equals(keyStoreProvider.toUpperCase()))))
       {
          return null;
       }
@@ -196,7 +185,4 @@ public class SSLSupport
          }
       });
    }
-
-
-   // Inner classes -------------------------------------------------
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java
new file mode 100644
index 0000000..24ea35c
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.reader;
+
+import org.hornetq.api.core.Message;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class BytesMessageUtil extends MessageUtil
+{
+
+   public static boolean bytesReadBoolean(Message message)
+   {
+      return getBodyBuffer(message).readBoolean();
+   }
+
+   public static byte bytesReadByte(Message message)
+   {
+      return getBodyBuffer(message).readByte();
+   }
+
+   public static int bytesReadUnsignedByte(Message message)
+   {
+      return getBodyBuffer(message).readUnsignedByte();
+   }
+
+   public static short bytesReadShort(Message message)
+   {
+      return getBodyBuffer(message).readShort();
+   }
+
+   public static int bytesReadUnsignedShort(Message message)
+   {
+      return getBodyBuffer(message).readUnsignedShort();
+   }
+
+   public static char bytesReadChar(Message message)
+   {
+      return (char)getBodyBuffer(message).readShort();
+   }
+
+   public static int bytesReadInt(Message message)
+   {
+      return getBodyBuffer(message).readInt();
+   }
+
+   public static long bytesReadLong(Message message)
+   {
+      return getBodyBuffer(message).readLong();
+   }
+
+   public static float bytesReadFloat(Message message)
+   {
+      return Float.intBitsToFloat(getBodyBuffer(message).readInt());
+   }
+
+   public static double bytesReadDouble(Message message)
+   {
+      return Double.longBitsToDouble(getBodyBuffer(message).readLong());
+   }
+
+   public static String bytesReadUTF(Message message)
+   {
+      return getBodyBuffer(message).readUTF();
+   }
+
+
+
+   public static int bytesReadBytes(Message message, final byte[] value)
+   {
+      return bytesReadBytes(message, value, value.length);
+   }
+
+   public static int bytesReadBytes(Message message, final byte[] value, final int length)
+   {
+      if (!getBodyBuffer(message).readable())
+      {
+         return -1;
+      }
+
+      int read = Math.min(length, getBodyBuffer(message).readableBytes());
+
+      if (read != 0)
+      {
+         getBodyBuffer(message).readBytes(value, 0, read);
+      }
+
+      return read;
+
+   }
+
+
+   public static void bytesWriteBoolean(Message message, boolean value)
+   {
+      getBodyBuffer(message).writeBoolean(value);
+   }
+
+
+
+   public static void bytesWriteByte(Message message, byte value)
+   {
+      getBodyBuffer(message).writeByte(value);
+   }
+
+
+
+   public static void bytesWriteShort(Message message, short value)
+   {
+      getBodyBuffer(message).writeShort(value);
+   }
+
+
+   public static void bytesWriteChar(Message message, char value)
+   {
+      getBodyBuffer(message).writeShort((short)value);
+   }
+
+   public static void bytesWriteInt(Message message, int value)
+   {
+      getBodyBuffer(message).writeInt(value);
+   }
+
+   public static void bytesWriteLong(Message message, long value)
+   {
+      getBodyBuffer(message).writeLong(value);
+   }
+
+   public static void bytesWriteFloat(Message message, float value)
+   {
+      getBodyBuffer(message).writeInt(Float.floatToIntBits(value));
+   }
+
+   public static void bytesWriteDouble(Message message, double value)
+   {
+      getBodyBuffer(message).writeLong(Double.doubleToLongBits(value));
+   }
+
+   public static void bytesWriteUTF(Message message, String value)
+   {
+      getBodyBuffer(message).writeUTF(value);
+   }
+
+   public static void bytesWriteBytes(Message message, byte[] value)
+   {
+      getBodyBuffer(message).writeBytes(value);
+   }
+
+   public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length)
+   {
+      getBodyBuffer(message).writeBytes(value, offset, length);
+   }
+
+
+   /**
+    * Returns true if it could send the Object to any known format
+    * @param message
+    * @param value
+    * @return
+    */
+   public static boolean bytesWriteObject(Message message, Object value)
+   {
+      if (value == null)
+      {
+         throw new NullPointerException("Attempt to write a null value");
+      }
+      if (value instanceof String)
+      {
+         bytesWriteUTF(message, (String) value);
+      }
+      else if (value instanceof Boolean)
+      {
+         bytesWriteBoolean(message, (Boolean) value);
+      }
+      else if (value instanceof Character)
+      {
+         bytesWriteChar(message, (Character) value);
+      }
+      else if (value instanceof Byte)
+      {
+         bytesWriteByte(message, (Byte) value);
+      }
+      else if (value instanceof Short)
+      {
+         bytesWriteShort(message, (Short) value);
+      }
+      else if (value instanceof Integer)
+      {
+         bytesWriteInt(message, (Integer) value);
+      }
+      else if (value instanceof Long)
+      {
+         bytesWriteLong(message, (Long) value);
+      }
+      else if (value instanceof Float)
+      {
+         bytesWriteFloat(message, (Float) value);
+      }
+      else if (value instanceof Double)
+      {
+         bytesWriteDouble(message, (Double) value);
+      }
+      else if (value instanceof byte[])
+      {
+         bytesWriteBytes(message, (byte[]) value);
+      }
+      else
+      {
+         return false;
+      }
+
+
+      return true;
+   }
+
+   public static void bytesMessageReset(Message message)
+   {
+      getBodyBuffer(message).resetReaderIndex();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java
new file mode 100644
index 0000000..d55fb30
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.reader;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.utils.TypedProperties;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class MapMessageUtil extends MessageUtil
+{
+
+   /**
+    * Utility method to set the map on a message body
+    */
+   public static void writeBodyMap(Message message, TypedProperties properties)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.resetWriterIndex();
+      properties.encode(buff);
+   }
+
+   /**
+    * Utility method to set the map on a message body
+    */
+   public static TypedProperties readBodyMap(Message message)
+   {
+      TypedProperties map = new TypedProperties();
+      readBodyMap(message, map);
+      return map;
+   }
+
+   /**
+    * Utility method to set the map on a message body
+    */
+   public static void readBodyMap(Message message, TypedProperties map)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.resetReaderIndex();
+      map.decode(buff);
+   }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java
new file mode 100644
index 0000000..a2429db
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.reader;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.HornetQPropertyConversionException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * static methods intended for import static on JMS like messages.
+ *
+ * This provides a helper for core message to act some of the JMS functions used by the JMS wrapper
+ *
+ * @author Clebert Suconic
+ */
+
+public class MessageUtil
+{
+   public static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID");
+
+   public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo");
+
+   public static final SimpleString TYPE_HEADER_NAME = new SimpleString("JMSType");
+
+   public static final SimpleString JMS = new SimpleString("JMS");
+
+   public static final SimpleString JMSX = new SimpleString("JMSX");
+
+   public static final SimpleString JMS_ = new SimpleString("JMS_");
+
+   public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
+
+   public static final String JMSXGROUPID = "JMSXGroupID";
+
+   public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__HQ_CID");
+
+
+
+   public static HornetQBuffer getBodyBuffer(Message message)
+   {
+      return message.getBodyBuffer();
+   }
+
+
+
+   public static byte[] getJMSCorrelationIDAsBytes(Message message)
+   {
+      Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME);
+
+      if (obj instanceof byte[])
+      {
+         return (byte[])obj;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+
+
+   public static void setJMSType(Message message, String type)
+   {
+      message.putStringProperty(TYPE_HEADER_NAME, new SimpleString(type));
+   }
+
+   public static String getJMSType(Message message)
+   {
+      SimpleString ss = message.getSimpleStringProperty(TYPE_HEADER_NAME);
+
+      if (ss != null)
+      {
+         return ss.toString();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+
+   public static final void setJMSCorrelationIDAsBytes(Message message, final byte[] correlationID) throws HornetQException
+   {
+      if (correlationID == null || correlationID.length == 0)
+      {
+         throw new HornetQException("Please specify a non-zero length byte[]");
+      }
+      message.putBytesProperty(CORRELATIONID_HEADER_NAME, correlationID);
+   }
+
+   public static void setJMSCorrelationID(Message message, final String correlationID)
+   {
+      if (correlationID == null)
+      {
+         message.removeProperty(CORRELATIONID_HEADER_NAME);
+      }
+      else
+      {
+         message.putStringProperty(CORRELATIONID_HEADER_NAME, new SimpleString(correlationID));
+      }
+   }
+
+   public static String getJMSCorrelationID(Message message)
+   {
+      try
+      {
+         return message.getStringProperty(CORRELATIONID_HEADER_NAME);
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         return null;
+      }
+   }
+
+
+   public static SimpleString getJMSReplyTo(Message message)
+   {
+      return message.getSimpleStringProperty(REPLYTO_HEADER_NAME);
+   }
+
+   public static void setJMSReplyTo(Message message, final SimpleString dest)
+   {
+
+      if (dest == null)
+      {
+         message.removeProperty(REPLYTO_HEADER_NAME);
+      }
+      else
+      {
+
+         message.putStringProperty(REPLYTO_HEADER_NAME, dest);
+      }
+   }
+
+
+
+   public static void clearProperties(Message message)
+   {
+
+      List<SimpleString> toRemove = new ArrayList<SimpleString>();
+
+      for (SimpleString propName : message.getPropertyNames())
+      {
+         if (!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+            propName.startsWith(JMS_))
+         {
+            toRemove.add(propName);
+         }
+      }
+
+      for (SimpleString propName : toRemove)
+      {
+         message.removeProperty(propName);
+      }
+   }
+
+
+
+   public static Set<String> getPropertyNames(Message message)
+   {
+      HashSet<String> set = new HashSet<String>();
+
+      for (SimpleString propName : message.getPropertyNames())
+      {
+         if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+            propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME))
+         {
+            set.add(propName.toString());
+         }
+      }
+
+      set.add(JMSXDELIVERYCOUNT);
+
+      return set;
+   }
+
+   public static boolean propertyExists(Message message, String name)
+   {
+      return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
+         MessageUtil.JMSXGROUPID.equals(name) &&
+            message.containsProperty(org.hornetq.api.core.Message.HDR_GROUP_ID);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java
new file mode 100644
index 0000000..edefc01
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.reader;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class StreamMessageUtil extends MessageUtil
+{
+   /**
+    * Method to read boolean values out of the Stream protocol existent on JMS Stream Messages
+    * Throws IllegalStateException if the type was invalid
+    *
+    * @param message
+    * @return
+    */
+   public static boolean streamReadBoolean(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+
+      switch (type)
+      {
+         case DataConstants.BOOLEAN:
+            return buff.readBoolean();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Boolean.valueOf(s);
+         default:
+            throw new IllegalStateException("Invalid conversion, type byte was " + type);
+      }
+
+   }
+
+   public static byte streamReadByte(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      int index = buff.readerIndex();
+      try
+      {
+         byte type = buff.readByte();
+         switch (type)
+         {
+            case DataConstants.BYTE:
+               return buff.readByte();
+            case DataConstants.STRING:
+               String s = buff.readNullableString();
+               return Byte.parseByte(s);
+            default:
+               throw new IllegalStateException("Invalid conversion");
+         }
+      }
+      catch (NumberFormatException e)
+      {
+         buff.readerIndex(index);
+         throw e;
+      }
+
+   }
+
+   public static short streamReadShort(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Short.parseShort(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+   public static char streamReadChar(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.CHAR:
+            return (char)buff.readShort();
+         case DataConstants.STRING:
+            String str = buff.readNullableString();
+            if (str == null)
+            {
+               throw new NullPointerException("Invalid conversion");
+            }
+            else
+            {
+               throw new IllegalStateException("Invalid conversion");
+            }
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+
+   }
+
+   public static int streamReadInteger(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.INT:
+            return buff.readInt();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Integer.parseInt(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+
+   public static long streamReadLong(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.INT:
+            return buff.readInt();
+         case DataConstants.LONG:
+            return buff.readLong();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Long.parseLong(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+   public static float streamReadFloat(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.FLOAT:
+            return Float.intBitsToFloat(buff.readInt());
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Float.parseFloat(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+
+   public static double streamReadDouble(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.FLOAT:
+            return Float.intBitsToFloat(buff.readInt());
+         case DataConstants.DOUBLE:
+            return Double.longBitsToDouble(buff.readLong());
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Double.parseDouble(s);
+         default:
+            throw new IllegalStateException("Invalid conversion: " + type);
+      }
+   }
+
+
+   public static String streamReadString(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BOOLEAN:
+            return String.valueOf(buff.readBoolean());
+         case DataConstants.BYTE:
+            return String.valueOf(buff.readByte());
+         case DataConstants.SHORT:
+            return String.valueOf(buff.readShort());
+         case DataConstants.CHAR:
+            return String.valueOf((char)buff.readShort());
+         case DataConstants.INT:
+            return String.valueOf(buff.readInt());
+         case DataConstants.LONG:
+            return String.valueOf(buff.readLong());
+         case DataConstants.FLOAT:
+            return String.valueOf(Float.intBitsToFloat(buff.readInt()));
+         case DataConstants.DOUBLE:
+            return String.valueOf(Double.longBitsToDouble(buff.readLong()));
+         case DataConstants.STRING:
+            return buff.readNullableString();
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+   /**
+    * Utility for reading bytes out of streaming.
+    * It will return remainingBytes, bytesRead
+    * @param remainingBytes remaining Bytes from previous read. Send it to 0 if it was the first call for the message
+    * @param message
+    * @return a pair of remaining bytes and bytes read
+    */
+   public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+
+      if (remainingBytes == -1)
+      {
+         return new Pair<>(0, -1);
+      }
+      else if (remainingBytes == 0)
+      {
+         byte type = buff.readByte();
+         if (type != DataConstants.BYTES)
+         {
+            throw new IllegalStateException("Invalid conversion");
+         }
+         remainingBytes = buff.readInt();
+      }
+      int read = Math.min(value.length, remainingBytes);
+      buff.readBytes(value, 0, read);
+      remainingBytes -= read;
+      if (remainingBytes == 0)
+      {
+         remainingBytes = -1;
+      }
+      return new Pair<>(remainingBytes, read);
+
+   }
+
+   public static Object streamReadObject(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BOOLEAN:
+            return buff.readBoolean();
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.CHAR:
+            return (char)buff.readShort();
+         case DataConstants.INT:
+            return buff.readInt();
+         case DataConstants.LONG:
+            return buff.readLong();
+         case DataConstants.FLOAT:
+            return Float.intBitsToFloat(buff.readInt());
+         case DataConstants.DOUBLE:
+            return Double.longBitsToDouble(buff.readLong());
+         case DataConstants.STRING:
+            return buff.readNullableString();
+         case DataConstants.BYTES:
+            int bufferLen = buff.readInt();
+            byte[] bytes = new byte[bufferLen];
+            buff.readBytes(bytes);
+            return bytes;
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+
+   }
+
+
+}


Mime
View raw message