activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [42/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:12 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java
index 71fedc1..cb68c47 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java
@@ -20,11 +20,13 @@ import java.nio.ByteBuffer;
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.HornetQPropertyConversionException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.reader.MessageUtil;
 
 /**
  *
@@ -37,7 +39,8 @@ import org.hornetq.core.message.impl.MessageImpl;
 public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal
 {
    // added this constant here so that the client package has no dependency on JMS
-   public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo");
+   public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
+
 
    private int deliveryCount;
 
@@ -68,42 +71,54 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
    }
 
+   @Override
    public boolean isServerMessage()
    {
       return false;
    }
 
+   @Override
    public void onReceipt(final ClientConsumerInternal consumer)
    {
       this.consumer = consumer;
    }
 
-   public void setDeliveryCount(final int deliveryCount)
+   @Override
+   public ClientMessageImpl setDeliveryCount(final int deliveryCount)
    {
       this.deliveryCount = deliveryCount;
+      return this;
    }
 
+   @Override
    public int getDeliveryCount()
    {
       return deliveryCount;
    }
 
-   public void acknowledge() throws HornetQException
+   @Override
+   public ClientMessageImpl acknowledge() throws HornetQException
    {
       if (consumer != null)
       {
          consumer.acknowledge(this);
       }
+
+      return this;
    }
 
-   public void individualAcknowledge() throws HornetQException
+   @Override
+   public ClientMessageImpl individualAcknowledge() throws HornetQException
    {
       if (consumer != null)
       {
          consumer.individualAcknowledge(this);
       }
+
+      return this;
    }
 
+   @Override
    public int getFlowControlSize()
    {
       if (flowControlSize < 0)
@@ -113,6 +128,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       return flowControlSize;
    }
 
+   @Override
    public void setFlowControlSize(final int flowControlSize)
    {
       this.flowControlSize = flowControlSize;
@@ -121,16 +137,19 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    /**
     * @return the largeMessage
     */
+   @Override
    public boolean isLargeMessage()
    {
       return false;
    }
 
+   @Override
    public boolean isCompressed()
    {
       return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
    }
 
+   @Override
    public int getBodySize()
    {
       return buffer.writerIndex() - buffer.readerIndex();
@@ -159,9 +178,10 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    }
 
    @Override
-   public void setOutputStream(final OutputStream out) throws HornetQException
+   public ClientMessageImpl setOutputStream(final OutputStream out) throws HornetQException
    {
       saveToOutputStream(out);
+      return this;
    }
 
    @Override
@@ -178,6 +198,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    /**
     * @return the bodyInputStream
     */
+   @Override
    public InputStream getBodyInputStream()
    {
       return bodyInputStream;
@@ -186,9 +207,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    /**
     * @param bodyInputStream the bodyInputStream to set
     */
-   public void setBodyInputStream(final InputStream bodyInputStream)
+   @Override
+   public ClientMessageImpl setBodyInputStream(final InputStream bodyInputStream)
    {
       this.bodyInputStream = bodyInputStream;
+      return this;
    }
 
    @Override
@@ -197,21 +220,168 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       return new DecodingContext();
    }
 
+   @Override
+   public ClientMessageImpl putBooleanProperty(final SimpleString key, final boolean value)
+   {
+      return (ClientMessageImpl) super.putBooleanProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putByteProperty(final SimpleString key, final byte value)
+   {
+      return (ClientMessageImpl) super.putByteProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putBytesProperty(final SimpleString key, final byte[] value)
+   {
+      return (ClientMessageImpl) super.putBytesProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putCharProperty(SimpleString key, char value)
+   {
+      return (ClientMessageImpl) super.putCharProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putCharProperty(String key, char value)
+   {
+      return (ClientMessageImpl) super.putCharProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putShortProperty(final SimpleString key, final short value)
+   {
+      return (ClientMessageImpl) super.putShortProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putIntProperty(final SimpleString key, final int value)
+   {
+      return (ClientMessageImpl) super.putIntProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putLongProperty(final SimpleString key, final long value)
+   {
+      return (ClientMessageImpl) super.putLongProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putFloatProperty(final SimpleString key, final float value)
+   {
+      return (ClientMessageImpl) super.putFloatProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putDoubleProperty(final SimpleString key, final double value)
+   {
+      return (ClientMessageImpl) super.putDoubleProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putStringProperty(final SimpleString key, final SimpleString value)
+   {
+      return (ClientMessageImpl) super.putStringProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws HornetQPropertyConversionException
+   {
+      return (ClientMessageImpl) super.putObjectProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putObjectProperty(final String key, final Object value) throws HornetQPropertyConversionException
+   {
+      return (ClientMessageImpl) super.putObjectProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putBooleanProperty(final String key, final boolean value)
+   {
+      return (ClientMessageImpl) super.putBooleanProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putByteProperty(final String key, final byte value)
+   {
+      return (ClientMessageImpl) super.putByteProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putBytesProperty(final String key, final byte[] value)
+   {
+      return (ClientMessageImpl) super.putBytesProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putShortProperty(final String key, final short value)
+   {
+      return (ClientMessageImpl) super.putShortProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putIntProperty(final String key, final int value)
+   {
+      return (ClientMessageImpl) super.putIntProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putLongProperty(final String key, final long value)
+   {
+      return (ClientMessageImpl) super.putLongProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putFloatProperty(final String key, final float value)
+   {
+      return (ClientMessageImpl) super.putFloatProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putDoubleProperty(final String key, final double value)
+   {
+      return (ClientMessageImpl) super.putDoubleProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putStringProperty(final String key, final String value)
+   {
+      return (ClientMessageImpl) super.putStringProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl writeBodyBufferBytes(byte[] bytes)
+   {
+      return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
+   }
+
+   @Override
+   public ClientMessageImpl writeBodyBufferString(String string)
+   {
+      return (ClientMessageImpl) super.writeBodyBufferString(string);
+   }
+
    private final class DecodingContext implements BodyEncoder
    {
       public DecodingContext()
       {
       }
 
+      @Override
       public void open()
       {
          getBodyBuffer().readerIndex(0);
       }
 
+      @Override
       public void close()
       {
       }
 
+      @Override
       public long getLargeBodySize()
       {
          if (isLargeMessage())
@@ -224,12 +394,14 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
          }
       }
 
+      @Override
       public int encode(final ByteBuffer bufferRead) throws HornetQException
       {
          HornetQBuffer buffer1 = HornetQBuffers.wrappedBuffer(bufferRead);
          return encode(buffer1, bufferRead.capacity());
       }
 
+      @Override
       public int encode(final HornetQBuffer bufferOut, final int size)
       {
          byte[] bytes = new byte[size];

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java
index 132be0f..2a1e291 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java
@@ -13,8 +13,8 @@
 
 package org.hornetq.core.client.impl;
 
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.utils.TypedProperties;
 
 /**
@@ -22,7 +22,7 @@ import org.hornetq.utils.TypedProperties;
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  */
-public interface ClientMessageInternal extends ClientMessage, MessageInternal
+public interface ClientMessageInternal extends ClientMessage
 {
 
    TypedProperties getProperties();
@@ -33,6 +33,8 @@ public interface ClientMessageInternal extends ClientMessage, MessageInternal
    /** Size used for FlowControl */
    void setFlowControlSize(int flowControlSize);
 
+   void setAddressTransient(SimpleString address);
+
    void onReceipt(ClientConsumerInternal consumer);
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java
index 5b12ade..156d526 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java
@@ -13,6 +13,7 @@
 package org.hornetq.core.client.impl;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.spi.core.remoting.SessionContext;
 
 /**
  * A ClientProducerCreditManager
@@ -23,7 +24,7 @@ import org.hornetq.api.core.SimpleString;
  */
 public interface ClientProducerCreditManager
 {
-   ClientProducerCredits getCredits(SimpleString address, boolean anon);
+   ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context);
 
    void returnCredits(SimpleString address);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
index f9fab0d..7477543 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -17,6 +17,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.spi.core.remoting.SessionContext;
 
 /**
  * A ProducerCreditManager
@@ -42,7 +43,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       this.windowSize = windowSize;
    }
 
-   public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
+   public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, SessionContext context)
    {
       if (windowSize == -1)
       {
@@ -84,7 +85,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
          // while this is still sending requests causing a dead lock
          if (needInit)
          {
-            credits.init();
+            credits.init(context);
          }
 
          return credits;
@@ -202,7 +203,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
          return false;
       }
 
-      public void init()
+      public void init(SessionContext ctx)
       {
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java
index a1eeafc..f4398f1 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java
@@ -13,6 +13,7 @@
 package org.hornetq.core.client.impl;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.spi.core.remoting.SessionContext;
 
 /**
  * A ClientProducerCredits
@@ -31,7 +32,7 @@ public interface ClientProducerCredits
 
    boolean isBlocked();
 
-   void init();
+   void init(SessionContext sessionContext);
 
    void reset();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
index 4deb64d..3290f27 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
@@ -16,6 +16,7 @@ import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.client.HornetQClientMessageBundle;
+import org.hornetq.spi.core.remoting.SessionContext;
 
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +48,8 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
 
    private boolean serverRespondedWithFail;
 
+   private SessionContext sessionContext;
+
    public ClientProducerCreditsImpl(final ClientSessionInternal session,
                                     final SimpleString address,
                                     final int windowSize)
@@ -62,11 +65,15 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
       semaphore = new Semaphore(0, false);
    }
 
-   public void init()
+   public void init(SessionContext sessionContext)
    {
       // We initially request twice as many credits as we request in subsequent requests
       // This allows the producer to keep sending as more arrive, minimising pauses
       checkCredits(windowSize);
+
+      this.sessionContext = sessionContext;
+
+      this.sessionContext.linkFlowControl(address, this);
    }
 
    public void acquireCredits(final int credits) throws InterruptedException, HornetQException

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
index 71e6b8f..ac12955 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
@@ -326,7 +326,7 @@ public class ClientProducerImpl implements ClientProducerInternal
          throw new HornetQInterruptedException(e);
       }
 
-      sessionContext.sendFullMessage(msgI, sendBlocking, handler);
+      sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
    }
 
    private void checkClosed() throws HornetQException

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
index c9736d5..8660a2d 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
@@ -45,8 +44,6 @@ import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager;
-import org.hornetq.core.protocol.core.impl.PacketDecoder;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -56,7 +53,7 @@ import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.spi.core.remoting.Connector;
 import org.hornetq.spi.core.remoting.ConnectorFactory;
-import org.hornetq.spi.core.remoting.ProtocolResponseHandler;
+import org.hornetq.spi.core.remoting.TopologyResponseHandler;
 import org.hornetq.spi.core.remoting.SessionContext;
 import org.hornetq.utils.ClassloadingUtil;
 import org.hornetq.utils.ConcurrentHashSet;
@@ -69,15 +66,11 @@ import org.hornetq.utils.UUIDGenerator;
 /**
  * @author Tim Fox
  * @author Clebert Suconic
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 
 public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ConnectionLifeCycleListener
 {
-
-
-   // TODO use the factory here
-   protected ClientProtocolManager clientProtocolManager = new HornetQClientProtocolManager(this);
-
    // Constants
    // ------------------------------------------------------------------------------------
 
@@ -90,6 +83,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    private final ServerLocatorInternal serverLocator;
 
+   private final ClientProtocolManager clientProtocolManager;
+
    private TransportConfiguration connectorConfig;
 
    private TransportConfiguration backupConfig;
@@ -159,7 +154,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    private String liveNodeID;
 
-
    public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
                                    final TransportConfiguration connectorConfig,
                                    final long callTimeout,
@@ -173,18 +167,21 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                                    final Executor threadPool,
                                    final ScheduledExecutorService scheduledThreadPool,
                                    final List<Interceptor> incomingInterceptors,
-                                   final List<Interceptor> outgoingInterceptors,
-                                   PacketDecoder packetDecoder)
+                                   final List<Interceptor> outgoingInterceptors)
    {
       createTrace = new Exception();
 
       this.serverLocator = serverLocator;
 
+      this.clientProtocolManager = serverLocator.newProtocolManager();
+
+      this.clientProtocolManager.setSessionFactory(this);
+
       this.connectorConfig = connectorConfig;
 
       connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
 
-      checkTransportKeys(connectorFactory, connectorConfig.getParams());
+      checkTransportKeys(connectorFactory, connectorConfig);
 
       this.callTimeout = callTimeout;
 
@@ -227,10 +224,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
 
-
-      // TODO : Get rid of this / encapsulate it through the ClientProtocolManager (create a ExchangeServerProtocol for instance)
-      ((HornetQClientProtocolManager) clientProtocolManager).replacePacketDecoder(packetDecoder);
-
    }
 
    public void disableFinalizeCheck()
@@ -281,7 +274,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                                                            this,
                                                            closeExecutor,
                                                            threadPool,
-                                                           scheduledThreadPool);
+                                                           scheduledThreadPool,
+                                                           clientProtocolManager);
       }
 
       if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams()))
@@ -472,9 +466,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       return listeners.remove(listener);
    }
 
-   public void addFailoverListener(FailoverEventListener listener)
+   public ClientSessionFactoryImpl addFailoverListener(FailoverEventListener listener)
    {
       failoverListeners.add(listener);
+      return this;
    }
 
    public boolean removeFailoverListener(FailoverEventListener listener)
@@ -645,7 +640,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          {
 
 
-            if (clientProtocolManager.cleanupBeforeFailover())
+            if (clientProtocolManager.cleanupBeforeFailover(me))
             {
 
 
@@ -677,7 +672,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
                connector = null;
 
-               reconnectSessions(oldConnection, reconnectAttempts);
+               reconnectSessions(oldConnection, reconnectAttempts, me);
 
                if (oldConnection != null)
                {
@@ -785,8 +780,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                                                             orderedExecutorFactory.getExecutor(),
                                                             orderedExecutorFactory.getExecutor());
 
-      context.setSession(session);
-
       synchronized (sessions)
       {
          if (closed || !clientProtocolManager.isAlive())
@@ -859,7 +852,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
    /*
     * Re-attach all pre-existing sessions to the new remoting connection
     */
-   private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts)
+   private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts, final HornetQException cause)
    {
       HashSet<ClientSessionInternal> sessionsToFailover;
       synchronized (sessions)
@@ -906,7 +899,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       for (ClientSessionInternal session : sessionsToFailover)
       {
-         session.handleFailover(connection);
+         session.handleFailover(connection, cause);
       }
    }
 
@@ -1111,25 +1104,26 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    protected void schedulePing()
    {
-      if (clientFailureCheckPeriod != -1)
+      if (pingerFuture == null)
       {
-         if (pingerFuture == null)
-         {
-            pingRunnable = new ClientSessionFactoryImpl.PingRunnable();
+         pingRunnable = new ClientSessionFactoryImpl.PingRunnable();
 
+         if (clientFailureCheckPeriod != -1)
+         {
             pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ClientSessionFactoryImpl.ActualScheduledPinger(pingRunnable),
                                                                       0,
                                                                       clientFailureCheckPeriod,
                                                                       TimeUnit.MILLISECONDS);
-            // To make sure the first ping will be sent
-            pingRunnable.send();
-         }
-         // send a ping every time we create a new remoting connection
-         // to set up its TTL on the server side
-         else
-         {
-            pingRunnable.run();
          }
+
+         // To make sure the first ping will be sent
+         pingRunnable.send();
+      }
+      // send a ping every time we create a new remoting connection
+      // to set up its TTL on the server side
+      else
+      {
+         pingRunnable.run();
       }
    }
 
@@ -1262,14 +1256,15 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                                               this,
                                               closeExecutor,
                                               threadPool,
-                                              scheduledThreadPool);
+                                              scheduledThreadPool,
+                                              clientProtocolManager);
    }
 
-   private void checkTransportKeys(final ConnectorFactory factory, final Map<String, Object> params)
+   private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc)
    {
-      if (params != null)
+      if (tc.getParams() != null)
       {
-         Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), params.keySet());
+         Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), tc.getParams().keySet());
 
          if (!invalid.isEmpty())
          {
@@ -1282,7 +1277,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
    }
 
-
    /**
     * It will connect to either live or backup according to the current configuration;
     * it will also switch to backup in case it can't connect to live and there's a backup configured
@@ -1535,7 +1529,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout,
                                                                        callFailoverTimeout, incomingInterceptors,
                                                                        outgoingInterceptors,
-                                                                       new SessionFactoryProtocolHandler());
+                                                                       new SessionFactoryTopologyHandler());
 
       newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID()));
 
@@ -1566,8 +1560,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
    }
 
+   @Override
+   public String getLiveNodeId()
+   {
+      return liveNodeID;
+   }
 
-   class SessionFactoryProtocolHandler implements ProtocolResponseHandler
+   class SessionFactoryTopologyHandler implements TopologyResponseHandler
    {
 
       @Override
@@ -1607,5 +1606,4 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          serverLocator.notifyNodeDown(eventTime, nodeID);
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
index 43afd66..e09823d 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
@@ -24,6 +24,7 @@ import org.hornetq.utils.ConfirmationWindowWarning;
  * A ClientSessionFactoryInternal
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  *
  */
 public interface ClientSessionFactoryInternal extends ClientSessionFactory
@@ -36,6 +37,8 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory
 
    void disableFinalizeCheck();
 
+   String getLiveNodeId();
+
    // for testing
 
    int numConnections();
@@ -56,6 +59,5 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory
 
    ConfirmationWindowWarning getConfirmationWindowWarning();
 
-
    Lock lockFailover();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
index 3975223..197d80c 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
@@ -40,6 +40,7 @@ import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.ConsumerContext;
 import org.hornetq.spi.core.remoting.SessionContext;
 import org.hornetq.utils.ConfirmationWindowWarning;
 import org.hornetq.utils.TokenBucketLimiterImpl;
@@ -51,6 +52,7 @@ import org.hornetq.utils.XidCodecSupport;
  * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
  * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public final class ClientSessionImpl implements ClientSessionInternal, FailureListener
 {
@@ -77,7 +79,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    private final Set<ClientProducerInternal> producers = new HashSet<ClientProducerInternal>();
 
    // Consumers must be an ordered map so if we fail we recreate them in the same order with the same ids
-   private final Map<Long, ClientConsumerInternal> consumers = new LinkedHashMap<Long, ClientConsumerInternal>();
+   private final Map<ConsumerContext, ClientConsumerInternal> consumers = new LinkedHashMap<ConsumerContext, ClientConsumerInternal>();
 
    private volatile boolean closed;
 
@@ -223,6 +225,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
       this.sessionContext = sessionContext;
 
+      sessionContext.setSession(this);
+
       confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning();
    }
 
@@ -652,7 +656,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
    }
 
-   public void start() throws HornetQException
+   public ClientSessionImpl start() throws HornetQException
    {
       checkClosed();
 
@@ -667,6 +671,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
          started = true;
       }
+
+      return this;
    }
 
    public void stop() throws HornetQException
@@ -721,6 +727,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       return inClose;
    }
 
+   @Override
+   public String getNodeId()
+   {
+      return sessionFactory.getLiveNodeId();
+   }
+
    // ClientSessionInternal implementation
    // ------------------------------------------------------------
 
@@ -812,7 +824,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    {
       synchronized (consumers)
       {
-         consumers.put(consumer.getID(), consumer);
+         consumers.put(consumer.getConsumerContext(), consumer);
       }
    }
 
@@ -828,7 +840,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    {
       synchronized (consumers)
       {
-         consumers.remove(consumer.getID());
+         consumers.remove(consumer.getConsumerContext());
       }
    }
 
@@ -840,7 +852,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
    }
 
-   public void handleReceiveMessage(final long consumerID, final ClientMessageInternal message) throws Exception
+   public void handleReceiveMessage(final ConsumerContext consumerID, final ClientMessageInternal message) throws Exception
    {
       ClientConsumerInternal consumer = getConsumer(consumerID);
 
@@ -850,7 +862,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
    }
 
-   public void handleReceiveLargeMessage(final long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
+   public void handleReceiveLargeMessage(final ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
    {
       ClientConsumerInternal consumer = getConsumer(consumerID);
 
@@ -860,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
    }
 
-   public void handleReceiveContinuation(final long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
+   public void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
    {
       ClientConsumerInternal consumer = getConsumer(consumerID);
 
@@ -871,9 +883,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    }
 
    @Override
-   public void handleConsumerDisconnect(long consumerID) throws HornetQException
+   public void handleConsumerDisconnect(ConsumerContext context) throws HornetQException
    {
-      final ClientConsumerInternal consumer = getConsumer(consumerID);
+      final ClientConsumerInternal consumer = getConsumer(context);
 
       if (consumer != null)
       {
@@ -944,9 +956,10 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       doCleanup(failingOver);
    }
 
-   public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
+   public ClientSessionImpl setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
    {
       sessionContext.setSendAcknowledgementHandler(handler);
+      return this;
    }
 
    public void preHandleFailover(RemotingConnection connection)
@@ -959,7 +972,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    // Needs to be synchronized to prevent issues with occurring concurrently with close()
 
-   public void handleFailover(final RemotingConnection backupConnection)
+   public void handleFailover(final RemotingConnection backupConnection, HornetQException cause)
    {
       synchronized (this)
       {
@@ -1008,7 +1021,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                                                  minLargeMessageSize, xa, autoCommitSends,
                                                  autoCommitAcks, preAcknowledge, defaultAddress);
 
-                  for (Map.Entry<Long, ClientConsumerInternal> entryx : consumers.entrySet())
+                  for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet())
                   {
 
                      ClientConsumerInternal consumerInternal = entryx.getValue();
@@ -1043,7 +1056,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                   resetCreditManager = true;
                }
 
-               sessionContext.returnBlocking();
+               sessionContext.returnBlocking(cause);
             }
          }
          catch (Throwable t)
@@ -1137,7 +1150,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
    {
-      return producerCreditManager.getCredits(address, anon);
+      ClientProducerCredits credits = producerCreditManager.getCredits(address, anon, sessionContext);
+
+      return credits;
    }
 
    public void returnCredits(final SimpleString address)
@@ -1343,21 +1358,47 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    {
       checkXA();
 
-      if (!(xares instanceof ClientSessionInternal))
+      if (forceNotSameRM)
       {
          return false;
       }
 
-      if (forceNotSameRM)
+      ClientSessionInternal other = getSessionInternalFromXAResource(xares);
+
+      if (other == null)
       {
          return false;
       }
 
-      ClientSessionInternal other = (ClientSessionInternal) xares;
+      String liveNodeId = sessionFactory.getLiveNodeId();
+      String otherLiveNodeId = ((ClientSessionFactoryInternal) other.getSessionFactory()).getLiveNodeId();
 
+      if (liveNodeId != null && otherLiveNodeId != null)
+      {
+         return liveNodeId.equals(otherLiveNodeId);
+      }
+
+      //we shouldn't get here, live node id should always be set
       return sessionFactory == other.getSessionFactory();
    }
 
+   private ClientSessionInternal getSessionInternalFromXAResource(final XAResource xares)
+   {
+      if (xares == null)
+      {
+         return null;
+      }
+      if (xares instanceof ClientSessionInternal)
+      {
+         return (ClientSessionInternal) xares;
+      }
+      else if (xares instanceof HornetQXAResource)
+      {
+         return getSessionInternalFromXAResource(((HornetQXAResource)xares).getResource());
+      }
+      return null;
+   }
+
    public int prepare(final Xid xid) throws XAException
    {
       checkXA();
@@ -1732,15 +1773,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
    }
 
-   /**
-    * @param consumerID
-    * @return
-    */
-   private ClientConsumerInternal getConsumer(final long consumerID)
+   private ClientConsumerInternal getConsumer(final ConsumerContext consumerContext)
    {
       synchronized (consumers)
       {
-         ClientConsumerInternal consumer = consumers.get(consumerID);
+         ClientConsumerInternal consumer = consumers.get(consumerContext);
          return consumer;
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java
index 67ea4fa..a05e6a3 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java
@@ -19,6 +19,7 @@ import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.ConsumerContext;
 
 /**
  * A ClientSessionInternal
@@ -49,17 +50,17 @@ public interface ClientSessionInternal extends ClientSession
 
    void removeProducer(ClientProducerInternal producer);
 
-   void handleReceiveMessage(long consumerID, ClientMessageInternal message) throws Exception;
+   void handleReceiveMessage(ConsumerContext consumerID, ClientMessageInternal message) throws Exception;
 
-   void handleReceiveLargeMessage(long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception;
+   void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception;
 
-   void handleReceiveContinuation(long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception;
+   void handleReceiveContinuation(ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception;
 
-   void handleConsumerDisconnect(long consumerID) throws HornetQException;
+   void handleConsumerDisconnect(ConsumerContext consumerContext) throws HornetQException;
 
    void preHandleFailover(RemotingConnection connection);
 
-   void handleFailover(RemotingConnection backupConnection);
+   void handleFailover(RemotingConnection backupConnection, HornetQException cause);
 
    RemotingConnection getConnection();
 
@@ -116,4 +117,5 @@ public interface ClientSessionInternal extends ClientSession
 
    boolean isClosing();
 
+   String getNodeId();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
index f7e0c61..5b90f55 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
@@ -29,6 +29,7 @@ import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.ConsumerContext;
 import org.hornetq.utils.ConcurrentHashSet;
 
 /**
@@ -38,6 +39,7 @@ import org.hornetq.utils.ConcurrentHashSet;
  * on GC if it has not already been closed
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
  */
 public class DelegatingSession implements ClientSessionInternal
 {
@@ -388,33 +390,33 @@ public class DelegatingSession implements ClientSessionInternal
       session.preHandleFailover(connection);
    }
 
-   public void handleFailover(final RemotingConnection backupConnection)
+   public void handleFailover(final RemotingConnection backupConnection, HornetQException cause)
    {
-      session.handleFailover(backupConnection);
+      session.handleFailover(backupConnection, cause);
    }
 
    @Override
-   public void handleReceiveMessage(long consumerID, ClientMessageInternal message) throws Exception
+   public void handleReceiveMessage(ConsumerContext consumerID, ClientMessageInternal message) throws Exception
    {
       session.handleReceiveMessage(consumerID, message);
    }
 
    @Override
-   public void handleReceiveLargeMessage(long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
+   public void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
    {
       session.handleReceiveLargeMessage(consumerID, clientLargeMessage, largeMessageSize);
    }
 
    @Override
-   public void handleReceiveContinuation(long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
+   public void handleReceiveContinuation(ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
    {
       session.handleReceiveContinuation(consumerID, chunk, flowControlSize, isContinues);
    }
 
    @Override
-   public void handleConsumerDisconnect(long consumerID) throws HornetQException
+   public void handleConsumerDisconnect(ConsumerContext consumerContext) throws HornetQException
    {
-      session.handleConsumerDisconnect(consumerID);
+      session.handleConsumerDisconnect(consumerContext);
    }
 
    public boolean isAutoCommitAcks()
@@ -507,9 +509,10 @@ public class DelegatingSession implements ClientSessionInternal
       session.rollback(xid);
    }
 
-   public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
+   public DelegatingSession setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
    {
       session.setSendAcknowledgementHandler(handler);
+      return this;
    }
 
    public boolean setTransactionTimeout(final int seconds) throws XAException
@@ -522,9 +525,10 @@ public class DelegatingSession implements ClientSessionInternal
       session.resetIfNeeded();
    }
 
-   public void start() throws HornetQException
+   public DelegatingSession start() throws HornetQException
    {
       session.start();
+      return this;
    }
 
    public void start(final Xid xid, final int flags) throws XAException
@@ -642,4 +646,10 @@ public class DelegatingSession implements ClientSessionInternal
    {
       session.scheduleConfirmation(handler, msg);
    }
+
+   @Override
+   public String getNodeId()
+   {
+      return session.getNodeId();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java
new file mode 100644
index 0000000..e278b8c
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import javax.transaction.xa.XAResource;
+
+public interface HornetQXAResource extends XAResource
+{
+   XAResource getResource();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
index d5612a2..5aa37b8 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
@@ -53,9 +53,10 @@ import org.hornetq.core.client.HornetQClientMessageBundle;
 import org.hornetq.core.cluster.DiscoveryEntry;
 import org.hornetq.core.cluster.DiscoveryGroup;
 import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.protocol.ClientPacketDecoder;
-import org.hornetq.core.protocol.core.impl.PacketDecoder;
+import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManagerFactory;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
+import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory;
 import org.hornetq.spi.core.remoting.Connector;
 import org.hornetq.utils.ClassloadingUtil;
 import org.hornetq.utils.HornetQThreadFactory;
@@ -81,6 +82,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private static final long serialVersionUID = -1615857864410205260L;
 
+
+   // This is the default value
+   private ClientProtocolManagerFactory protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance();
+
    private final boolean ha;
 
    private boolean finalizeCheck = true;
@@ -208,12 +213,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    * If you need to, make them transient and handle the serialization yourself
    * */
 
-
-   /*
-   * we use the client decoder by default but there are times when we want to use the server packet decoder
-   */
-   private transient PacketDecoder packetDecoder = ClientPacketDecoder.INSTANCE;
-
    private final Exception traceException = new Exception();
 
    // To be called when there are ServerLocator being finalized.
@@ -642,6 +641,28 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       }
    }
 
+
+   public ClientProtocolManager newProtocolManager()
+   {
+      return getProtocolManagerFactory().newProtocolManager();
+   }
+
+   public ClientProtocolManagerFactory getProtocolManagerFactory()
+   {
+      if (protocolManagerFactory == null)
+      {
+         // this could happen over serialization from older versions
+         protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance();
+      }
+      return protocolManagerFactory;
+   }
+
+   public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory)
+   {
+      this.protocolManagerFactory = protocolManagerFactory;
+   }
+
+
    public void disableFinalizeCheck()
    {
       finalizeCheck = false;
@@ -675,9 +696,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return connect(true);
    }
 
-   public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+   public ServerLocatorImpl setAfterConnectionInternalListener(AfterConnectInternalListener listener)
    {
       this.afterConnectListener = listener;
+      return this;
    }
 
    public AfterConnectInternalListener getAfterConnectInternalListener()
@@ -736,8 +758,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                                                                           threadPool,
                                                                           scheduledThreadPool,
                                                                           incomingInterceptors,
-                                                                          outgoingInterceptors,
-                                                                          packetDecoder);
+                                                                          outgoingInterceptors);
 
       addToConnecting(factory);
       try
@@ -780,8 +801,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                                                                           threadPool,
                                                                           scheduledThreadPool,
                                                                           incomingInterceptors,
-                                                                          outgoingInterceptors,
-                                                                          packetDecoder);
+                                                                          outgoingInterceptors);
 
       addToConnecting(factory);
       try
@@ -873,8 +893,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                                                       threadPool,
                                                       scheduledThreadPool,
                                                       incomingInterceptors,
-                                                      outgoingInterceptors,
-                                                      packetDecoder);
+                                                      outgoingInterceptors);
                try
                {
                   addToConnecting(factory);
@@ -963,9 +982,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return cacheLargeMessagesClient;
    }
 
-   public void setCacheLargeMessagesClient(final boolean cached)
+   public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached)
    {
       cacheLargeMessagesClient = cached;
+      return this;
    }
 
    public long getClientFailureCheckPeriod()
@@ -973,10 +993,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return clientFailureCheckPeriod;
    }
 
-   public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+   public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
    {
       checkWrite();
       this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+      return this;
    }
 
    public long getConnectionTTL()
@@ -984,10 +1005,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return connectionTTL;
    }
 
-   public void setConnectionTTL(final long connectionTTL)
+   public ServerLocatorImpl setConnectionTTL(final long connectionTTL)
    {
       checkWrite();
       this.connectionTTL = connectionTTL;
+      return this;
    }
 
    public long getCallTimeout()
@@ -995,10 +1017,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return callTimeout;
    }
 
-   public void setCallTimeout(final long callTimeout)
+   public ServerLocatorImpl setCallTimeout(final long callTimeout)
    {
       checkWrite();
       this.callTimeout = callTimeout;
+      return this;
    }
 
    public long getCallFailoverTimeout()
@@ -1006,10 +1029,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return callFailoverTimeout;
    }
 
-   public void setCallFailoverTimeout(long callFailoverTimeout)
+   public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout)
    {
       checkWrite();
       this.callFailoverTimeout = callFailoverTimeout;
+      return this;
    }
 
    public int getMinLargeMessageSize()
@@ -1017,10 +1041,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return minLargeMessageSize;
    }
 
-   public void setMinLargeMessageSize(final int minLargeMessageSize)
+   public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize)
    {
       checkWrite();
       this.minLargeMessageSize = minLargeMessageSize;
+      return this;
    }
 
    public int getConsumerWindowSize()
@@ -1028,10 +1053,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return consumerWindowSize;
    }
 
-   public void setConsumerWindowSize(final int consumerWindowSize)
+   public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize)
    {
       checkWrite();
       this.consumerWindowSize = consumerWindowSize;
+      return this;
    }
 
    public int getConsumerMaxRate()
@@ -1039,10 +1065,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return consumerMaxRate;
    }
 
-   public void setConsumerMaxRate(final int consumerMaxRate)
+   public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate)
    {
       checkWrite();
       this.consumerMaxRate = consumerMaxRate;
+      return this;
    }
 
    public int getConfirmationWindowSize()
@@ -1050,10 +1077,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return confirmationWindowSize;
    }
 
-   public void setConfirmationWindowSize(final int confirmationWindowSize)
+   public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize)
    {
       checkWrite();
       this.confirmationWindowSize = confirmationWindowSize;
+      return this;
    }
 
    public int getProducerWindowSize()
@@ -1061,10 +1089,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return producerWindowSize;
    }
 
-   public void setProducerWindowSize(final int producerWindowSize)
+   public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize)
    {
       checkWrite();
       this.producerWindowSize = producerWindowSize;
+      return this;
    }
 
    public int getProducerMaxRate()
@@ -1072,10 +1101,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return producerMaxRate;
    }
 
-   public void setProducerMaxRate(final int producerMaxRate)
+   public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate)
    {
       checkWrite();
       this.producerMaxRate = producerMaxRate;
+      return this;
    }
 
    public boolean isBlockOnAcknowledge()
@@ -1083,10 +1113,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return blockOnAcknowledge;
    }
 
-   public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+   public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge)
    {
       checkWrite();
       this.blockOnAcknowledge = blockOnAcknowledge;
+      return this;
    }
 
    public boolean isBlockOnDurableSend()
@@ -1094,10 +1125,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return blockOnDurableSend;
    }
 
-   public void setBlockOnDurableSend(final boolean blockOnDurableSend)
+   public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend)
    {
       checkWrite();
       this.blockOnDurableSend = blockOnDurableSend;
+      return this;
    }
 
    public boolean isBlockOnNonDurableSend()
@@ -1105,10 +1137,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return blockOnNonDurableSend;
    }
 
-   public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+   public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
    {
       checkWrite();
       this.blockOnNonDurableSend = blockOnNonDurableSend;
+      return this;
    }
 
    public boolean isAutoGroup()
@@ -1116,10 +1149,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return autoGroup;
    }
 
-   public void setAutoGroup(final boolean autoGroup)
+   public ServerLocatorImpl setAutoGroup(final boolean autoGroup)
    {
       checkWrite();
       this.autoGroup = autoGroup;
+      return this;
    }
 
    public boolean isPreAcknowledge()
@@ -1127,10 +1161,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return preAcknowledge;
    }
 
-   public void setPreAcknowledge(final boolean preAcknowledge)
+   public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge)
    {
       checkWrite();
       this.preAcknowledge = preAcknowledge;
+      return this;
    }
 
    public int getAckBatchSize()
@@ -1138,10 +1173,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return ackBatchSize;
    }
 
-   public void setAckBatchSize(final int ackBatchSize)
+   public ServerLocatorImpl setAckBatchSize(final int ackBatchSize)
    {
       checkWrite();
       this.ackBatchSize = ackBatchSize;
+      return this;
    }
 
    public boolean isUseGlobalPools()
@@ -1149,10 +1185,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return useGlobalPools;
    }
 
-   public void setUseGlobalPools(final boolean useGlobalPools)
+   public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools)
    {
       checkWrite();
       this.useGlobalPools = useGlobalPools;
+      return this;
    }
 
    public int getScheduledThreadPoolMaxSize()
@@ -1160,10 +1197,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return scheduledThreadPoolMaxSize;
    }
 
-   public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+   public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
    {
       checkWrite();
       this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+      return this;
    }
 
    public int getThreadPoolMaxSize()
@@ -1171,10 +1209,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return threadPoolMaxSize;
    }
 
-   public void setThreadPoolMaxSize(final int threadPoolMaxSize)
+   public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize)
    {
       checkWrite();
       this.threadPoolMaxSize = threadPoolMaxSize;
+      return this;
    }
 
    public long getRetryInterval()
@@ -1182,10 +1221,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return retryInterval;
    }
 
-   public void setRetryInterval(final long retryInterval)
+   public ServerLocatorImpl setRetryInterval(final long retryInterval)
    {
       checkWrite();
       this.retryInterval = retryInterval;
+      return this;
    }
 
    public long getMaxRetryInterval()
@@ -1193,10 +1233,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return maxRetryInterval;
    }
 
-   public void setMaxRetryInterval(final long retryInterval)
+   public ServerLocatorImpl setMaxRetryInterval(final long retryInterval)
    {
       checkWrite();
       maxRetryInterval = retryInterval;
+      return this;
    }
 
    public double getRetryIntervalMultiplier()
@@ -1204,10 +1245,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return retryIntervalMultiplier;
    }
 
-   public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+   public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier)
    {
       checkWrite();
       this.retryIntervalMultiplier = retryIntervalMultiplier;
+      return this;
    }
 
    public int getReconnectAttempts()
@@ -1215,16 +1257,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return reconnectAttempts;
    }
 
-   public void setReconnectAttempts(final int reconnectAttempts)
+   public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts)
    {
       checkWrite();
       this.reconnectAttempts = reconnectAttempts;
+      return this;
    }
 
-   public void setInitialConnectAttempts(int initialConnectAttempts)
+   public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts)
    {
       checkWrite();
       this.initialConnectAttempts = initialConnectAttempts;
+      return this;
    }
 
    public int getInitialConnectAttempts()
@@ -1237,10 +1281,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return this.failoverOnInitialConnection;
    }
 
-   public void setFailoverOnInitialConnection(final boolean failover)
+   public ServerLocatorImpl setFailoverOnInitialConnection(final boolean failover)
    {
       checkWrite();
       this.failoverOnInitialConnection = failover;
+      return this;
    }
 
    public String getConnectionLoadBalancingPolicyClassName()
@@ -1248,10 +1293,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return connectionLoadBalancingPolicyClassName;
    }
 
-   public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+   public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
    {
       checkWrite();
       connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+      return this;
    }
 
    public TransportConfiguration[] getStaticTransportConfigurations()
@@ -1272,14 +1318,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       addIncomingInterceptor(interceptor);
    }
 
-   public void addIncomingInterceptor(final Interceptor interceptor)
+   public ServerLocatorImpl addIncomingInterceptor(final Interceptor interceptor)
    {
       incomingInterceptors.add(interceptor);
+      return this;
    }
 
-   public void addOutgoingInterceptor(final Interceptor interceptor)
+   public ServerLocatorImpl addOutgoingInterceptor(final Interceptor interceptor)
    {
       outgoingInterceptors.add(interceptor);
+      return this;
    }
 
    @Override
@@ -1304,16 +1352,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return initialMessagePacketSize;
    }
 
-   public void setInitialMessagePacketSize(final int size)
+   public ServerLocatorImpl setInitialMessagePacketSize(final int size)
    {
       checkWrite();
       initialMessagePacketSize = size;
+      return this;
    }
 
-   public void setGroupID(final String groupID)
+   public ServerLocatorImpl setGroupID(final String groupID)
    {
       checkWrite();
       this.groupID = groupID;
+      return this;
    }
 
    public String getGroupID()
@@ -1326,9 +1376,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return compressLargeMessage;
    }
 
-   public void setCompressLargeMessage(boolean avoid)
+   public ServerLocatorImpl setCompressLargeMessage(boolean avoid)
    {
       this.compressLargeMessage = avoid;
+      return this;
    }
 
    private void checkWrite()
@@ -1348,14 +1399,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return initialConnectors.length;
    }
 
-   public void setIdentity(String identity)
+   public ServerLocatorImpl setIdentity(String identity)
    {
       this.identity = identity;
+      return this;
    }
 
-   public void setNodeID(String nodeID)
+   public ServerLocatorImpl setNodeID(String nodeID)
    {
       this.nodeID = nodeID;
+      return this;
    }
 
    public String getNodeID()
@@ -1363,9 +1416,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return nodeID;
    }
 
-   public void setClusterConnection(boolean clusterConnection)
+   public ServerLocatorImpl setClusterConnection(boolean clusterConnection)
    {
       this.clusterConnection = clusterConnection;
+      return this;
    }
 
    public boolean isClusterConnection()
@@ -1378,9 +1432,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       return clusterTransportConfiguration;
    }
 
-   public void setClusterTransportConfiguration(TransportConfiguration tc)
+   public ServerLocatorImpl setClusterTransportConfiguration(TransportConfiguration tc)
    {
       this.clusterTransportConfiguration = tc;
+      return this;
    }
 
    @Override
@@ -1742,20 +1797,15 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    }
 
    @Override
-   public void setPacketDecoder(PacketDecoder packetDecoder)
-   {
-      this.packetDecoder = packetDecoder;
-   }
-
-   @Override
    public boolean isConnectable()
    {
       return getNumInitialConnectors() > 0 || getDiscoveryGroupConfiguration() != null;
    }
 
-   public void addClusterTopologyListener(final ClusterTopologyListener listener)
+   public ServerLocatorImpl addClusterTopologyListener(final ClusterTopologyListener listener)
    {
       topology.addClusterTopologyListener(listener);
+      return this;
    }
 
    public void removeClusterTopologyListener(final ClusterTopologyListener listener)
@@ -1802,8 +1852,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       {
          topologyArrayGuard = new String();
       }
-      //is transient so need to create, for compatibility issues
-      packetDecoder = ClientPacketDecoder.INSTANCE;
    }
 
    private final class StaticConnector implements Serializable
@@ -1951,8 +1999,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                                                                                    threadPool,
                                                                                    scheduledThreadPool,
                                                                                    incomingInterceptors,
-                                                                                   outgoingInterceptors,
-                                                                                   packetDecoder);
+                                                                                   outgoingInterceptors);
 
                factory.disableFinalizeCheck();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
index 40c117b..4d8c258 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
@@ -18,8 +18,8 @@ import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.protocol.core.impl.PacketDecoder;
 import org.hornetq.api.core.Pair;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
 
 /**
  * A ServerLocatorInternal
@@ -36,14 +36,14 @@ public interface ServerLocatorInternal extends ServerLocator
 
    AfterConnectInternalListener getAfterConnectInternalListener();
 
-   void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+   ServerLocatorInternal setAfterConnectionInternalListener(AfterConnectInternalListener listener);
 
    /** Used to better identify Cluster Connection Locators on logs. To facilitate eventual debugging.
     *
     *  This method used to be on tests interface, but I'm now making it part of the public interface since*/
-   void setIdentity(String identity);
+   ServerLocatorInternal setIdentity(String identity);
 
-   void setNodeID(String nodeID);
+   ServerLocatorInternal setNodeID(String nodeID);
 
    String getNodeID();
 
@@ -70,17 +70,17 @@ public interface ServerLocatorInternal extends ServerLocator
     */
    void notifyNodeDown(long uniqueEventID, String nodeID);
 
-   void setClusterConnection(boolean clusterConnection);
+   ServerLocatorInternal setClusterConnection(boolean clusterConnection);
 
    boolean isClusterConnection();
 
    TransportConfiguration getClusterTransportConfiguration();
 
-   void setClusterTransportConfiguration(TransportConfiguration tc);
+   ServerLocatorInternal setClusterTransportConfiguration(TransportConfiguration tc);
 
    Topology getTopology();
 
-   void setPacketDecoder(PacketDecoder instance);
+   ClientProtocolManager newProtocolManager();
 
    boolean isConnectable();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java
index 898d194..22f6b90 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java
@@ -156,8 +156,11 @@ public final class Topology implements Serializable
          TopologyMemberImpl currentMember = getMember(nodeId);
          if (currentMember == null)
          {
-            HornetQClientLogger.LOGGER.debug("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput,
-                     new Exception("trace"));
+            if (HornetQClientLogger.LOGGER.isTraceEnabled())
+            {
+               HornetQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput,
+                                                new Exception("trace"));
+            }
 
             currentMember = memberInput;
             topology.put(nodeId, currentMember);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java b/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java
index c0981a3..43beff4 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java
@@ -26,7 +26,7 @@ import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQInterruptedException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.management.Notification;
@@ -118,7 +118,7 @@ public final class DiscoveryGroup implements HornetQComponent
 
          props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
 
-         Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props);
+         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, props);
 
          notificationService.sendNotification(notification);
       }
@@ -170,7 +170,7 @@ public final class DiscoveryGroup implements HornetQComponent
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
-         Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STOPPED, props);
+         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props);
          try
          {
             notificationService.sendNotification(notification);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
index cc9a390..057dcf6 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
@@ -26,6 +26,7 @@ import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.utils.ByteUtil;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUID;
@@ -45,7 +46,9 @@ public abstract class MessageImpl implements MessageInternal
 {
    public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
 
-   public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_HQ_ROUTE_TO_ACK");
+   public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_HQ_SCALEDOWN_TO");
+
+   public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_HQ_ACK_ROUTE_TO");
 
    // used by the bridges to set duplicates
    public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP");
@@ -268,6 +271,20 @@ public abstract class MessageImpl implements MessageInternal
       return bodyBuffer;
    }
 
+   public Message writeBodyBufferBytes(byte[] bytes)
+   {
+      getBodyBuffer().writeBytes(bytes);
+
+      return this;
+   }
+
+   public Message writeBodyBufferString(String string)
+   {
+      getBodyBuffer().writeString(string);
+
+      return this;
+   }
+
    public void checkCompletion() throws HornetQException
    {
       // no op on regular messages
@@ -295,9 +312,10 @@ public abstract class MessageImpl implements MessageInternal
       return userID;
    }
 
-   public void setUserID(final UUID userID)
+   public MessageImpl setUserID(final UUID userID)
    {
       this.userID = userID;
+      return this;
    }
 
    /**
@@ -345,7 +363,7 @@ public abstract class MessageImpl implements MessageInternal
       return durable;
    }
 
-   public void setDurable(final boolean durable)
+   public MessageImpl setDurable(final boolean durable)
    {
       if (this.durable != durable)
       {
@@ -353,6 +371,7 @@ public abstract class MessageImpl implements MessageInternal
 
          bufferValid = false;
       }
+      return this;
    }
 
    public long getExpiration()
@@ -360,7 +379,7 @@ public abstract class MessageImpl implements MessageInternal
       return expiration;
    }
 
-   public void setExpiration(final long expiration)
+   public MessageImpl setExpiration(final long expiration)
    {
       if (this.expiration != expiration)
       {
@@ -368,6 +387,7 @@ public abstract class MessageImpl implements MessageInternal
 
          bufferValid = false;
       }
+      return this;
    }
 
    public long getTimestamp()
@@ -375,7 +395,7 @@ public abstract class MessageImpl implements MessageInternal
       return timestamp;
    }
 
-   public void setTimestamp(final long timestamp)
+   public MessageImpl setTimestamp(final long timestamp)
    {
       if (this.timestamp != timestamp)
       {
@@ -383,6 +403,7 @@ public abstract class MessageImpl implements MessageInternal
 
          bufferValid = false;
       }
+      return this;
    }
 
    public byte getPriority()
@@ -390,7 +411,7 @@ public abstract class MessageImpl implements MessageInternal
       return priority;
    }
 
-   public void setPriority(final byte priority)
+   public MessageImpl setPriority(final byte priority)
    {
       if (this.priority != priority)
       {
@@ -398,6 +419,7 @@ public abstract class MessageImpl implements MessageInternal
 
          bufferValid = false;
       }
+      return this;
    }
 
    public boolean isExpired()
@@ -924,6 +946,37 @@ public abstract class MessageImpl implements MessageInternal
       return false;
    }
 
+   /**
+    * Debug Helper!!!!
+    *
+    * I'm leaving this method here without any callers for a reason:
+    * while debugging it is often important to identify what is in the message bodies, and this method gives you a good idea of their contents.
+    * Add message.bodyToString() to the Watch variables in the debugger view and the contents will show up like a charm!!!
+    * @return
+    */
+   public String bodyToString()
+   {
+      getEndOfBodyPosition();
+      int readerIndex1 = this.buffer.readerIndex();
+      buffer.readerIndex(0);
+      byte[] buffer1 = new byte[buffer.writerIndex()];
+      buffer.readBytes(buffer1);
+      buffer.readerIndex(readerIndex1);
+
+      byte[] buffer2 = null;
+      if (bodyBuffer != null)
+      {
+         int readerIndex2 = this.bodyBuffer.readerIndex();
+         bodyBuffer.readerIndex(0);
+         buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
+         bodyBuffer.readBytes(buffer2);
+         bodyBuffer.readerIndex(readerIndex2);
+      }
+
+      return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
+   }
+
+
 
 
    @Override
@@ -961,6 +1014,10 @@ public abstract class MessageImpl implements MessageInternal
 
          int bodySize = getEndOfBodyPosition();
 
+         // Clebert: I've started setting this on encoding because of conversions between protocols,
+         //          to make sure we are not losing the buffer start position between protocols
+         this.endOfBodyPosition = bodySize;
+
          // write it
          buffer.setInt(BUFFER_HEADER_SPACE, bodySize);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java
index b49aff7..f84cf0a 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java
@@ -26,8 +26,6 @@ import org.hornetq.utils.TypedProperties;
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  *
- * TODO - this can be refactored further to separate out large message specific stuff
- *
  *
  */
 public interface MessageInternal extends Message

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java
index 86bccd6..fd043fe 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java
@@ -136,6 +136,11 @@ public interface Channel
    void returnBlocking();
 
    /**
+    * forces any {@link org.hornetq.core.protocol.core.Channel#sendBlocking(Packet, byte)} request to return with an exception.
+    */
+   void returnBlocking(Throwable cause);
+
+   /**
     * returns the channel lock
     *
     * @return the lock

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
index 89303ef..0a6202a 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
@@ -184,11 +184,16 @@ public final class ChannelImpl implements Channel
 
    public void returnBlocking()
    {
+      returnBlocking(null);
+   }
+
+   public void returnBlocking(Throwable cause)
+   {
       lock.lock();
 
       try
       {
-         response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall());
+         response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall(cause));
 
          sendCondition.signal();
       }


Mime
View raw message