activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [39/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:42:09 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java
new file mode 100644
index 0000000..12dc322
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java
@@ -0,0 +1,607 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.message.BodyEncoder;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+import org.apache.activemq6.utils.DeflaterReader;
+import org.apache.activemq6.utils.HornetQBufferInputStream;
+import org.apache.activemq6.utils.TokenBucketLimiter;
+import org.apache.activemq6.utils.UUIDGenerator;
+
+/**
+ * The client-side Producer.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ */
+public class ClientProducerImpl implements ClientProducerInternal
+{
+   private final SimpleString address;
+
+   private final ClientSessionInternal session;
+
+   private final SessionContext sessionContext;
+
+   private volatile boolean closed;
+
+   // For rate throttling
+
+   private final TokenBucketLimiter rateLimiter;
+
+   private final boolean blockOnNonDurableSend;
+
+   private final boolean blockOnDurableSend;
+
+   private final SimpleString groupID;
+
+   private final int minLargeMessageSize;
+
+   private final ClientProducerCredits producerCredits;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   /**
+    * Creates a producer that sends through {@code session}.
+    *
+    * @param session               owning session; also the source of the producer credits
+    * @param address               default destination, or {@code null} when every send names its own address
+    * @param rateLimiter           limiter consulted on every send, or {@code null} for no throttling
+    * @param blockOnNonDurableSend blocking configuration applied to non-durable messages
+    * @param blockOnDurableSend    blocking configuration applied to durable messages
+    * @param autoGroup             when {@code true}, a generated UUID is used instead of {@code groupID}
+    * @param groupID               group id stamped on outgoing messages; ignored when {@code autoGroup} is set
+    * @param minLargeMessageSize   body-size threshold for large messages, also used as the chunk size
+    * @param sessionContext        context used to put messages on the wire
+    */
+   public ClientProducerImpl(final ClientSessionInternal session,
+                             final SimpleString address,
+                             final TokenBucketLimiter rateLimiter,
+                             final boolean blockOnNonDurableSend,
+                             final boolean blockOnDurableSend,
+                             final boolean autoGroup,
+                             final SimpleString groupID,
+                             final int minLargeMessageSize,
+                             final SessionContext sessionContext)
+   {
+      this.session = session;
+      this.sessionContext = sessionContext;
+      this.address = address;
+      this.rateLimiter = rateLimiter;
+      this.blockOnDurableSend = blockOnDurableSend;
+      this.blockOnNonDurableSend = blockOnNonDurableSend;
+      this.minLargeMessageSize = minLargeMessageSize;
+
+      // An auto-grouping producer ignores the supplied id and uses a freshly generated one.
+      this.groupID = autoGroup ? UUIDGenerator.getInstance().generateSimpleStringUUID() : groupID;
+
+      // Credits are only fetched up front when the producer has a fixed address.
+      this.producerCredits = address == null ? null : session.getCredits(address, false);
+   }
+
+   // ClientProducer implementation ----------------------------------------------------------------
+
+   /**
+    * @return the default address given at construction; may be {@code null}
+    */
+   public SimpleString getAddress()
+   {
+      return this.address;
+   }
+
+   /**
+    * Sends {@code msg} to this producer's default address, without an acknowledgement handler.
+    *
+    * @throws HornetQException if the producer is closed or the send fails
+    */
+   public void send(final Message msg) throws HornetQException
+   {
+      checkClosed();
+
+      // A null address makes doSend fall back to the producer's own address.
+      final SimpleString useDefaultAddress = null;
+      final SendAcknowledgementHandler noHandler = null;
+
+      doSend(useDefaultAddress, msg, noHandler, false);
+   }
+
+   /**
+    * Sends {@code msg} to {@code address1}, without an acknowledgement handler.
+    *
+    * @throws HornetQException if the producer is closed or the send fails
+    */
+   public void send(final SimpleString address1, final Message msg) throws HornetQException
+   {
+      checkClosed();
+
+      final SendAcknowledgementHandler noHandler = null;
+
+      doSend(address1, msg, noHandler, false);
+   }
+
+   /**
+    * String-address convenience overload; converts the address and delegates to
+    * {@link #send(SimpleString, Message)}.
+    */
+   public void send(final String address1, final Message message) throws HornetQException
+   {
+      final SimpleString target = SimpleString.toSimpleString(address1);
+
+      send(target, message);
+   }
+
+   @Override
+   public void send(SimpleString address1, Message message, SendAcknowledgementHandler handler) throws HornetQException
+   {
+      checkClosed();
+      boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
+      if (confirmationWindowEnabled)
+      {
+         doSend(address1, message, handler, true);
+      }
+      else
+      {
+         doSend(address1, message, null, true);
+         if (handler != null)
+         {
+            session.scheduleConfirmation(handler, message);
+         }
+      }
+   }
+
+   /**
+    * Sends {@code message} to the default address with an acknowledgement handler; delegates to
+    * {@link #send(SimpleString, Message, SendAcknowledgementHandler)} with a {@code null} address.
+    */
+   @Override
+   public void send(Message message, SendAcknowledgementHandler handler) throws HornetQException
+   {
+      final SimpleString useDefaultAddress = null;
+
+      send(useDefaultAddress, message, handler);
+   }
+
+   /**
+    * Closes the producer; does nothing if it is already closed.
+    */
+   public synchronized void close() throws HornetQException
+   {
+      if (!closed)
+      {
+         doCleanup();
+      }
+   }
+
+   /**
+    * Performs the same cleanup as {@link #close()} but, unlike it, is not synchronized.
+    * Does nothing if the producer is already closed.
+    */
+   public void cleanUp()
+   {
+      if (!closed)
+      {
+         doCleanup();
+      }
+   }
+
+   /**
+    * @return {@code true} once the producer has been cleaned up
+    */
+   public boolean isClosed()
+   {
+      return this.closed;
+   }
+
+   /**
+    * @return the blocking configuration used for durable messages
+    */
+   public boolean isBlockOnDurableSend()
+   {
+      return this.blockOnDurableSend;
+   }
+
+   /**
+    * @return the blocking configuration used for non-durable messages
+    */
+   public boolean isBlockOnNonDurableSend()
+   {
+      return this.blockOnNonDurableSend;
+   }
+
+   /**
+    * @return the rate of the configured limiter, or {@code -1} when this producer has no limiter
+    */
+   public int getMaxRate()
+   {
+      if (rateLimiter == null)
+      {
+         return -1;
+      }
+
+      return rateLimiter.getRate();
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   /**
+    * @return the credits obtained for the default address at construction, or {@code null} when the
+    *         producer was created without an address
+    */
+   public ClientProducerCredits getProducerCredits()
+   {
+      return this.producerCredits;
+   }
+
+   private void doCleanup()
+   {
+      if (address != null)
+      {
+         session.returnCredits(address);
+      }
+
+      session.removeProducer(this);
+
+      closed = true;
+   }
+
+   /**
+    * Common send path behind every public {@code send} overload.
+    * <p>
+    * Decides whether the message goes out as a large message, stamps the destination address, picks the
+    * credits to use, applies rate limiting and the group id, and dispatches to the regular or
+    * large-message sender. The whole operation is bracketed by {@code session.startCall()} and
+    * {@code session.endCall()}.
+    *
+    * @param address1   explicit destination, or {@code null} to use this producer's default address
+    * @param msg        the message; must be a {@link MessageInternal}
+    * @param handler    acknowledgement handler, or {@code null}; a non-null handler disables blocking sends
+    * @param forceAsync not read by this method
+    * @throws HornetQException if the send fails
+    */
+   private void doSend(final SimpleString address1, final Message msg, final SendAcknowledgementHandler handler,
+                       final boolean forceAsync) throws HornetQException
+   {
+      session.startCall();
+
+      try
+      {
+         final MessageInternal internalMsg = (MessageInternal) msg;
+
+         // A note about the writer-index check: a server message means the send is happening through a
+         // bridge or some special consumer on the server, in which case the message cannot be converted
+         // into a large message there - so the body size only counts for non-server messages.
+         final boolean isLarge = sessionContext.supportsLargeMessage() &&
+            (internalMsg.getBodyInputStream() != null ||
+               internalMsg.isLargeMessage() ||
+               (internalMsg.getBodyBuffer().writerIndex() > minLargeMessageSize && !internalMsg.isServerMessage()));
+
+         final boolean explicitAddress = address1 != null;
+
+         final SimpleString target = explicitAddress ? address1 : this.address;
+
+         // Large messages get the address set directly on the message; regular ones go through the session.
+         if (isLarge)
+         {
+            msg.setAddress(target);
+         }
+         else
+         {
+            session.setAddress(msg, target);
+         }
+
+         // An explicit address is an anonymous send: its credits are looked up per call.
+         final ClientProducerCredits theCredits =
+            explicitAddress ? session.getCredits(address1, true) : producerCredits;
+
+         if (rateLimiter != null)
+         {
+            // Rate flow control
+            rateLimiter.limit();
+         }
+
+         if (groupID != null)
+         {
+            internalMsg.putStringProperty(Message.HDR_GROUP_ID, groupID);
+         }
+
+         final boolean configuredBlocking;
+
+         if (internalMsg.isDurable())
+         {
+            configuredBlocking = blockOnDurableSend;
+         }
+         else
+         {
+            configuredBlocking = blockOnNonDurableSend;
+         }
+
+         // Supplying a handler overrides the blocking configuration.
+         final boolean sendBlocking = handler == null && configuredBlocking;
+
+         session.workDone();
+
+         if (isLarge)
+         {
+            largeMessageSend(sendBlocking, internalMsg, theCredits, handler);
+         }
+         else
+         {
+            sendRegularMessage(internalMsg, sendBlocking, theCredits, handler);
+         }
+      }
+      finally
+      {
+         session.endCall();
+      }
+   }
+
+   /**
+    * Acquires the credits for a complete message and then sends it in one piece.
+    *
+    * @param msgI         the message to send
+    * @param sendBlocking passed through to the session context
+    * @param theCredits   credits to acquire from before sending
+    * @param handler      acknowledgement handler, may be {@code null}
+    * @throws HornetQException if the send fails
+    */
+   private void sendRegularMessage(final MessageInternal msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws HornetQException
+   {
+      try
+      {
+         // Blocks while credits are not available.
+         //
+         // For a large message the encode size only covers properties + headers, not the continuations.
+         // That is fine: the aim is to limit the amount of data held in *memory*, and continuations go
+         // straight to the disk.
+         theCredits.acquireCredits(sessionContext.getCreditsOnSendingFull(msgI));
+      }
+      catch (InterruptedException interrupted)
+      {
+         throw new HornetQInterruptedException(interrupted);
+      }
+
+      sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
+   }
+
+   /**
+    * Guard used by the public send methods.
+    *
+    * @throws HornetQException the bundle's "producer closed" error when this producer is closed
+    */
+   private void checkClosed() throws HornetQException
+   {
+      if (!closed)
+      {
+         return;
+      }
+
+      throw HornetQClientMessageBundle.BUNDLE.producerClosed();
+   }
+
+   // Methods to send Large Messages----------------------------------------------------------------
+
+   /**
+    * @param msgI
+    * @param handler
+    * @throws HornetQException
+    */
+   private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI,
+                                 final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
+   {
+      int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
+
+      if (msgI.getHeadersAndPropertiesEncodeSize() >= minLargeMessageSize)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.headerSizeTooBig(headerSize);
+      }
+
+      // msg.getBody() could be Null on LargeServerMessage
+      if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null)
+      {
+         msgI.getWholeBuffer().readerIndex(0);
+      }
+
+      InputStream input;
+
+      if (msgI.isServerMessage())
+      {
+         largeMessageSendServer(sendBlocking, msgI, credits, handler);
+      }
+      else if ((input = msgI.getBodyInputStream()) != null)
+      {
+         largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler);
+      }
+      else
+      {
+         largeMessageSendBuffered(sendBlocking, msgI, credits, handler);
+      }
+   }
+
+   private void sendInitialLargeMessageHeader(MessageInternal msgI, ClientProducerCredits credits) throws HornetQException
+   {
+      int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
+
+      // On the case of large messages we tried to send credits before but we would starve otherwise
+      // we may find a way to improve the logic and always acquire the credits before
+      // but that's the way it's been tested and been working ATM
+      try
+      {
+         credits.acquireCredits(creditsUsed);
+      }
+      catch (InterruptedException e)
+      {
+         throw new HornetQInterruptedException(e);
+      }
+   }
+
+   /**
+    * Used to send server messages through the bridges. No need to validate compression here since
+    * the message is only compressed at the client.
+    * <p>
+    * The body is pulled from the message's {@link BodyEncoder} in chunks of at most
+    * {@code minLargeMessageSize} bytes; credits are acquired after each chunk has been sent, and the
+    * acknowledgement handler is attached to the last chunk only.
+    *
+    * @param sendBlocking passed through to the session context for every chunk
+    * @param msgI         the server-side large message
+    * @param credits      producer credits to acquire from
+    * @param handler      acknowledgement handler for the final chunk, may be {@code null}
+    * @throws HornetQException if encoding or sending fails
+    */
+   private void largeMessageSendServer(final boolean sendBlocking, final MessageInternal msgI,
+                                       final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
+   {
+      sendInitialLargeMessageHeader(msgI, credits);
+
+      BodyEncoder context = msgI.getBodyEncoder();
+
+      final long bodySize = context.getLargeBodySize();
+
+      context.open();
+      try
+      {
+         // The position is tracked as a long: the body size is a long and may exceed Integer.MAX_VALUE,
+         // in which case an int position would overflow.
+         for (long pos = 0; pos < bodySize; )
+         {
+            final boolean lastChunk;
+
+            // Take the minimum on longs first, then narrow: the result never exceeds minLargeMessageSize,
+            // so the cast is safe. Narrowing (bodySize - pos) first would truncate for bodies >= 2 GiB.
+            final int chunkLength = (int) Math.min(bodySize - pos, (long) minLargeMessageSize);
+
+            final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
+
+            context.encode(bodyBuffer, chunkLength);
+
+            pos += chunkLength;
+
+            lastChunk = pos >= bodySize;
+            SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
+
+            int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
+
+            try
+            {
+               credits.acquireCredits(creditsUsed);
+            }
+            catch (InterruptedException e)
+            {
+               throw new HornetQInterruptedException(e);
+            }
+         }
+      }
+      finally
+      {
+         context.close();
+      }
+   }
+
+   /**
+    * @param sendBlocking
+    * @param msgI
+    * @param handler
+    * @throws HornetQException
+    */
+   private void
+   largeMessageSendBuffered(final boolean sendBlocking, final MessageInternal msgI,
+                            final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
+   {
+      msgI.getBodyBuffer().readerIndex(0);
+      largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits,
+                               handler);
+   }
+
+   /**
+    * Sends a large message whose body is read from {@code inputStreamParameter}, one chunk of at most
+    * {@code minLargeMessageSize} bytes at a time.
+    * <p>
+    * When the session compresses large messages the stream is wrapped in a {@link DeflaterReader}. If the
+    * whole (compressed) body then turns out to be smaller than one chunk, the message is sent as a
+    * regular message instead of a large one.
+    *
+    * @param sendBlocking         passed through to the session context for every chunk
+    * @param msgI                 the message being sent
+    * @param inputStreamParameter source of the uncompressed body
+    * @param credits              producer credits, acquired after the header and after each chunk
+    * @param handler              acknowledgement handler, passed along with every chunk
+    * @throws HornetQException declared for failures while reading the body, sending, or closing the stream
+    */
+   private void largeMessageSendStreamed(final boolean sendBlocking, final MessageInternal msgI,
+                                         final InputStream inputStreamParameter, final ClientProducerCredits credits,
+                                         SendAcknowledgementHandler handler) throws HornetQException
+   {
+      boolean lastPacket = false;
+
+      InputStream input = inputStreamParameter;
+
+      // We won't know the real size of the message since we are compressing while reading the stream.
+      // This counter is passed to the deflater to be updated for every byte read.
+      AtomicLong messageSize = new AtomicLong();
+
+      DeflaterReader deflaterReader = null;
+
+      if (session.isCompressLargeMessages())
+      {
+         // Flag the message as compressed and read the body through the deflater from here on.
+         msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+         deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
+         input = deflaterReader;
+      }
+
+      // Number of bytes read from 'input' so far (compressed bytes when compression is on).
+      long totalSize = 0;
+
+      // The initial large-message header is sent lazily, just before the first chunk.
+      boolean headerSent = false;
+
+      while (!lastPacket)
+      {
+         // A fresh buffer per chunk; it is handed to the session context below.
+         byte[] buff = new byte[minLargeMessageSize];
+
+         int pos = 0;
+
+         // Fill the buffer completely unless the stream ends first: a single read() may return fewer
+         // bytes than requested.
+         do
+         {
+            int numberOfBytesRead;
+
+            int wanted = minLargeMessageSize - pos;
+
+            try
+            {
+               numberOfBytesRead = input.read(buff, pos, wanted);
+            }
+            catch (IOException e)
+            {
+               throw HornetQClientMessageBundle.BUNDLE.errorReadingBody(e);
+            }
+
+            if (numberOfBytesRead == -1)
+            {
+               // End of stream: whatever is in the buffer (possibly nothing) is the last chunk.
+               lastPacket = true;
+
+               break;
+            }
+
+            pos += numberOfBytesRead;
+         }
+         while (pos < minLargeMessageSize);
+
+         totalSize += pos;
+
+         final SessionSendContinuationMessage chunk;
+
+         if (lastPacket)
+         {
+            if (!session.isCompressLargeMessages())
+            {
+               // Without compression the counter is only set here, from the bytes read; until this
+               // point messageSize.get() is still 0 for the earlier chunks.
+               messageSize.set(totalSize);
+            }
+
+            // This is replacing the last packet by a smaller packet
+            byte[] buff2 = new byte[pos];
+
+            System.arraycopy(buff, 0, buff2, 0, pos);
+
+            buff = buff2;
+
+            // This is the case where the message is being converted to a regular message: nothing has
+            // been sent yet and the whole compressed body is smaller than one chunk.
+            if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize)
+            {
+               // Replace the body buffer content with the compressed bytes and record the size reported
+               // by the deflater.
+               msgI.getBodyBuffer().resetReaderIndex();
+               msgI.getBodyBuffer().resetWriterIndex();
+               msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+
+               msgI.getBodyBuffer().writeBytes(buff, 0, pos);
+               sendRegularMessage(msgI, sendBlocking, credits, handler);
+               // NOTE(review): this early return skips the input.close() at the end of the method -
+               // confirm whether the stream is meant to stay open on this path.
+               return;
+            }
+            else
+            {
+               if (!headerSent)
+               {
+                  headerSent = true;
+                  sendInitialLargeMessageHeader(msgI, credits);
+               }
+               // Final chunk: sent with the last-chunk flag set to true.
+               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler);
+               try
+               {
+                  credits.acquireCredits(creditsSent);
+               }
+               catch (InterruptedException e)
+               {
+                  throw new HornetQInterruptedException(e);
+               }
+            }
+         }
+         else
+         {
+            if (!headerSent)
+            {
+               headerSent = true;
+               sendInitialLargeMessageHeader(msgI, credits);
+            }
+
+
+            // Intermediate chunk. NOTE(review): the handler is passed here as well, whereas
+            // largeMessageSendServer only passes it with the last chunk - confirm this is intended.
+            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler);
+            try
+            {
+               credits.acquireCredits(creditsSent);
+            }
+            catch (InterruptedException e)
+            {
+               throw new HornetQInterruptedException(e);
+            }
+         }
+      }
+
+      // Only reached when the loop finishes normally; an exception thrown above leaves the stream open.
+      try
+      {
+         input.close();
+      }
+      catch (IOException e)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java
new file mode 100644
index 0000000..056c20c
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.client.ClientProducer;
+
+/**
+ * Internal view of a client producer: the public {@link ClientProducer} API plus the operations
+ * declared below.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public interface ClientProducerInternal extends ClientProducer
+{
+   /**
+    * Releases the producer's resources. In {@code ClientProducerImpl} this does the same work as
+    * {@code close()} but without synchronization.
+    */
+   void cleanUp();
+
+   /**
+    * @return the credits held by this producer; {@code ClientProducerImpl} returns {@code null} when
+    *         the producer was created without an address
+    */
+   ClientProducerCredits getProducerCredits();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java
new file mode 100644
index 0000000..eebc044
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java
@@ -0,0 +1,1609 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.client.impl;
+
+import java.lang.ref.WeakReference;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.HornetQNotConnectedException;
+import org.apache.activemq6.api.core.Interceptor;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.FailoverEventListener;
+import org.apache.activemq6.api.core.client.FailoverEventType;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.core.client.SessionFailureListener;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq6.core.remoting.FailureListener;
+import org.apache.activemq6.core.server.HornetQComponent;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.spi.core.remoting.BufferHandler;
+import org.apache.activemq6.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq6.spi.core.remoting.Connector;
+import org.apache.activemq6.spi.core.remoting.ConnectorFactory;
+import org.apache.activemq6.spi.core.remoting.TopologyResponseHandler;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+import org.apache.activemq6.utils.ClassloadingUtil;
+import org.apache.activemq6.utils.ConcurrentHashSet;
+import org.apache.activemq6.utils.ConfigurationHelper;
+import org.apache.activemq6.utils.ConfirmationWindowWarning;
+import org.apache.activemq6.utils.ExecutorFactory;
+import org.apache.activemq6.utils.OrderedExecutorFactory;
+import org.apache.activemq6.utils.UUIDGenerator;
+
+/**
+ * @author Tim Fox
+ * @author Clebert Suconic
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+
+public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ConnectionLifeCycleListener
+{
+   // Constants
+   // ------------------------------------------------------------------------------------
+
+   // Both flags are read from the logger once, when the class is initialised; a later change of the
+   // log level is therefore not reflected in them.
+   private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
+
+   private static final boolean isDebug = HornetQClientLogger.LOGGER.isDebugEnabled();
+
+   // Attributes
+   // -----------------------------------------------------------------------------------
+
+   private final ServerLocatorInternal serverLocator;
+
+   // Obtained from the server locator in the constructor and bound to this factory.
+   private final ClientProtocolManager clientProtocolManager;
+
+   private TransportConfiguration connectorConfig;
+
+   // Set by setBackupConnector() when a backup is announced for this factory's live connector.
+   private TransportConfiguration backupConfig;
+
+   // Instantiated in the constructor from connectorConfig's factory class name.
+   private ConnectorFactory connectorFactory;
+
+   // Cleared by disableFinalizeCheck().
+   private transient boolean finalizeCheck = true;
+
+   private final long callTimeout;
+
+   private final long callFailoverTimeout;
+
+   // These two may be replaced by the in-VM defaults in the constructor (see HORNETQ-1314 there).
+   private final long clientFailureCheckPeriod;
+
+   private final long connectionTTL;
+
+   // Plain HashSet. NOTE(review): how access is guarded is not visible in this part of the file.
+   private final Set<ClientSessionInternal> sessions = new HashSet<ClientSessionInternal>();
+
+   private final Object createSessionLock = new Object();
+
+   // Acquired and handed out to callers by lockFailover().
+   private final Lock newFailoverLock = new ReentrantLock();
+
+
+   private final Object connectionLock = new Object();
+
+   // Built in the constructor on top of threadPool.
+   private final ExecutorFactory orderedExecutorFactory;
+
+   private final Executor threadPool;
+
+   private final ScheduledExecutorService scheduledThreadPool;
+
+   // One executor taken from orderedExecutorFactory in the constructor.
+   private final Executor closeExecutor;
+
+   // Still null after getConnectionWithRetry() means connect() failed; connect() then throws.
+   private RemotingConnection connection;
+
+   private final long retryInterval;
+
+   private final double retryIntervalMultiplier; // For exponential backoff
+
+   private final long maxRetryInterval;
+
+   private int reconnectAttempts;
+
+   private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<SessionFailureListener>();
+
+   private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<FailoverEventListener>();
+
+   // May still be null when setBackupConnector() runs; that method then builds a throw-away connector.
+   private Connector connector;
+
+   private Future<?> pingerFuture;
+   private PingRunnable pingRunnable;
+
+
+   private final List<Interceptor> incomingInterceptors;
+
+   private final List<Interceptor> outgoingInterceptors;
+
+   private volatile boolean stopPingingAfterOne;
+
+   private volatile boolean closed;
+
+   // Created in the constructor, so it carries the stack trace of the code that built this factory.
+   // Presumably kept for diagnosing factories that were never closed - TODO confirm.
+   public final Exception createTrace;
+
+   public static final Set<CloseRunnable> CLOSE_RUNNABLES = Collections.synchronizedSet(new HashSet<CloseRunnable>());
+
+   // Constructed with the flag (serverLocator.getConfirmationWindowSize() < 0).
+   private final ConfirmationWindowWarning confirmationWindowWarning;
+
+   private String liveNodeID;
+
+   public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
+                                   final TransportConfiguration connectorConfig,
+                                   final long callTimeout,
+                                   final long callFailoverTimeout,
+                                   final long clientFailureCheckPeriod,
+                                   final long connectionTTL,
+                                   final long retryInterval,
+                                   final double retryIntervalMultiplier,
+                                   final long maxRetryInterval,
+                                   final int reconnectAttempts,
+                                   final Executor threadPool,
+                                   final ScheduledExecutorService scheduledThreadPool,
+                                   final List<Interceptor> incomingInterceptors,
+                                   final List<Interceptor> outgoingInterceptors)
+   {
+      createTrace = new Exception();
+
+      this.serverLocator = serverLocator;
+
+      this.clientProtocolManager = serverLocator.newProtocolManager();
+
+      this.clientProtocolManager.setSessionFactory(this);
+
+      this.connectorConfig = connectorConfig;
+
+      connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
+
+      checkTransportKeys(connectorFactory, connectorConfig);
+
+      this.callTimeout = callTimeout;
+
+      this.callFailoverTimeout = callFailoverTimeout;
+
+      // HORNETQ-1314 - if this in an in-vm connection then disable connection monitoring
+      if (connectorFactory.isReliable() &&
+         clientFailureCheckPeriod == HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD &&
+         connectionTTL == HornetQClient.DEFAULT_CONNECTION_TTL)
+      {
+         this.clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD_INVM;
+         this.connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL_INVM;
+      }
+      else
+      {
+         this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+
+         this.connectionTTL = connectionTTL;
+      }
+
+      this.retryInterval = retryInterval;
+
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+      this.maxRetryInterval = maxRetryInterval;
+
+      this.reconnectAttempts = reconnectAttempts;
+
+      this.scheduledThreadPool = scheduledThreadPool;
+
+      this.threadPool = threadPool;
+
+      orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
+
+      closeExecutor = orderedExecutorFactory.getExecutor();
+
+      this.incomingInterceptors = incomingInterceptors;
+
+      this.outgoingInterceptors = outgoingInterceptors;
+
+      confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
+
+   }
+
+   /**
+    * Turns off the finalize check by clearing the {@code finalizeCheck} flag.
+    */
+   public void disableFinalizeCheck()
+   {
+      this.finalizeCheck = false;
+   }
+
+   /**
+    * Acquires the failover lock and returns it.
+    *
+    * @return the lock, already held by the calling thread; the caller is expected to unlock it
+    */
+   public Lock lockFailover()
+   {
+      final Lock failoverLock = newFailoverLock;
+
+      failoverLock.lock();
+
+      return failoverLock;
+   }
+
+   /**
+    * Tries to establish the connection, retrying as configured.
+    *
+    * @param initialConnectAttempts      passed to {@code getConnectionWithRetry}
+    * @param failoverOnInitialConnection not read by this method
+    * @throws HornetQException a {@link HornetQNotConnectedException} naming the live (and, if known,
+    *                          backup) configuration when no connection could be obtained
+    */
+   public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws HornetQException
+   {
+      // Get the connection
+      getConnectionWithRetry(initialConnectAttempts);
+
+      if (connection != null)
+      {
+         return;
+      }
+
+      final StringBuilder failure = new StringBuilder("Unable to connect to server using configuration ");
+      failure.append(connectorConfig);
+
+      if (backupConfig != null)
+      {
+         failure.append(" and backup configuration ");
+         failure.append(backupConfig);
+      }
+
+      throw new HornetQNotConnectedException(failure.toString());
+   }
+
+   /**
+    * @return the transport configuration this factory currently holds as its connector configuration
+    */
+   public TransportConfiguration getConnectorConfiguration()
+   {
+      return this.connectorConfig;
+   }
+
+   public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp)
+   {
+      Connector localConnector = connector;
+
+      // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need
+      // to create a connector just to validate if the parameters are ok.
+      // so this will create the instance to be used on the isEquivalent check
+      if (localConnector == null)
+      {
+         localConnector = connectorFactory.createConnector(connectorConfig.getParams(),
+                                                           new DelegatingBufferHandler(),
+                                                           this,
+                                                           closeExecutor,
+                                                           threadPool,
+                                                           scheduledThreadPool,
+                                                           clientProtocolManager);
+      }
+
+      if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams()))
+      {
+         if (ClientSessionFactoryImpl.isDebug)
+         {
+            HornetQClientLogger.LOGGER.debug("Setting up backup config = " + backUp + " for live = " + live);
+         }
+         backupConfig = backUp;
+      }
+      else
+      {
+         if (ClientSessionFactoryImpl.isDebug)
+         {
+            HornetQClientLogger.LOGGER.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live +
+                                                " / " +
+                                                backUp +
+                                                " but it didn't belong to " +
+                                                connectorConfig);
+         }
+      }
+   }
+
+   /**
+    * @return the backup configuration currently recorded in {@code backupConfig}; may be {@code null}
+    */
+   public Object getBackupConnector()
+   {
+      return this.backupConfig;
+   }
+
+   /**
+    * Creates a session with every option supplied explicitly by the caller.
+    *
+    * @return the session produced by {@code createSessionInternal}
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createSession(final String username,
+                                      final String password,
+                                      final boolean xa,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preAcknowledge,
+                                      final int ackBatchSize) throws HornetQException
+   {
+      return createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates a non-XA session without credentials, taking the pre-acknowledge setting from the
+    * server locator and the ack batch size from the caller.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createSession(final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final int ackBatchSize) throws HornetQException
+   {
+      final boolean preAcknowledge = serverLocator.isPreAcknowledge();
+      return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates an XA session without credentials; auto-commit is off for both sends and acks, and
+    * pre-acknowledge and ack batch size come from the server locator.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createXASession() throws HornetQException
+   {
+      final boolean preAcknowledge = serverLocator.isPreAcknowledge();
+      final int ackBatchSize = serverLocator.getAckBatchSize();
+      return createSessionInternal(null, null, true, false, false, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates a non-XA session without credentials in which neither sends nor acks are
+    * auto-committed; pre-acknowledge and ack batch size come from the server locator.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createTransactedSession() throws HornetQException
+   {
+      final boolean preAcknowledge = serverLocator.isPreAcknowledge();
+      final int ackBatchSize = serverLocator.getAckBatchSize();
+      return createSessionInternal(null, null, false, false, false, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates a non-XA session without credentials that auto-commits both sends and acks;
+    * pre-acknowledge and ack batch size come from the server locator.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createSession() throws HornetQException
+   {
+      final boolean preAcknowledge = serverLocator.isPreAcknowledge();
+      final int ackBatchSize = serverLocator.getAckBatchSize();
+      return createSessionInternal(null, null, false, true, true, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates a non-XA session without credentials using the caller's auto-commit choices;
+    * pre-acknowledge and ack batch size come from the server locator.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
+   {
+      final boolean preAcknowledge = serverLocator.isPreAcknowledge();
+      final int ackBatchSize = serverLocator.getAckBatchSize();
+      return createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates a session without credentials using the caller's XA and auto-commit choices;
+    * pre-acknowledge and ack batch size come from the server locator.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
+   {
+      final boolean preAcknowledge = serverLocator.isPreAcknowledge();
+      final int ackBatchSize = serverLocator.getAckBatchSize();
+      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   /**
+    * Creates a session without credentials using the caller's XA, auto-commit and
+    * pre-acknowledge choices; only the ack batch size comes from the server locator.
+    *
+    * @throws HornetQException propagated from session creation
+    */
+   public ClientSession createSession(final boolean xa,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preAcknowledge) throws HornetQException
+   {
+      final int ackBatchSize = serverLocator.getAckBatchSize();
+      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
+   }
+
+   // ConnectionLifeCycleListener implementation --------------------------------------------------
+
+   /**
+    * Life-cycle callback for a newly created connection. This implementation does nothing.
+    */
+   public void connectionCreated(final HornetQComponent component, final Connection connection, final String protocol)
+   {
+      // intentionally empty
+   }
+
+   /**
+    * Life-cycle callback for a destroyed connection: schedules failure handling for
+    * {@code connectionID} on the close executor.
+    *
+    * @param connectionID ID of the connection that was destroyed
+    */
+   public void connectionDestroyed(final Object connectionID)
+   {
+      // Build the exception here, on the calling thread, so that its stack trace
+      // reflects the caller rather than the executor thread.
+      final HornetQException disconnected = HornetQClientMessageBundle.BUNDLE.channelDisconnected();
+
+      final Runnable failureTask = new Runnable()
+      {
+         public void run()
+         {
+            handleConnectionFailure(connectionID, disconnected);
+         }
+      };
+
+      // Must go through the same executor the disconnect message is sent through
+      closeExecutor.execute(failureTask);
+   }
+
+   /**
+    * Life-cycle callback for a connection error: runs failure handling on the calling thread.
+    *
+    * @param connectionID ID of the connection that raised the exception
+    * @param me           the exception that was raised
+    */
+   public void connectionException(final Object connectionID, final HornetQException me)
+   {
+      this.handleConnectionFailure(connectionID, me);
+   }
+
+   /**
+    * Removes {@code session} from the set of sessions tracked by this factory.
+    * <p>
+    * The removal is done under the {@code sessions} monitor so that it cannot happen concurrently
+    * with failover, which could lead to inconsistencies.
+    *
+    * @param session     the session to forget
+    * @param failingOver not read by this method
+    */
+   public void removeSession(final ClientSessionInternal session, final boolean failingOver)
+   {
+      synchronized (this.sessions)
+      {
+         this.sessions.remove(session);
+      }
+   }
+
+   /**
+    * Life-cycle callback for write-readiness changes. This implementation does nothing.
+    */
+   public void connectionReadyForWrites(final Object connectionID, final boolean ready)
+   {
+      // intentionally empty
+   }
+
+   /**
+    * @return {@code 1} when a connection is currently held, {@code 0} otherwise
+    */
+   public synchronized int numConnections()
+   {
+      if (connection == null)
+      {
+         return 0;
+      }
+      return 1;
+   }
+
+   /**
+    * @return the number of sessions currently tracked by this factory
+    */
+   public int numSessions()
+   {
+      return this.sessions.size();
+   }
+
+   /**
+    * Registers a listener to be notified by this factory's session-failure callbacks.
+    *
+    * @param listener the listener to add
+    */
+   public void addFailureListener(final SessionFailureListener listener)
+   {
+      this.listeners.add(listener);
+   }
+
+   /**
+    * Unregisters a session failure listener.
+    *
+    * @param listener the listener to remove
+    * @return the result of removing {@code listener} from the listener collection
+    */
+   public boolean removeFailureListener(final SessionFailureListener listener)
+   {
+      return this.listeners.remove(listener);
+   }
+
+   /**
+    * Registers a listener for failover events.
+    *
+    * @param listener the listener to add
+    * @return this factory, to allow call chaining
+    */
+   public ClientSessionFactoryImpl addFailoverListener(FailoverEventListener listener)
+   {
+      this.failoverListeners.add(listener);
+      return this;
+   }
+
+   /**
+    * Unregisters a failover event listener.
+    *
+    * @param listener the listener to remove
+    * @return the result of removing {@code listener} from the failover listener collection
+    */
+   public boolean removeFailoverListener(FailoverEventListener listener)
+   {
+      return this.failoverListeners.remove(listener);
+   }
+
+   /**
+    * Stops the client protocol manager. The connect and failover loops in this class check
+    * {@code clientProtocolManager.isAlive()} and give up once it reports {@code false}.
+    */
+   public void causeExit()
+   {
+      this.clientProtocolManager.stop();
+   }
+
+   /**
+    * Stops the protocol manager and then, while holding {@code createSessionLock}, closes or
+    * cleans up every session and marks this factory as closed.
+    *
+    * @param close {@code true} to close the sessions, {@code false} to only clean them up
+    */
+   private void interruptConnectAndCloseAllSessions(boolean close)
+   {
+      this.clientProtocolManager.stop();
+
+      synchronized (this.createSessionLock)
+      {
+         this.closeCleanSessions(close);
+         this.closed = true;
+      }
+   }
+
+   /**
+    * @param close
+    */
+   private void closeCleanSessions(boolean close)
+   {
+      HashSet<ClientSessionInternal> sessionsToClose;
+      synchronized (sessions)
+      {
+         sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+      }
+      // work on a copied set. the session will be removed from sessions when session.close() is
+      // called
+      for (ClientSessionInternal session : sessionsToClose)
+      {
+         try
+         {
+            if (close)
+               session.close();
+            else
+               session.cleanUp(false);
+         }
+         catch (Exception e1)
+         {
+            HornetQClientLogger.LOGGER.unableToCloseSession(e1);
+         }
+      }
+      checkCloseConnection();
+   }
+
+   /**
+    * Closes this factory: closes all of its sessions and tells the server locator that the
+    * factory is gone. Does nothing if the factory is already closed.
+    */
+   public void close()
+   {
+      if (!closed)
+      {
+         interruptConnectAndCloseAllSessions(true);
+
+         serverLocator.factoryClosed(this);
+      }
+   }
+
+   /**
+    * Shuts this factory down by cleaning up (rather than closing) its sessions. Unlike
+    * {@link #close()}, the server locator is not notified. Does nothing if already closed.
+    */
+   public void cleanup()
+   {
+      if (!closed)
+      {
+         interruptConnectAndCloseAllSessions(false);
+      }
+   }
+
+   /**
+    * @return {@code true} if this factory has been closed or its server locator reports closed
+    */
+   public boolean isClosed()
+   {
+      if (closed)
+      {
+         return true;
+      }
+      return serverLocator.isClosed();
+   }
+
+   /**
+    * @return the server locator this factory refers to
+    */
+   @Override
+   public ServerLocator getServerLocator()
+   {
+      return this.serverLocator;
+   }
+
+   /**
+    * Raises the {@code stopPingingAfterOne} flag.
+    */
+   public void stopPingingAfterOne()
+   {
+      this.stopPingingAfterOne = true;
+   }
+
+   /**
+    * Handles a connection failure for which there is no scale-down target node.
+    *
+    * @param connectionID ID of the failed connection
+    * @param me           the exception describing the failure
+    */
+   private void handleConnectionFailure(final Object connectionID, final HornetQException me)
+   {
+      this.handleConnectionFailure(connectionID, me, null);
+   }
+
+   /**
+    * Runs failover/reconnection for the given connection, treating an interruption as an
+    * expected event.
+    *
+    * @param connectionID          ID of the failed connection
+    * @param me                    the exception describing the failure
+    * @param scaleDownTargetNodeID forwarded unchanged to the failover logic; may be {@code null}
+    */
+   private void handleConnectionFailure(final Object connectionID, final HornetQException me, String scaleDownTargetNodeID)
+   {
+      try
+      {
+         failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
+      }
+      catch (HornetQInterruptedException interrupted)
+      {
+         // Logged at debug level only: being interrupted is expected, e.g. during shutdown
+         HornetQClientLogger.LOGGER.debug(interrupted.getMessage(), interrupted);
+      }
+   }
+
+   /**
+    * Reacts to the failure of the connection identified by {@code connectionID}.
+    * <p>
+    * Listeners are notified first. Then, when {@code reconnectAttempts != 0}, the connection is
+    * re-established and the existing sessions are re-attached to it; otherwise the connection is
+    * simply destroyed. If no connection is available afterwards, the sessions are cleaned up.
+    * <p>
+    * TODO: Maybe this belongs to HornetQClientProtocolManager
+    *
+    * @param connectionID          ID of the failed connection; the call is ignored when it does not
+    *                              match the current connection
+    * @param me                    the exception describing the failure
+    * @param scaleDownTargetNodeID forwarded to {@code SessionFailureListener.connectionFailed} when
+    *                              failover/reconnection did not succeed; may be {@code null}
+    */
+   private void failoverOrReconnect(final Object connectionID, final HornetQException me, String scaleDownTargetNodeID)
+   {
+      Set<ClientSessionInternal> sessionsToClose = null;
+      if (!clientProtocolManager.isAlive())
+      {
+         return;
+      }
+      Lock localFailoverLock = lockFailover();
+      try
+      {
+         if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive())
+         {
+            // We already failed over/reconnected - probably the first failure came in, all the connections were
+            // failed over, then an async connection exception or disconnect came in for one of the already
+            // closed connections, so we just return - we don't want to call the listeners again
+
+            return;
+         }
+
+         if (ClientSessionFactoryImpl.isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
+         }
+
+         callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
+         // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
+         callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);
+
+         // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
+         // there are either no threads executing in createSession, or one is blocking on a createSession
+         // result.
+
+         // Then interrupt the channel 1 that is blocking (could just interrupt them all)
+
+         // Then release all channel 1 locks - this allows the createSession to exit the monitor
+
+         // Then get all channel 1 locks again - this ensures that any createSession thread has executed the section
+         // and returned all its connections to the connection manager (the code to return connections to the
+         // connection manager must be inside the lock)
+
+         // Then perform failover
+
+         // Then release failoverLock
+
+         // The other side of the bargain - during createSession:
+         // The calling thread must get the failoverLock and get its connections when this is
+         // locked.
+         // While this is still locked it must then get the channel1 lock
+         // It can then release the failoverLock
+         // It should catch HornetQException.INTERRUPTED in the call to channel.sendBlocking
+         // It should then return its connections, with channel 1 lock still held
+         // It can then release the channel 1 lock, and retry (which will cause locking on
+         // failoverLock until failover is complete)
+
+         if (reconnectAttempts != 0)
+         {
+            if (clientProtocolManager.cleanupBeforeFailover(me))
+            {
+               // Now we absolutely know that no threads are executing in or blocked in
+               // createSession, and no more will execute it until failover is complete
+
+               // So.. do failover / reconnection
+
+               RemotingConnection oldConnection = connection;
+
+               connection = null;
+
+               Connector localConnector = connector;
+               if (localConnector != null)
+               {
+                  try
+                  {
+                     localConnector.close();
+                  }
+                  catch (Exception ignore)
+                  {
+                     // best effort: the connector is being discarded anyway
+                  }
+               }
+
+               cancelScheduledTasks();
+
+               connector = null;
+
+               reconnectSessions(oldConnection, reconnectAttempts, me);
+
+               if (oldConnection != null)
+               {
+                  oldConnection.destroy();
+               }
+
+               if (connection != null)
+               {
+                  callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
+               }
+            }
+         }
+         else
+         {
+            // Reconnection is disabled: just tear the connection down
+            RemotingConnection connectionToDestroy = connection;
+            if (connectionToDestroy != null)
+            {
+               connectionToDestroy.destroy();
+            }
+            connection = null;
+         }
+
+         if (connection == null)
+         {
+            synchronized (sessions)
+            {
+               sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+            }
+            callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
+            // Pass the scale-down target on: this is the only notification that delivers it to
+            // connectionFailed(); the pre-reconnect notification above only calls beforeReconnect(me).
+            callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
+         }
+      }
+      finally
+      {
+         localFailoverLock.unlock();
+      }
+
+      // This needs to be outside the failover lock to prevent deadlock
+      if (connection != null)
+      {
+         callSessionFailureListeners(me, true, true);
+      }
+      if (sessionsToClose != null)
+      {
+         // If connection is null it means we didn't succeed in failing over or reconnecting
+         // so we close all the sessions, so they will throw exceptions when attempted to be used
+
+         for (ClientSessionInternal session : sessionsToClose)
+         {
+            try
+            {
+               session.cleanUp(true);
+            }
+            catch (Exception cause)
+            {
+               HornetQClientLogger.LOGGER.failedToCleanupSession(cause);
+            }
+         }
+      }
+   }
+
+   private ClientSession createSessionInternal(final String username,
+                                               final String password,
+                                               final boolean xa,
+                                               final boolean autoCommitSends,
+                                               final boolean autoCommitAcks,
+                                               final boolean preAcknowledge,
+                                               final int ackBatchSize) throws HornetQException
+   {
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+
+
+      SessionContext context = createSessionChannel(name,
+                                                    username,
+                                                    password,
+                                                    xa,
+                                                    autoCommitSends,
+                                                    autoCommitAcks,
+                                                    preAcknowledge);
+
+      ClientSessionInternal session = new ClientSessionImpl(this,
+                                                            name,
+                                                            username,
+                                                            password,
+                                                            xa,
+                                                            autoCommitSends,
+                                                            autoCommitAcks,
+                                                            preAcknowledge,
+                                                            serverLocator.isBlockOnAcknowledge(),
+                                                            serverLocator.isAutoGroup(),
+                                                            ackBatchSize,
+                                                            serverLocator.getConsumerWindowSize(),
+                                                            serverLocator.getConsumerMaxRate(),
+                                                            serverLocator.getConfirmationWindowSize(),
+                                                            serverLocator.getProducerWindowSize(),
+                                                            serverLocator.getProducerMaxRate(),
+                                                            serverLocator.isBlockOnNonDurableSend(),
+                                                            serverLocator.isBlockOnDurableSend(),
+                                                            serverLocator.isCacheLargeMessagesClient(),
+                                                            serverLocator.getMinLargeMessageSize(),
+                                                            serverLocator.isCompressLargeMessage(),
+                                                            serverLocator.getInitialMessagePacketSize(),
+                                                            serverLocator.getGroupID(),
+                                                            context,
+                                                            orderedExecutorFactory.getExecutor(),
+                                                            orderedExecutorFactory.getExecutor());
+
+      synchronized (sessions)
+      {
+         if (closed || !clientProtocolManager.isAlive())
+         {
+            session.close();
+            return null;
+         }
+         sessions.add(session);
+      }
+
+      return new DelegatingSession(session);
+
+   }
+
+
+   /**
+    * Notifies the session failure listeners without a scale-down target node.
+    *
+    * @param me             the exception describing the failure
+    * @param afterReconnect selects which listener callback is invoked
+    * @param failedOver     passed to the listeners when {@code afterReconnect} is {@code true}
+    */
+   private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect,
+                                            final boolean failedOver)
+   {
+      this.callSessionFailureListeners(me, afterReconnect, failedOver, null);
+   }
+
+   /**
+    * Notifies a snapshot of the registered session failure listeners.
+    *
+    * @param me                    the exception describing the failure
+    * @param afterReconnect        {@code true} to invoke {@code connectionFailed}, {@code false} to
+    *                              invoke {@code beforeReconnect}
+    * @param failedOver            passed to {@code connectionFailed}
+    * @param scaleDownTargetNodeID passed to {@code connectionFailed}; not used for {@code beforeReconnect}
+    */
+   private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect,
+                                            final boolean failedOver, final String scaleDownTargetNodeID)
+   {
+      // Work on a copy so that the listener collection is not iterated directly
+      final List<SessionFailureListener> snapshot = new ArrayList<SessionFailureListener>(listeners);
+
+      for (final SessionFailureListener current : snapshot)
+      {
+         try
+         {
+            if (!afterReconnect)
+            {
+               current.beforeReconnect(me);
+            }
+            else
+            {
+               current.connectionFailed(me, failedOver, scaleDownTargetNodeID);
+            }
+         }
+         catch (final Throwable failure)
+         {
+            // One misbehaving listener must not prevent the others from being notified
+            HornetQClientLogger.LOGGER.failedToExecuteListener(failure);
+         }
+      }
+   }
+
+   /**
+    * Delivers a failover event to a snapshot of the registered failover listeners.
+    *
+    * @param type the event to deliver
+    */
+   private void callFailoverListeners(FailoverEventType type)
+   {
+      // Work on a copy so that the listener collection is not iterated directly
+      final List<FailoverEventListener> snapshot = new ArrayList<>(failoverListeners);
+
+      for (final FailoverEventListener current : snapshot)
+      {
+         try
+         {
+            current.failoverEvent(type);
+         }
+         catch (final Throwable failure)
+         {
+            // One misbehaving listener must not prevent the others from being notified
+            HornetQClientLogger.LOGGER.failedToExecuteListener(failure);
+         }
+      }
+   }
+
+   /*
+    * Re-attach sessions all pre-existing sessions to the new remoting connection
+    */
+   private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts, final HornetQException cause)
+   {
+      HashSet<ClientSessionInternal> sessionsToFailover;
+      synchronized (sessions)
+      {
+         sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
+      }
+
+      for (ClientSessionInternal session : sessionsToFailover)
+      {
+         session.preHandleFailover(connection);
+      }
+
+      getConnectionWithRetry(reconnectAttempts);
+
+      if (connection == null)
+      {
+         if (!clientProtocolManager.isAlive())
+            HornetQClientLogger.LOGGER.failedToConnectToServer();
+
+         return;
+      }
+
+
+
+
+      List<FailureListener> oldListeners = oldConnection.getFailureListeners();
+
+      List<FailureListener> newListeners = new ArrayList<>(connection.getFailureListeners());
+
+      for (FailureListener listener : oldListeners)
+      {
+         // Add all apart from the old DelegatingFailureListener
+         if (listener instanceof DelegatingFailureListener == false)
+         {
+            newListeners.add(listener);
+         }
+      }
+
+      connection.setFailureListeners(newListeners);
+
+      // This used to be done inside failover
+      // it needs to be done on the protocol
+      ((CoreRemotingConnection)connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence());
+
+      for (ClientSessionInternal session : sessionsToFailover)
+      {
+         session.handleFailover(connection, cause);
+      }
+   }
+
+   /**
+    * Tries to obtain a connection, waiting between attempts with a multiplicative back-off that is
+    * capped at {@code maxRetryInterval}.
+    * <p>
+    * The loop ends when a connection is obtained, the attempts are used up, the protocol
+    * manager's latch wait returns {@code true}, or the protocol manager is no longer alive.
+    *
+    * @param reconnectAttempts {@code 0} for a single attempt with no retry, {@code -1} for no limit,
+    *                          otherwise the maximum number of attempts
+    * @throws HornetQInterruptedException if the thread is interrupted while waiting between attempts
+    */
+   private void getConnectionWithRetry(final int reconnectAttempts)
+   {
+      if (!clientProtocolManager.isAlive())
+      {
+         return;
+      }
+      if (HornetQClientLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQClientLogger.LOGGER.trace("getConnectionWithRetry::" + reconnectAttempts +
+                                             " with retryInterval = " +
+                                             retryInterval +
+                                             " multiplier = " +
+                                             retryIntervalMultiplier, new Exception("trace"));
+      }
+
+      long currentInterval = retryInterval;
+
+      int failedAttempts = 0;
+
+      while (clientProtocolManager.isAlive())
+      {
+         if (ClientSessionFactoryImpl.isDebug)
+         {
+            HornetQClientLogger.LOGGER.debug("Trying reconnection attempt " + failedAttempts + "/" + reconnectAttempts);
+         }
+
+         if (getConnection() != null)
+         {
+            if (HornetQClientLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQClientLogger.LOGGER.debug("Reconnection successful");
+            }
+            return;
+         }
+
+         // Failed to get connection
+
+         if (reconnectAttempts == 0)
+         {
+            HornetQClientLogger.LOGGER.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
+            return;
+         }
+
+         failedAttempts++;
+
+         if (reconnectAttempts != -1 && failedAttempts == reconnectAttempts)
+         {
+            if (reconnectAttempts != 1)
+            {
+               HornetQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
+            }
+
+            return;
+         }
+
+         if (ClientSessionFactoryImpl.isTrace)
+         {
+            HornetQClientLogger.LOGGER.waitingForRetry(currentInterval, retryInterval, retryIntervalMultiplier);
+         }
+
+         try
+         {
+            if (clientProtocolManager.waitOnLatch(currentInterval))
+            {
+               return;
+            }
+         }
+         catch (InterruptedException ignore)
+         {
+            throw new HornetQInterruptedException(createTrace);
+         }
+
+         // Exponential back-off, never exceeding the configured maximum
+         currentInterval = Math.min((long) (currentInterval * retryIntervalMultiplier), maxRetryInterval);
+      }
+   }
+
+   private void cancelScheduledTasks()
+   {
+      Future<?> pingerFutureLocal = pingerFuture;
+      if (pingerFutureLocal != null)
+      {
+         pingerFutureLocal.cancel(false);
+      }
+      PingRunnable pingRunnableLocal = pingRunnable;
+      if (pingRunnableLocal != null)
+      {
+         pingRunnableLocal.cancel();
+      }
+      pingerFuture = null;
+      pingRunnable = null;
+   }
+
+   /**
+    * Releases the connection and the connector once no session is left: cancels the ping tasks,
+    * destroys the connection and closes the connector. Errors from the last two are ignored.
+    */
+   private void checkCloseConnection()
+   {
+      if (connection == null || sessions.size() != 0)
+      {
+         return;
+      }
+
+      cancelScheduledTasks();
+
+      try
+      {
+         connection.destroy();
+      }
+      catch (Throwable ignore)
+      {
+         // best effort: the connection is being discarded
+      }
+
+      connection = null;
+
+      try
+      {
+         if (connector != null)
+         {
+            connector.close();
+         }
+      }
+      catch (Throwable ignore)
+      {
+         // best effort: the connector is being discarded
+      }
+
+      connector = null;
+   }
+
+   /**
+    * Returns the current remoting connection, establishing a new one under {@code connectionLock}
+    * if none exists yet.
+    * <p>
+    * A newly established connection is destroyed again when a live node ID is known and the
+    * protocol manager's failover check rejects it or throws. On success the after-connect
+    * listener (if any) is invoked and, when the locator has a topology, a topology subscription
+    * is sent.
+    *
+    * @return the connection, or {@code null} if the protocol manager is no longer alive or no
+    *         usable connection could be obtained
+    * @throws IllegalStateException if this factory has been closed
+    */
+   public RemotingConnection getConnection()
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("ClientSessionFactory is closed!");
+      }
+      if (!clientProtocolManager.isAlive())
+      {
+         return null;
+      }
+      synchronized (connectionLock)
+      {
+         if (connection != null)
+         {
+            // a connection already exists, so returning the same one
+            return connection;
+         }
+
+         connection = establishNewConnection();
+
+         // we check if we can actually connect.
+         // we do it here as to receive the reply connection has to be not null
+         if (connection != null && liveNodeID != null)
+         {
+            try
+            {
+               if (!clientProtocolManager.checkForFailover(liveNodeID))
+               {
+                  connection.destroy();
+                  connection = null;
+               }
+            }
+            catch (HornetQException e)
+            {
+               if (connection != null)
+               {
+                  connection.destroy();
+                  connection = null;
+               }
+            }
+         }
+
+         if (connection != null && serverLocator.getAfterConnectInternalListener() != null)
+         {
+            serverLocator.getAfterConnectInternalListener().onConnection(this);
+         }
+
+         if (serverLocator.getTopology() != null)
+         {
+            if (connection != null)
+            {
+               if (ClientSessionFactoryImpl.isTrace)
+               {
+                  HornetQClientLogger.LOGGER.trace(this + "::Subscribing Topology");
+               }
+               clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());
+            }
+         }
+         else
+         {
+            // The identity hash is taken of the locator itself; the message text is appended after it
+            HornetQClientLogger.LOGGER.debug("serverLocator@" + System.identityHashCode(serverLocator) + " had no topology");
+         }
+
+         return connection;
+      }
+   }
+
+
+   protected void schedulePing()
+   {
+      if (pingerFuture == null)
+      {
+         pingRunnable = new ClientSessionFactoryImpl.PingRunnable();
+
+         if (clientFailureCheckPeriod != -1)
+         {
+            pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ClientSessionFactoryImpl.ActualScheduledPinger(pingRunnable),
+                                                                      0,
+                                                                      clientFailureCheckPeriod,
+                                                                      TimeUnit.MILLISECONDS);
+         }
+
+         // To make sure the first ping will be sent
+         pingRunnable.send();
+      }
+      // send a ping every time we create a new remoting connection
+      // to set up its TTL on the server side
+      else
+      {
+         pingRunnable.run();
+      }
+   }
+
+
+   @Override
+   protected void finalize() throws Throwable
+   {
+      if (!closed && finalizeCheck)
+      {
+         HornetQClientLogger.LOGGER.factoryLeftOpen(createTrace, System.identityHashCode(this));
+
+         close();
+      }
+
+      super.finalize();
+   }
+
+   /**
+    * Returns a connector factory of the requested class, reusing the cached instance when its
+    * class name matches and otherwise instantiating a new one inside a privileged action.
+    *
+    * @param connectorFactoryClassName fully qualified class name of the connector factory
+    * @return the cached or newly instantiated factory
+    */
+   protected ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName)
+   {
+      // Read the field once, to avoid races where the cached factory is set to null in between
+      final ConnectorFactory cached = connectorFactory;
+
+      final boolean reusable = cached != null && cached.getClass().getName().equals(connectorFactoryClassName);
+      if (reusable)
+      {
+         return cached;
+      }
+
+      // Nothing suitable cached: instantiate a new one
+      final PrivilegedAction<ConnectorFactory> instantiate = new PrivilegedAction<ConnectorFactory>()
+      {
+         public ConnectorFactory run()
+         {
+            return (ConnectorFactory) ClassloadingUtil.newInstanceFromClassLoader(connectorFactoryClassName);
+         }
+      };
+      return AccessController.doPrivileged(instantiate);
+   }
+
+
+   /**
+    * Task that fails a remoting connection with a "disconnected" exception, optionally naming a
+    * scale-down target node. The task registers itself in {@code CLOSE_RUNNABLES} while running.
+    */
+   public class CloseRunnable implements Runnable
+   {
+      private final RemotingConnection conn;
+      private final String scaleDownTargetNodeID;
+
+      /**
+       * @param conn                  the connection to fail
+       * @param scaleDownTargetNodeID the scale-down target node, or {@code null} if there is none
+       */
+      public CloseRunnable(RemotingConnection conn, String scaleDownTargetNodeID)
+      {
+         this.conn = conn;
+         this.scaleDownTargetNodeID = scaleDownTargetNodeID;
+      }
+
+      // Must be executed on new thread since cannot block the Netty thread for a long time and fail
+      // can cause reconnect loop
+      public void run()
+      {
+         try
+         {
+            CLOSE_RUNNABLES.add(this);
+            if (scaleDownTargetNodeID != null)
+            {
+               conn.fail(HornetQClientMessageBundle.BUNDLE.disconnected(), scaleDownTargetNodeID);
+            }
+            else
+            {
+               conn.fail(HornetQClientMessageBundle.BUNDLE.disconnected());
+            }
+         }
+         finally
+         {
+            CLOSE_RUNNABLES.remove(this);
+         }
+      }
+
+      /**
+       * Stops the enclosing factory's protocol manager and unregisters this task.
+       *
+       * @return the enclosing factory
+       */
+      public ClientSessionFactoryImpl stop()
+      {
+         causeExit();
+         CLOSE_RUNNABLES.remove(this);
+         return ClientSessionFactoryImpl.this;
+      }
+
+   }
+
+
+   /**
+    * Overrides the number of reconnect attempts used by this factory.
+    *
+    * @param attempts the new reconnect attempts value
+    */
+   public void setReconnectAttempts(final int attempts)
+   {
+      this.reconnectAttempts = attempts;
+   }
+
+   /**
+    * @return the connector currently held by this factory (may be {@code null})
+    */
+   public Object getConnector()
+   {
+      return this.connector;
+   }
+
+   /**
+    * @return the {@link ConfirmationWindowWarning} instance held by this factory
+    */
+   @Override
+   public ConfirmationWindowWarning getConfirmationWindowWarning()
+   {
+      return this.confirmationWindowWarning;
+   }
+
+   protected Connection openTransportConnection(final Connector connector)
+   {
+      connector.start();
+
+      Connection transportConnection = connector.createConnection();
+
+      if (transportConnection == null)
+      {
+         if (ClientSessionFactoryImpl.isDebug)
+         {
+            HornetQClientLogger.LOGGER.debug("Connector towards " + connector + " failed");
+         }
+
+         try
+         {
+            connector.close();
+         }
+         catch (Throwable t)
+         {
+         }
+      }
+
+      return transportConnection;
+   }
+
+   protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration)
+   {
+      return connectorFactory.createConnector(configuration.getParams(),
+                                              new DelegatingBufferHandler(),
+                                              this,
+                                              closeExecutor,
+                                              threadPool,
+                                              scheduledThreadPool,
+                                              clientProtocolManager);
+   }
+
+   private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc)
+   {
+      if (tc.getParams() != null)
+      {
+         Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), tc.getParams().keySet());
+
+         if (!invalid.isEmpty())
+         {
+            String msg = "The following keys are invalid for configuring a connector: " +
+               ConfigurationHelper.stringSetToCommaListString(invalid);
+
+            throw new IllegalStateException(msg);
+
+         }
+      }
+   }
+
+   /**
+    * Opens a transport connection to the live server and, failing that, to the configured backup.
+    * <p>
+    * If the live connector yields no connection and a backup configuration is present, the backup
+    * is tried exactly once. On success the backup connector, configuration and factory replace the
+    * live ones and the backup configuration is cleared. Any exception raised along the way is
+    * logged, the connection and the current connector are closed (best effort), the connector
+    * field is reset and {@code null} is returned.
+    *
+    * @return the established transport connection, or {@code null} when neither live nor backup
+    *         could be reached
+    */
+   protected Connection createTransportConnection()
+   {
+      Connection transportConnection = null;
+
+      try
+      {
+         if (ClientSessionFactoryImpl.isDebug)
+         {
+            HornetQClientLogger.LOGGER.debug("Trying to connect with connector = " + connectorFactory +
+                                                ", parameters = " +
+                                                connectorConfig.getParams() +
+                                                " connector = " +
+                                                connector);
+         }
+
+
+         Connector liveConnector = createConnector(connectorFactory, connectorConfig);
+
+         if ((transportConnection = openTransportConnection(liveConnector)) != null)
+         {
+            // if we can't connect the connect method will return null, hence we have to try the backup
+            connector = liveConnector;
+         }
+         else if (backupConfig != null)
+         {
+            if (ClientSessionFactoryImpl.isDebug)
+            {
+               HornetQClientLogger.LOGGER.debug("Trying backup config = " + backupConfig);
+            }
+
+            ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
+
+            Connector backupConnector = createConnector(backupConnectorFactory, backupConfig);
+
+            // Open the backup connection exactly once: a second open would overwrite (and leak)
+            // the first connection and start the connector twice.
+            transportConnection = openTransportConnection(backupConnector);
+
+            if (transportConnection != null)
+            {
+               /*looks like the backup is now live, let's use that*/
+
+               if (ClientSessionFactoryImpl.isDebug)
+               {
+                  HornetQClientLogger.LOGGER.debug("Connected to the backup at " + backupConfig);
+               }
+
+               // Switching backup as live
+               connector = backupConnector;
+               connectorConfig = backupConfig;
+               backupConfig = null;
+               connectorFactory = backupConnectorFactory;
+            }
+            else
+            {
+               if (ClientSessionFactoryImpl.isDebug)
+               {
+                  HornetQClientLogger.LOGGER.debug("Backup is not active yet");
+               }
+            }
+
+         }
+      }
+      catch (Exception cause)
+      {
+         // Sanity catch for badly behaved remoting plugins
+
+         HornetQClientLogger.LOGGER.createConnectorException(cause);
+
+         if (transportConnection != null)
+         {
+            try
+            {
+               transportConnection.close();
+            }
+            catch (Throwable ignored)
+            {
+               // best-effort cleanup; the original failure has already been logged
+            }
+         }
+
+         if (connector != null)
+         {
+            try
+            {
+               connector.close();
+            }
+            catch (Throwable ignored)
+            {
+               // best-effort cleanup; the original failure has already been logged
+            }
+         }
+
+         transportConnection = null;
+
+         connector = null;
+      }
+
+      return transportConnection;
+   }
+
+   /**
+    * Buffer handler that forwards incoming buffers to the factory's current connection, but only
+    * when the buffer's connection ID matches that connection; anything else is dropped with a
+    * debug log entry.
+    */
+   private class DelegatingBufferHandler implements BufferHandler
+   {
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+      {
+         // Read the field once so the null check and the delegation see the same connection
+         final RemotingConnection current = connection;
+
+         if (current == null || !connectionID.equals(current.getID()))
+         {
+            HornetQClientLogger.LOGGER.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
+            return;
+         }
+
+         current.bufferReceived(connectionID, buffer);
+      }
+   }
+
+   /**
+    * Failure listener bound to one connection ID that hands every failure notification to
+    * {@code handleConnectionFailure} on the enclosing factory.
+    */
+   private final class DelegatingFailureListener implements FailureListener
+   {
+      private final Object connectionID;
+
+      DelegatingFailureListener(final Object connectionID)
+      {
+         this.connectionID = connectionID;
+      }
+
+      /** Failure without a scale-down target: handled with a {@code null} target node ID. */
+      @Override
+      public void connectionFailed(final HornetQException me, final boolean failedOver)
+      {
+         handleConnectionFailure(connectionID, me, null);
+      }
+
+      /** The {@code failedOver} flag is not used; only the exception and target node ID are passed on. */
+      @Override
+      public void connectionFailed(final HornetQException me, final boolean failedOver, String scaleDownTargetNodeID)
+      {
+         handleConnectionFailure(connectionID, me, scaleDownTargetNodeID);
+      }
+
+      @Override
+      public String toString()
+      {
+         final StringBuilder text = new StringBuilder(DelegatingFailureListener.class.getSimpleName());
+         text.append("('reconnectsOrFailover', hash=");
+         text.append(super.hashCode());
+         text.append(")");
+         return text.toString();
+      }
+   }
+
+   /**
+    * Runnable that holds its {@link PingRunnable} only through a {@link WeakReference} and runs
+    * it on each invocation for as long as the referent is still reachable.
+    */
+   private static final class ActualScheduledPinger implements Runnable
+   {
+      private final WeakReference<PingRunnable> pingRunnable;
+
+      ActualScheduledPinger(final PingRunnable runnable)
+      {
+         pingRunnable = new WeakReference<PingRunnable>(runnable);
+      }
+
+      public void run()
+      {
+         final PingRunnable target = pingRunnable.get();
+
+         if (target == null)
+         {
+            // Referent has been collected; nothing left to ping
+            return;
+         }
+
+         target.run();
+      }
+
+   }
+
+   /**
+    * Periodic task that checks whether data has been received on the connection within the
+    * connection TTL and, if the connection is still considered alive, sends a ping.
+    * <p>
+    * {@link #run()} and {@link #cancel()} are synchronized on this instance.
+    */
+   private final class PingRunnable implements Runnable
+   {
+      // Set by cancel(), and by run() once a timeout has been detected; a cancelled task does nothing
+      private boolean cancelled;
+
+      // NOTE(review): never assigned true in the code visible here, so when stopPingingAfterOne is
+      // set run() returns immediately on every call - confirm whether this is intended
+      private boolean first;
+
+      // Time of the last successful data-received check (initialised at construction)
+      private long lastCheck = System.currentTimeMillis();
+
+      public synchronized void run()
+      {
+         // && binds tighter than ||: skip when cancelled, or when (stopPingingAfterOne and not first)
+         if (cancelled || stopPingingAfterOne && !first)
+         {
+            return;
+         }
+
+         first = false;
+
+         long now = System.currentTimeMillis();
+
+         // Only check for a dead connection when both settings are enabled (!= -1) and a full
+         // TTL has elapsed since the last check
+         if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL)
+         {
+            if (!connection.checkDataReceived())
+            {
+
+               // We use a different thread to send the fail
+               // but the exception has to be created here to preserve the stack trace
+               final HornetQException me = HornetQClientMessageBundle.BUNDLE.connectionTimedOut(connection.getTransportConnection());
+
+               // Stop any further pings from this task before handing off the failure
+               cancelled = true;
+
+               threadPool.execute(new Runnable()
+               {
+                  // Must be executed on different thread
+                  public void run()
+                  {
+                     connection.fail(me);
+                  }
+               });
+
+               // No ping is sent once the connection has been declared timed out
+               return;
+            }
+            else
+            {
+               lastCheck = now;
+            }
+         }
+
+         send();
+      }
+
+      /**
+       * Sends a ping through the client protocol manager, passing the connection TTL.
+       */
+      public void send()
+      {
+
+         clientProtocolManager.ping(connectionTTL);
+      }
+
+      /**
+       * Marks this task as cancelled so that subsequent {@link #run()} calls return immediately.
+       */
+      public synchronized void cancel()
+      {
+         cancelled = true;
+      }
+   }
+
+
+   /**
+    * Creates a transport connection (live or backup) and wraps it in a new
+    * {@link RemotingConnection} obtained from the client protocol manager.
+    * <p>
+    * A {@link DelegatingFailureListener} bound to the new connection's ID is registered on it and
+    * pinging is scheduled before the connection is returned.
+    *
+    * @return the newly established connection, or {@code null} when no transport connection could
+    *         be created
+    */
+   protected RemotingConnection establishNewConnection()
+   {
+      Connection transportConnection = createTransportConnection();
+
+      if (transportConnection == null)
+      {
+         if (ClientSessionFactoryImpl.isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Neither backup or live were active, will just give up now");
+         }
+         return null;
+      }
+
+      RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout,
+                                                                       callFailoverTimeout, incomingInterceptors,
+                                                                       outgoingInterceptors,
+                                                                       new SessionFactoryTopologyHandler());
+
+      newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID()));
+
+      schedulePing();
+
+      if (HornetQClientLogger.LOGGER.isTraceEnabled())
+      {
+         // Log the connection actually being returned, not the factory's 'connection' field
+         HornetQClientLogger.LOGGER.trace("returning " + newConnection);
+      }
+
+      return newConnection;
+   }
+
+
+   protected SessionContext createSessionChannel(final String name,
+                                                 final String username,
+                                                 final String password,
+                                                 final boolean xa,
+                                                 final boolean autoCommitSends,
+                                                 final boolean autoCommitAcks,
+                                                 final boolean preAcknowledge) throws HornetQException
+   {
+      synchronized (createSessionLock)
+      {
+         return clientProtocolManager.createSessionContext(name, username,
+                                                           password, xa, autoCommitSends, autoCommitAcks, preAcknowledge,
+                                                           serverLocator.getMinLargeMessageSize(), serverLocator.getConfirmationWindowSize());
+      }
+   }
+
+   /**
+    * @return the node ID last recorded for the live server this factory is connected to
+    *         (may be {@code null} if none has been recorded)
+    */
+   @Override
+   public String getLiveNodeId()
+   {
+      return this.liveNodeID;
+   }
+
+   /**
+    * Topology handler that relays topology notifications to the server locator and, on node
+    * disconnection, schedules a {@link CloseRunnable} for the affected connection.
+    */
+   class SessionFactoryTopologyHandler implements TopologyResponseHandler
+   {
+
+      @Override
+      public void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID)
+      {
+         if (HornetQClientLogger.LOGGER.isTraceEnabled())
+         {
+            final String traceMessage = "Disconnect being called on client:" +
+               " server locator = " + serverLocator +
+               " notifying node " + nodeID +
+               " as down";
+
+            // The exception is only there to capture the calling stack in the trace output
+            HornetQClientLogger.LOGGER.trace(traceMessage, new Exception("trace"));
+         }
+
+         serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID);
+
+         // Failing the connection is handed to the close executor rather than done inline
+         final CloseRunnable closeTask = new CloseRunnable(conn, scaleDownTargetNodeID);
+         closeExecutor.execute(closeTask);
+      }
+
+      @Override
+      public void notifyNodeUp(long uniqueEventID, String nodeID, String backupGroupName, String scaleDownGroupName, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean isLast)
+      {
+         final TransportConfiguration liveConfig = connectorPair.getA();
+
+         // if it is our connector then set the live id used for failover
+         final boolean isOurConnector = liveConfig != null && liveConfig.equals(connectorConfig);
+         if (isOurConnector)
+         {
+            liveNodeID = nodeID;
+         }
+
+         serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast);
+      }
+
+      @Override
+      public void notifyNodeDown(long eventTime, String nodeID)
+      {
+         // Pure pass-through to the server locator
+         serverLocator.notifyNodeDown(eventTime, nodeID);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java
new file mode 100644
index 0000000..3ddfde5
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import java.util.concurrent.locks.Lock;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.api.core.client.SessionFailureListener;
+import org.apache.activemq6.utils.ConfirmationWindowWarning;
+
+/**
+ * Internal extension of {@link ClientSessionFactory} that adds operations for failure-listener
+ * management, connection and failover control, and test inspection on top of the public API.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ *
+ */
+public interface ClientSessionFactoryInternal extends ClientSessionFactory
+{
+   // NOTE(review): presumably makes in-progress connection/retry loops terminate - confirm against the implementation
+   void causeExit();
+
+   /** Registers a listener for session failures. */
+   void addFailureListener(SessionFailureListener listener);
+
+   /**
+    * Unregisters a previously added failure listener.
+    *
+    * @return presumably {@code true} if the listener was registered - TODO confirm
+    */
+   boolean removeFailureListener(SessionFailureListener listener);
+
+   // NOTE(review): presumably suppresses the "factory left open" check done at finalization - confirm
+   void disableFinalizeCheck();
+
+   /** @return the node ID of the live server this factory is associated with */
+   String getLiveNodeId();
+
+   // for testing
+
+   int numConnections();
+
+   int numSessions();
+
+   /**
+    * Removes the given session from this factory.
+    *
+    * @param session     the session to remove
+    * @param failingOver whether the removal happens as part of a failover
+    */
+   void removeSession(final ClientSessionInternal session, boolean failingOver);
+
+   /**
+    * Connects this factory to a server.
+    *
+    * @param reconnectAttempts           number of reconnect attempts to use
+    * @param failoverOnInitialConnection whether failover applies to the initial connection
+    * @throws HornetQException if the connection cannot be established
+    */
+   void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
+
+   /** Supplies the live and backup transport configurations. */
+   void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp);
+
+   /** @return the connector currently in use, typed as {@code Object} */
+   Object getConnector();
+
+   /** @return the backup connector, typed as {@code Object} */
+   Object getBackupConnector();
+
+   /** Overrides the number of reconnect attempts. */
+   void setReconnectAttempts(int i);
+
+   /** @return the confirmation window warning holder of this factory */
+   ConfirmationWindowWarning getConfirmationWindowWarning();
+
+   // NOTE(review): presumably acquires and returns the lock guarding failover; the caller would be
+   // responsible for unlocking - confirm against the implementation
+   Lock lockFailover();
+}


Mime
View raw message