activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [40/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:42:10 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java
new file mode 100644
index 0000000..ed89a21
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java
@@ -0,0 +1,1239 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import java.io.File;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientMessage;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.api.core.client.MessageHandler;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.spi.core.remoting.ConsumerContext;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+import org.apache.activemq6.utils.FutureLatch;
+import org.apache.activemq6.utils.PriorityLinkedList;
+import org.apache.activemq6.utils.PriorityLinkedListImpl;
+import org.apache.activemq6.utils.ReusableLatch;
+import org.apache.activemq6.utils.TokenBucketLimiter;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 3603 $</tt> $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $
+ */
+public final class ClientConsumerImpl implements ClientConsumerInternal
+{
+   // Constants
+   // ------------------------------------------------------------------------------------
+
+   // Trace flag, evaluated once when the class is initialised
+   private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
+
+   // Upper bound, in milliseconds, used when waiting for the session executor to drain
+   private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
+
+   // Number of priority levels handled by the local message buffer
+   private static final int NUM_PRIORITIES = 10;
+
+   // Property carried by the marker message that answers a forced-delivery request
+   public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
+
+   // Attributes
+   // -----------------------------------------------------------------------------------
+
+   private final ClientSessionInternal session;
+
+   private final SessionContext sessionContext;
+
+   private final ConsumerContext consumerContext;
+
+   private final SimpleString filterString;
+
+   private final SimpleString queueName;
+
+   private final boolean browseOnly;
+
+   // Executor on which the message handler Runner is scheduled
+   private final Executor sessionExecutor;
+
+   // For failover we can't send credits back
+   // while holding a lock, or failover could eventually deadlock.
+   // And we can't use the sessionExecutor as that's being used for message handlers;
+   // for that reason we have a separate flowControlExecutor that's using the thread pool,
+   // which is an OrderedExecutor
+   private final Executor flowControlExecutor;
+
+   // Number of pending calls on flow control
+   private final ReusableLatch pendingFlowControl = new ReusableLatch(0);
+
+   private final int clientWindowSize;
+
+   private final int ackBatchSize;
+
+   // Locally buffered messages, ordered by priority
+   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(ClientConsumerImpl.NUM_PRIORITIES);
+
+   private final Runner runner = new Runner();
+
+   private LargeMessageControllerImpl currentLargeMessageController;
+
+   // When receiving LargeMessages, the user may choose not to read the body; in this case we need to discard the body
+   // before moving to the next message.
+   private ClientMessageInternal largeMessageReceived;
+
+   private final TokenBucketLimiter rateLimiter;
+
+   // Thread currently inside a blocking receive call, or null when none is
+   private volatile Thread receiverThread;
+
+   private volatile Thread onMessageThread;
+
+   private volatile MessageHandler handler;
+
+   private volatile boolean closing;
+
+   private volatile boolean closed;
+
+   // Credits accumulated by flowControl and not yet sent back
+   private volatile int creditsToSend;
+
+   private volatile boolean failedOver;
+
+   private volatile Exception lastException;
+
+   private volatile int ackBytes;
+
+   private volatile ClientMessageInternal lastAckedMessage;
+
+   // NOTE(review): not volatile; most accesses hold this consumer's monitor, but clearAtFailover
+   // writes it without the lock - confirm that is intentional
+   private boolean stopped = false;
+
+   // Sequence number handed to each forced-delivery request
+   private long forceDeliveryCount;
+
+   private final ClientSession.QueueQuery queueInfo;
+
+   // Set once a message with a priority other than 4 has been received; acks are then sent one by one
+   private volatile boolean ackIndividually;
+
+   // Class loader installed as the thread context class loader around handler.onMessage
+   private final ClassLoader contextClassLoader;
+
+   // Constructors
+   // ---------------------------------------------------------------------------------
+
+   /**
+    * Creates a consumer bound to the given session and queue. All arguments are stored as-is;
+    * no validation or remote call is performed here.
+    *
+    * @param session             owning session
+    * @param consumerContext     context identifying this consumer
+    * @param queueName           name of the queue being consumed
+    * @param filterString        filter of this consumer
+    * @param browseOnly          whether this consumer only browses the queue
+    * @param clientWindowSize    flow-control window size; {@code 0} selects slow-consumer handling
+    * @param ackBatchSize        number of acknowledged bytes that triggers sending an ack
+    * @param rateLimiter         optional rate limiter, may be {@code null}
+    * @param executor            executor used to run the message handler
+    * @param flowControlExecutor executor used to send credits
+    * @param sessionContext      session context used for credits and forced delivery
+    * @param queueInfo           queue query result for the consumed queue
+    * @param contextClassLoader  class loader installed while calling the message handler
+    */
+   public ClientConsumerImpl(final ClientSessionInternal session,
+                             final ConsumerContext consumerContext,
+                             final SimpleString queueName,
+                             final SimpleString filterString,
+                             final boolean browseOnly,
+                             final int clientWindowSize,
+                             final int ackBatchSize,
+                             final TokenBucketLimiter rateLimiter,
+                             final Executor executor,
+                             final Executor flowControlExecutor,
+                             final SessionContext sessionContext,
+                             final ClientSession.QueueQuery queueInfo,
+                             final ClassLoader contextClassLoader)
+   {
+      // Session wiring
+      this.session = session;
+      this.sessionContext = sessionContext;
+      this.consumerContext = consumerContext;
+
+      // Queue description
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.queueInfo = queueInfo;
+      this.browseOnly = browseOnly;
+
+      // Flow control and acknowledgement tuning
+      this.clientWindowSize = clientWindowSize;
+      this.ackBatchSize = ackBatchSize;
+      this.rateLimiter = rateLimiter;
+
+      // Execution environment
+      this.sessionExecutor = executor;
+      this.flowControlExecutor = flowControlExecutor;
+      this.contextClassLoader = contextClassLoader;
+   }
+
+   // ClientConsumer implementation
+   // -----------------------------------------------------------------
+
+   /**
+    * @return the context supplied at construction time
+    */
+   public ConsumerContext getConsumerContext()
+   {
+      return this.consumerContext;
+   }
+
+   /**
+    * Core blocking receive shared by the public receive variants.
+    * <p>
+    * Polls the local buffer, waiting on this consumer's monitor until a message is available, the
+    * consumer is closed or the timeout has elapsed. When {@code forcingDelivery} is set and the
+    * buffer is empty, the session context is asked once per call to force a delivery; the marker
+    * message that answers that request is recognised by {@link #FORCED_DELIVERY_MESSAGE} and makes
+    * this method return {@code null}. Expired messages are discarded and handed to the session.
+    *
+    * @param timeout         maximum wait in milliseconds; {@code 0} is treated as {@code Long.MAX_VALUE}
+    * @param forcingDelivery whether to request a forced delivery when nothing is buffered
+    * @return the next message, or {@code null} when none could be obtained
+    * @throws HornetQException if the consumer is closed or a message handler is currently set
+    */
+   private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException
+   {
+      checkClosed();
+
+      if (largeMessageReceived != null)
+      {
+         // Check if there are pending packets to be received
+         largeMessageReceived.discardBody();
+         largeMessageReceived = null;
+      }
+
+      if (rateLimiter != null)
+      {
+         rateLimiter.limit();
+      }
+
+      // Synchronous receive and an asynchronous handler are mutually exclusive
+      if (handler != null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.messageHandlerSet();
+      }
+
+      if (clientWindowSize == 0)
+      {
+         startSlowConsumer();
+      }
+
+      // Published so that setMessageHandler can reject a concurrent change; cleared in the finally block
+      receiverThread = Thread.currentThread();
+
+      // Whether forceDelivery was already called during this invocation
+      boolean deliveryForced = false;
+      // Set inside the lock to request a forceDelivery call once the lock is released
+      boolean callForceDelivery = false;
+
+      // Timestamp of the previous wait bookkeeping; -1 until the first wait
+      long start = -1;
+
+      long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+
+      try
+      {
+         while (true)
+         {
+            ClientMessageInternal m = null;
+
+            synchronized (this)
+            {
+               // The buffer is only polled while the consumer is not stopped
+               while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0)
+               {
+                  if (start == -1)
+                  {
+                     start = System.currentTimeMillis();
+                  }
+
+                  if (m == null && forcingDelivery)
+                  {
+                     if (stopped)
+                     {
+                        break;
+                     }
+
+                     // we only force delivery once per call to receive
+                     if (!deliveryForced)
+                     {
+                        callForceDelivery = true;
+                        break;
+                     }
+                  }
+
+                  try
+                  {
+                     wait(toWait);
+                  }
+                  catch (InterruptedException e)
+                  {
+                     throw new HornetQInterruptedException(e);
+                  }
+
+                  if (m != null || closed)
+                  {
+                     break;
+                  }
+
+                  // Deduct the time spent in this wait from the remaining budget
+                  long now = System.currentTimeMillis();
+
+                  toWait -= now - start;
+
+                  start = now;
+               }
+            }
+
+            if (failedOver)
+            {
+               if (m == null)
+               {
+                  // if failed over and the buffer is null, we reset the state and try it again
+                  failedOver = false;
+                  deliveryForced = false;
+                  toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+                  continue;
+               }
+               else
+               {
+                  failedOver = false;
+               }
+            }
+
+            if (callForceDelivery)
+            {
+               if (isTrace)
+               {
+                  HornetQClientLogger.LOGGER.trace("Forcing delivery");
+               }
+               // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
+               sessionContext.forceDelivery(this, forceDeliveryCount++);
+               callForceDelivery = false;
+               deliveryForced = true;
+               continue;
+            }
+
+            if (m != null)
+            {
+               session.workDone();
+
+               if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+               {
+                  long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
+
+                  // Need to check if forceDelivery was called at this call
+                  // As we could be receiving a message that came from a previous call
+                  if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1)
+                  {
+                     // forced delivery messages are discarded, nothing has been delivered by the queue
+                     resetIfSlowConsumer();
+
+                     if (isTrace)
+                     {
+                        HornetQClientLogger.LOGGER.trace("There was nothing on the queue, leaving it now:: returning null");
+                     }
+
+                     return null;
+                  }
+                  else
+                  {
+                     if (isTrace)
+                     {
+                        HornetQClientLogger.LOGGER.trace("Ignored force delivery answer as it belonged to another call");
+                     }
+                     // Ignore the message
+                     continue;
+                  }
+               }
+               // if we have already pre acked we can't expire
+               boolean expired = m.isExpired();
+
+               flowControlBeforeConsumption(m);
+
+               if (expired)
+               {
+                  m.discardBody();
+
+                  session.expire(this, m);
+
+                  if (clientWindowSize == 0)
+                  {
+                     startSlowConsumer();
+                  }
+
+                  // Keep looking for another message only while there is time left
+                  if (toWait > 0)
+                  {
+                     continue;
+                  }
+                  else
+                  {
+                     return null;
+                  }
+               }
+
+               // Remembered so the next receive call can discard an unread body
+               if (m.isLargeMessage())
+               {
+                  largeMessageReceived = m;
+               }
+
+               if (isTrace)
+               {
+                  HornetQClientLogger.LOGGER.trace("Returning " + m);
+               }
+
+               return m;
+            }
+            else
+            {
+               if (isTrace)
+               {
+                  HornetQClientLogger.LOGGER.trace("Returning null");
+               }
+               resetIfSlowConsumer();
+               return null;
+            }
+         }
+      }
+      finally
+      {
+         receiverThread = null;
+      }
+   }
+
+   /**
+    * Waits up to {@code timeout} milliseconds for a message; if none arrived and the consumer is
+    * still open, makes one further attempt with forced delivery before giving up.
+    *
+    * @param timeout maximum wait in milliseconds
+    * @return the received message, or {@code null} when none is available
+    * @throws HornetQException propagated from the underlying receive
+    */
+   public ClientMessage receive(final long timeout) throws HornetQException
+   {
+      final ClientMessage received = receive(timeout, false);
+
+      if (received != null || closed)
+      {
+         return received;
+      }
+
+      // Nothing arrived in time: ask the server once more before returning
+      return receive(0, true);
+   }
+
+   /**
+    * Receives a message without a time limit and without forcing delivery.
+    *
+    * @return the received message, or {@code null} when the underlying receive yields none
+    * @throws HornetQException propagated from the underlying receive
+    */
+   public ClientMessage receive() throws HornetQException
+   {
+      final long noTimeout = 0;
+
+      return receive(noTimeout, false);
+   }
+
+   /**
+    * Receives a message, forcing delivery when nothing is buffered locally.
+    *
+    * @return the received message, or {@code null} when the queue had nothing to deliver
+    * @throws HornetQException propagated from the underlying receive
+    */
+   public ClientMessage receiveImmediate() throws HornetQException
+   {
+      final long noTimeout = 0;
+
+      return receive(noTimeout, true);
+   }
+
+   /**
+    * @return the handler currently set, or {@code null} when none is
+    * @throws HornetQException if this consumer is closed
+    */
+   public MessageHandler getMessageHandler() throws HornetQException
+   {
+      checkClosed();
+
+      final MessageHandler current = handler;
+
+      return current;
+   }
+
+   /**
+    * Installs, replaces or removes the asynchronous message handler.
+    * <p>
+    * Synchronized because messages may arrive while the handler is being set; without the lock
+    * too few executor runs could be queued and messages would be stranded in the buffer.
+    *
+    * @param theHandler the new handler, or {@code null} to remove the current one
+    * @return this consumer
+    * @throws HornetQException if the consumer is closed or a thread is inside a receive call
+    */
+   public synchronized ClientConsumerImpl setMessageHandler(final MessageHandler theHandler) throws HornetQException
+   {
+      checkClosed();
+
+      if (receiverThread != null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.inReceive();
+      }
+
+      final MessageHandler previous = handler;
+
+      if (previous != theHandler && clientWindowSize == 0)
+      {
+         startSlowConsumer();
+      }
+
+      handler = theHandler;
+
+      if (theHandler != null)
+      {
+         // First handler being installed: schedule delivery of whatever is already buffered
+         if (previous == null)
+         {
+            requeueExecutors();
+         }
+      }
+      else if (previous != null)
+      {
+         // Handler removed: a call to onMessage may still be in progress.
+         // NOTE(review): waitForOnMessageToComplete returns immediately when handler is null,
+         // which is already the case here - confirm whether a real wait was intended.
+         waitForOnMessageToComplete(true);
+      }
+
+      return this;
+   }
+
+   /**
+    * Closes this consumer by running the clean-up routine with its flag set to {@code true}.
+    *
+    * @throws HornetQException propagated from the clean-up routine
+    */
+   public void close() throws HornetQException
+   {
+      final boolean sendCloseMessage = true;
+
+      doCleanUp(sendCloseMessage);
+   }
+
+   /**
+    * To be used by MDBs to stop any more handling of messages. Marks the consumer as closing,
+    * cancels the current large message controller and queues {@code future} on the session
+    * executor so that it runs after the handler runs already queued there.
+    *
+    * @param future the future to run once the onMessage Thread has completed
+    * @return the value of the onMessage thread field at the time of the call, possibly {@code null}
+    * @throws HornetQException declared for callers; not thrown by the visible code
+    */
+   public Thread prepareForClose(final FutureLatch future) throws HornetQException
+   {
+      closing = true;
+
+      resetLargeMessageController();
+
+      // Runs behind the last queued onMessage call
+      final Runnable afterLastOnMessage = new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            future.run();
+         }
+      };
+
+      sessionExecutor.execute(afterLastOnMessage);
+
+      return onMessageThread;
+   }
+
+   /**
+    * Best-effort clean up: runs the clean-up routine with its flag set to {@code false} and
+    * never propagates a {@link HornetQException}. A failure is logged at warn level together
+    * with the exception, so the cause and stack trace are not lost.
+    */
+   public void cleanUp()
+   {
+      try
+      {
+         doCleanUp(false);
+      }
+      catch (HornetQException e)
+      {
+         // Deliberately not rethrown, but keep the cause in the log
+         HornetQClientLogger.LOGGER.warn("problem cleaning up: " + this, e);
+      }
+   }
+
+   /**
+    * @return the current value of the closed flag
+    */
+   public boolean isClosed()
+   {
+      return this.closed;
+   }
+
+   /**
+    * Stops delivery to this consumer. Browse-only consumers are left untouched.
+    *
+    * @param waitForOnMessage whether to wait for handler runs already queued before stopping
+    * @throws HornetQException declared for callers; not thrown by the visible code
+    */
+   public void stop(final boolean waitForOnMessage) throws HornetQException
+   {
+      waitForOnMessageToComplete(waitForOnMessage);
+
+      // stop shouldn't affect browser delivery
+      if (!browseOnly)
+      {
+         synchronized (this)
+         {
+            if (!stopped)
+            {
+               stopped = true;
+            }
+         }
+      }
+   }
+
+   /**
+    * Resets the local delivery state of this consumer for a failover: empties the buffer, marks
+    * the consumer stopped, cancels the current large message controller and clears the pending
+    * ack, accumulated credits and individual-ack mode. Also raises the failed-over flag that the
+    * blocking receive loop checks.
+    */
+   public void clearAtFailover()
+   {
+      clearBuffer();
+
+      // failover will issue a start later
+      // NOTE(review): written without holding this consumer's monitor, unlike stop() and start()
+      this.stopped = true;
+
+      resetLargeMessageController();
+
+      lastAckedMessage = null;
+
+      creditsToSend = 0;
+
+      failedOver = true;
+
+      ackIndividually = false;
+   }
+
+   /**
+    * Resumes delivery: clears the stopped flag and queues one handler run per buffered message.
+    */
+   public synchronized void start()
+   {
+      this.stopped = false;
+
+      // Messages may have accumulated while stopped
+      requeueExecutors();
+   }
+
+   /**
+    * @return the exception most recently stored in this consumer, or {@code null} when none was
+    */
+   public Exception getLastException()
+   {
+      return this.lastException;
+   }
+
+   // ClientConsumerInternal implementation
+   // --------------------------------------------------------------
+
+   /**
+    * @return the queue query result supplied at construction time
+    */
+   public ClientSession.QueueQuery getQueueInfo()
+   {
+      return this.queueInfo;
+   }
+
+   /**
+    * @return the filter string supplied at construction time
+    */
+   public SimpleString getFilterString()
+   {
+      return this.filterString;
+   }
+
+   /**
+    * @return the queue name supplied at construction time
+    */
+   public SimpleString getQueueName()
+   {
+      return this.queueName;
+   }
+
+   /**
+    * @return whether this consumer was created as browse-only
+    */
+   public boolean isBrowseOnly()
+   {
+      return this.browseOnly;
+   }
+
+   /**
+    * Entry point for a regular message arriving for this consumer. Messages are dropped while
+    * the consumer is closing; those flagged as compressed are routed through the compressed
+    * path, everything else is buffered directly.
+    *
+    * @param message the arriving message
+    * @throws Exception propagated from the compressed-message path
+    */
+   public synchronized void handleMessage(final ClientMessageInternal message) throws Exception
+   {
+      // This is ok - while closing we just ignore the message
+      if (!closing)
+      {
+         final boolean compressed = message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+
+         if (compressed)
+         {
+            handleCompressedMessage(message);
+         }
+         else
+         {
+            handleRegularMessage(message);
+         }
+      }
+   }
+
+   private void handleRegularMessage(ClientMessageInternal message)
+   {
+      if (message.getAddress() == null)
+      {
+         message.setAddressTransient(queueInfo.getAddress());
+      }
+
+      message.onReceipt(this);
+
+      if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+      {
+         // We have messages of different priorities so we need to ack them individually since the order
+         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
+         // consumed in, which means that acking all up to won't work
+         ackIndividually = true;
+      }
+
+      // Add it to the buffer
+      buffer.addTail(message, message.getPriority());
+
+      if (handler != null)
+      {
+         // Execute using executor
+         if (!stopped)
+         {
+            queueExecutor();
+         }
+      }
+      else
+      {
+         notify();
+      }
+   }
+
+   /**
+    * This method deals with messages that arrived as regular messages but whose contents are compressed.
+    * Such messages come from message senders who are configured to compress large messages: if
+    * a message compresses below the min-large-message-size limit, it is sent as a regular message.
+    * <p/>
+    * However, when decompressing the message we cannot know how large the result will be, so we
+    * fake a large message controller that deals with the message as if it was a large message.
+    * <p/>
+    * Say that you sent a 1G message full of spaces. That could be just below 100K compressed but you wouldn't have
+    * enough memory to decompress it.
+    * <p/>
+    * The readable bytes of the body are copied into an exactly sized array before being handed to
+    * the controller. Going through the backing array of a {@code ByteBuffer} view instead is not
+    * safe, as that array may be larger than, or offset from, the readable region, or may not be
+    * accessible at all.
+    *
+    * @param clMessage the compressed message that arrived as a regular message
+    * @throws Exception if the temporary cache file cannot be created or the packet cannot be added
+    */
+   private void handleCompressedMessage(final ClientMessageInternal clMessage) throws Exception
+   {
+      ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl();
+      largeMessage.retrieveExistingData(clMessage);
+
+      File largeMessageCache = null;
+
+      if (session.isCacheLargeMessageClient())
+      {
+         largeMessageCache = File.createTempFile("tmp-large-message-" + largeMessage.getMessageID() + "-",
+                                                 ".tmp");
+         largeMessageCache.deleteOnExit();
+      }
+
+      ClientSessionFactory sf = session.getSessionFactory();
+      ServerLocator locator = sf.getServerLocator();
+      long callTimeout = locator.getCallTimeout();
+
+      currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessage.getLargeMessageSize(), callTimeout, largeMessageCache);
+      currentLargeMessageController.setLocal(true);
+
+      // Copy exactly the readable bytes of the body; they form the single packet of the fake large message
+      HornetQBuffer qbuff = clMessage.getBodyBuffer();
+      int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
+      final byte[] body = new byte[bytesToRead];
+      qbuff.readBytes(body);
+
+      largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
+      currentLargeMessageController.addPacket(body, body.length, false);
+
+      handleRegularMessage(largeMessage);
+   }
+
+   /**
+    * Entry point for the first packet of a large message. Creates the controller that will
+    * collect the following packets, attaches it to the message (wrapped for decompression when
+    * the message is compressed) and buffers the message. Ignored while the consumer is closing.
+    *
+    * @param clientLargeMessage the large message header
+    * @param largeMessageSize   size passed on to the new large message controller
+    * @throws Exception if the temporary cache file cannot be created
+    */
+   public synchronized void handleLargeMessage(final ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
+   {
+      if (closing)
+      {
+         // This is ok - we just ignore the message
+         return;
+      }
+
+      // Optional on-disk cache for the body
+      final File cacheFile;
+
+      if (session.isCacheLargeMessageClient())
+      {
+         cacheFile = File.createTempFile("tmp-large-message-" + clientLargeMessage.getMessageID() + "-",
+                                         ".tmp");
+         cacheFile.deleteOnExit();
+      }
+      else
+      {
+         cacheFile = null;
+      }
+
+      final long callTimeout = session.getSessionFactory().getServerLocator().getCallTimeout();
+
+      currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessageSize, callTimeout, cacheFile);
+
+      final LargeMessageController bodyController;
+
+      if (clientLargeMessage.isCompressed())
+      {
+         bodyController = new CompressedLargeMessageControllerImpl(currentLargeMessageController);
+      }
+      else
+      {
+         bodyController = currentLargeMessageController;
+      }
+
+      clientLargeMessage.setLargeMessageController(bodyController);
+
+      handleRegularMessage(clientLargeMessage);
+   }
+
+   public synchronized void handleLargeMessageContinuation(final byte[] chunk, final int flowControlSize, final boolean isContinues) throws Exception
+   {
+      if (closing)
+      {
+         return;
+      }
+      if (currentLargeMessageController == null)
+      {
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + flowControlSize);
+         }
+         flowControl(flowControlSize, false);
+      }
+      else
+      {
+         currentLargeMessageController.addPacket(chunk, flowControlSize, isContinues);
+      }
+   }
+
+   /**
+    * Drops every buffered message. For each one, a large message body in flight is cancelled and
+    * flow control is run; failures are logged and do not stop the clearing. Afterwards the
+    * current large message controller is reset and, outside the lock, queued handler runs are
+    * optionally waited for.
+    *
+    * @param waitForOnMessage whether to wait for handler runs already queued
+    * @throws HornetQException declared for callers; not thrown by the visible code
+    */
+   public void clear(boolean waitForOnMessage) throws HornetQException
+   {
+      synchronized (this)
+      {
+         // Need to send credits for the messages in the buffer
+         for (Iterator<ClientMessageInternal> pending = buffer.iterator(); pending.hasNext();)
+         {
+            try
+            {
+               final ClientMessageInternal buffered = pending.next();
+
+               if (buffered.isLargeMessage())
+               {
+                  ((ClientLargeMessageInternal) buffered).getLargeMessageController().cancel();
+               }
+
+               flowControlBeforeConsumption(buffered);
+            }
+            catch (Exception e)
+            {
+               HornetQClientLogger.LOGGER.errorClearingMessages(e);
+            }
+         }
+
+         clearBuffer();
+
+         try
+         {
+            resetLargeMessageController();
+         }
+         catch (Throwable e)
+         {
+            // nothing that could be done here
+            HornetQClientLogger.LOGGER.errorClearingMessages(e);
+         }
+      }
+
+      // Done outside the lock
+      waitForOnMessageToComplete(waitForOnMessage);
+   }
+
+   private void resetLargeMessageController()
+   {
+
+      LargeMessageController controller = currentLargeMessageController;
+      if (controller != null)
+      {
+         controller.cancel();
+         currentLargeMessageController = null;
+      }
+   }
+
+   /**
+    * @return the flow-control window size supplied at construction time
+    */
+   public int getClientWindowSize()
+   {
+      return this.clientWindowSize;
+   }
+
+   /**
+    * @return the number of messages currently held in the local buffer
+    */
+   public int getBufferSize()
+   {
+      final int buffered = buffer.size();
+
+      return buffered;
+   }
+
+   /**
+    * Acknowledges a message. In individual-ack mode the message is acknowledged on its own;
+    * otherwise its encoded size is added to the running total and an ack is only sent once the
+    * total reaches the batch size, the message being remembered as the last acked one until then.
+    *
+    * @param message the message to acknowledge
+    * @throws HornetQException propagated from the acknowledgement calls
+    */
+   public void acknowledge(final ClientMessage message) throws HornetQException
+   {
+      final ClientMessageInternal internal = (ClientMessageInternal) message;
+
+      if (ackIndividually)
+      {
+         individualAcknowledge(message);
+         return;
+      }
+
+      ackBytes += message.getEncodeSize();
+
+      if (ackBytes < ackBatchSize)
+      {
+         // Batch not full yet: remember the message for a later flush
+         lastAckedMessage = internal;
+      }
+      else
+      {
+         doAck(internal);
+      }
+   }
+
+   /**
+    * Acknowledges exactly one message, after flushing any batched ack that is still pending.
+    *
+    * @param message the message to acknowledge
+    * @throws HornetQException propagated from the flush or from the session
+    */
+   public void individualAcknowledge(ClientMessage message) throws HornetQException
+   {
+      // flushAcks is a no-op when nothing is pending
+      flushAcks();
+
+      session.individualAcknowledge(this, message);
+   }
+
+   /**
+    * Sends the batched ack for the last remembered message, if there is one.
+    *
+    * @throws HornetQException propagated from the ack
+    */
+   public void flushAcks() throws HornetQException
+   {
+      final ClientMessageInternal pending = lastAckedMessage;
+
+      if (pending == null)
+      {
+         return;
+      }
+
+      doAck(pending);
+   }
+
+   /**
+    * Accounts for consumed bytes and sends the accumulated credits back once they reach the
+    * client window size. Nothing is done when the window size is negative.
+    * <p>
+    * LargeMessageBuffer will call flowControl here, while other handleMessage will also be calling flowControl.
+    * So, this operation needs to be atomic.
+    * NOTE(review): the counter is a volatile int updated with {@code +=} and this method is not
+    * synchronized, so the update itself is not atomic - confirm that callers serialise access.
+    *
+    * @param messageBytes         number of bytes to add to the credits waiting to be sent
+    * @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
+    * @throws HornetQException declared for callers; not thrown by the visible code
+    */
+   public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
+   {
+      if (clientWindowSize < 0)
+      {
+         return;
+      }
+
+      creditsToSend += messageBytes;
+
+      if (creditsToSend < clientWindowSize)
+      {
+         // Window not filled yet, keep accumulating
+         return;
+      }
+
+      final boolean slowConsumerDiscount = clientWindowSize == 0 && discountSlowConsumer;
+
+      final int credits;
+
+      if (slowConsumerDiscount)
+      {
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
+         }
+
+         // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
+         // always buffering one after received the first message
+         credits = creditsToSend - 1;
+      }
+      else
+      {
+         credits = creditsToSend;
+
+         if (HornetQClientLogger.LOGGER.isDebugEnabled())
+         {
+            // Report what is actually sent (the accumulated credits), not just this call's bytes
+            HornetQClientLogger.LOGGER.debug("Sending " + credits + " from flow-control");
+         }
+      }
+
+      creditsToSend = 0;
+
+      if (credits > 0)
+      {
+         sendCredits(credits);
+      }
+   }
+
+   // Public
+   // ---------------------------------------------------------------------------------------
+
+   // Package protected
+   // ---------------------------------------------------------------------------------------
+
+   // Protected
+   // ---------------------------------------------------------------------------------------
+
+   // Private
+   // ---------------------------------------------------------------------------------------
+
+   /**
+    * Sending a initial credit for slow consumers
+    */
+   private void startSlowConsumer()
+   {
+      if (isTrace)
+      {
+         HornetQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer");
+      }
+      sendCredits(1);
+      try
+      {
+         // We use an executor here to guarantee the messages will arrive in order.
+         // However when starting a slow consumer, we have to guarantee the credit was sent before we can perform any
+         // operations like forceDelivery
+         pendingFlowControl.await(10, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e)
+      {
+         // will just ignore and forward the ignored
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   /**
+    * For slow consumers (window size 0) only: sends zero credits and then waits, for at most ten
+    * seconds, until the flow-control executor has run a marker task queued behind that send.
+    */
+   private void resetIfSlowConsumer()
+   {
+      if (clientWindowSize != 0)
+      {
+         return;
+      }
+
+      sendCredits(0);
+
+      // If resetting a slow consumer, we need to wait the execution
+      final CountDownLatch executed = new CountDownLatch(1);
+      final Runnable marker = new Runnable()
+      {
+         public void run()
+         {
+            executed.countDown();
+         }
+      };
+      flowControlExecutor.execute(marker);
+
+      try
+      {
+         executed.await(10, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e)
+      {
+         throw new HornetQInterruptedException(e);
+      }
+   }
+
+   /**
+    * Queues one handler run for every message currently in the buffer.
+    */
+   private void requeueExecutors()
+   {
+      int queued = 0;
+
+      // The buffer size is re-read on every pass, as in a plain counting loop
+      while (queued < buffer.size())
+      {
+         queueExecutor();
+         queued++;
+      }
+   }
+
+   private void queueExecutor()
+   {
+      if (isTrace)
+      {
+         HornetQClientLogger.LOGGER.trace("Adding Runner on Executor for delivery");
+      }
+
+      sessionExecutor.execute(runner);
+   }
+
+   /**
+    * @param credits
+    */
+   private void sendCredits(final int credits)
+   {
+      pendingFlowControl.countUp();
+      flowControlExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               sessionContext.sendConsumerCredits(ClientConsumerImpl.this, credits);
+            }
+            finally
+            {
+               pendingFlowControl.countDown();
+            }
+         }
+      });
+   }
+
+   private void waitForOnMessageToComplete(boolean waitForOnMessage)
+   {
+      if (handler == null)
+      {
+         return;
+      }
+
+      if (!waitForOnMessage || Thread.currentThread() == onMessageThread)
+      {
+         // If called from inside onMessage then return immediately - otherwise would block
+         return;
+      }
+
+      org.apache.activemq6.utils.FutureLatch future = new FutureLatch();
+
+      sessionExecutor.execute(future);
+
+      boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
+
+      if (!ok)
+      {
+         HornetQClientLogger.LOGGER.timeOutWaitingForProcessing();
+      }
+   }
+
+   /**
+    * Guard used by operations that need an open consumer.
+    *
+    * @throws HornetQException if this consumer has been closed
+    */
+   private void checkClosed() throws HornetQException
+   {
+      if (!closed)
+      {
+         return;
+      }
+
+      throw HornetQClientMessageBundle.BUNDLE.consumerClosed();
+   }
+
+   /**
+    * Delivers at most one buffered message to the registered handler.
+    * <p>
+    * Returns immediately when the consumer is closing or stopped. Otherwise it records work on the
+    * session and, if a handler is set, polls one message from the buffer. Forced delivery markers
+    * are dropped, expired messages are passed to {@code session.expire}, and every other message
+    * is passed to the handler with the thread context class loader switched to
+    * {@code contextClassLoader} for the duration of the call.
+    *
+    * @throws Exception anything thrown by the rate limiter, flow control, the handler or the session
+    */
+   private void callOnMessage() throws Exception
+   {
+      if (closing || stopped)
+      {
+         return;
+      }
+
+      session.workDone();
+
+      // We pull the message from the buffer from inside the Runnable so we can ensure priority
+      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
+      // we could not do that
+
+      ClientMessageInternal message;
+
+      // Must store handler in local variable since might get set to null
+      // otherwise while this is executing and give NPE when calling onMessage
+      MessageHandler theHandler = handler;
+
+      if (theHandler != null)
+      {
+         if (rateLimiter != null)
+         {
+            rateLimiter.limit();
+         }
+
+         failedOver = false;
+
+         // The buffer is polled while holding the consumer's monitor
+         synchronized (this)
+         {
+            message = buffer.poll();
+         }
+
+         if (message != null)
+         {
+            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+            {
+               //Ignore, this could be a relic from a previous receiveImmediate();
+               return;
+            }
+
+
+            // Expiry is sampled before flow control runs and before the handler is invoked
+            boolean expired = message.isExpired();
+
+            flowControlBeforeConsumption(message);
+
+            if (!expired)
+            {
+               if (isTrace)
+               {
+                  HornetQClientLogger.LOGGER.trace("Calling handler.onMessage");
+               }
+               // Swap in the consumer's context class loader, remembering the previous one so it
+               // can be restored in the finally block below
+               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+               {
+                  public ClassLoader run()
+                  {
+                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
+
+                     Thread.currentThread().setContextClassLoader(contextClassLoader);
+
+                     return originalLoader;
+                  }
+               });
+
+               // Recorded so waitForOnMessageToComplete can detect calls made from inside onMessage
+               onMessageThread = Thread.currentThread();
+               try
+               {
+                  theHandler.onMessage(message);
+               }
+               finally
+               {
+                  try
+                  {
+                     AccessController.doPrivileged(new PrivilegedAction<Object>()
+                     {
+                        public Object run()
+                        {
+                           Thread.currentThread().setContextClassLoader(originalLoader);
+                           return null;
+                        }
+                     });
+                  }
+                  catch (Exception e)
+                  {
+                     // A failure to restore the class loader is logged and does not mask the handler's outcome
+                     HornetQClientLogger.LOGGER.warn(e.getMessage(), e);
+                  }
+
+                  onMessageThread = null;
+               }
+
+               if (isTrace)
+               {
+                  HornetQClientLogger.LOGGER.trace("Handler.onMessage done");
+               }
+
+               if (message.isLargeMessage())
+               {
+                  message.discardBody();
+               }
+            }
+            else
+            {
+               session.expire(this, message);
+            }
+
+            // If slow consumer, we need to send 1 credit to make sure we get another message
+            if (clientWindowSize == 0)
+            {
+               startSlowConsumer();
+            }
+         }
+      }
+   }
+
+   /**
+    * @param message
+    * @throws HornetQException
+    */
+   private void flowControlBeforeConsumption(final ClientMessageInternal message) throws HornetQException
+   {
+      // Chunk messages will execute the flow control while receiving the chunks
+      if (message.getFlowControlSize() != 0)
+      {
+         // on large messages we should discount 1 on the first packets as we need continuity until the last packet
+         flowControl(message.getFlowControlSize(), !message.isLargeMessage());
+      }
+   }
+
+   /**
+    * Closes this consumer and removes it from its session.
+    * <p>
+    * The sequence is: mark the consumer as closing, wait for queued handler runners, reset the
+    * large message controller, mark the consumer as closed, wake up a waiting receiver, flush
+    * acknowledgements, clear the buffer and optionally tell the server the consumer is closed.
+    * Any failure along the way is logged and never propagated. When the consumer is already
+    * closed the method returns without doing anything.
+    *
+    * @param sendCloseMessage whether {@code sessionContext.closeConsumer} should be called
+    * @throws HornetQException only if removing the consumer from the session fails
+    */
+   private void doCleanUp(final boolean sendCloseMessage) throws HornetQException
+   {
+      try
+      {
+         if (closed)
+         {
+            return;
+         }
+
+         // We need an extra flag closing, since we need to prevent any more messages getting queued to execute
+         // after this and we can't just set the closed flag to true here, since after/in onmessage the message
+         // might be acked and if the consumer is already closed, the ack will be ignored
+         closing = true;
+
+         // Now we wait for any current handler runners to run.
+         waitForOnMessageToComplete(true);
+
+         resetLargeMessageController();
+
+         closed = true;
+
+         synchronized (this)
+         {
+            if (receiverThread != null)
+            {
+               // Wake up any receive() thread that might be waiting
+               notify();
+            }
+
+            handler = null;
+
+            receiverThread = null;
+         }
+
+         flushAcks();
+
+         clearBuffer();
+
+         if (sendCloseMessage)
+         {
+            sessionContext.closeConsumer(this);
+         }
+      }
+      catch (Throwable t)
+      {
+         // Consumer close should always return without exception, but the failure is still
+         // reported so that an incomplete clean up can be diagnosed
+         HornetQClientLogger.LOGGER.warn(t.getMessage(), t);
+      }
+
+      session.removeConsumer(this);
+   }
+
+   /**
+    * Removes every message currently held in the consumer's buffer.
+    */
+   private void clearBuffer()
+   {
+      buffer.clear();
+   }
+
+   /**
+    * Resets the pending acknowledgement state ({@code ackBytes} and {@code lastAckedMessage})
+    * and then acknowledges the given message through the session.
+    *
+    * @param message the message to acknowledge
+    * @throws HornetQException if the session fails to acknowledge the message
+    */
+   private void doAck(final ClientMessageInternal message) throws HornetQException
+   {
+      ackBytes = 0;
+
+      lastAckedMessage = null;
+
+      session.acknowledge(this, message);
+   }
+
+   // Inner classes
+   // --------------------------------------------------------------------------------
+
+   /**
+    * Task that delivers one buffered message by calling {@code callOnMessage()}.
+    * <p>
+    * Exceptions are logged and remembered in {@code lastException} instead of being propagated
+    * to the executor.
+    */
+   private class Runner implements Runnable
+   {
+      public void run()
+      {
+         try
+         {
+            callOnMessage();
+         }
+         catch (Exception e)
+         {
+            HornetQClientLogger.LOGGER.onMessageError(e);
+
+            // Kept on the consumer so the failure is not lost once this task returns
+            lastException = e;
+         }
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java
new file mode 100644
index 0000000..a4f1551
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientConsumer;
+import org.apache.activemq6.api.core.client.ClientMessage;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.utils.FutureLatch;
+
+/**
+ * Internal view of a {@link ClientConsumer}: adds the operations the client implementation
+ * itself needs on top of the public consumer API (message hand-over, flow control,
+ * acknowledgement and lifecycle control).
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface ClientConsumerInternal extends ClientConsumer
+{
+   /** @return the name of the queue this consumer is attached to */
+   SimpleString getQueueName();
+
+   /** @return the filter string of this consumer (NOTE(review): presumably null when no filter is set - confirm) */
+   SimpleString getFilterString();
+
+   /** @return whether this consumer only browses the queue */
+   boolean isBrowseOnly();
+
+   /** Hands a received message over to this consumer. */
+   void handleMessage(ClientMessageInternal message) throws Exception;
+
+   /** Hands the first part of a received large message, together with its total size, over to this consumer. */
+   void handleLargeMessage(ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception;
+
+   /**
+    * Hands a further chunk of a large message over to this consumer.
+    * NOTE(review): {@code isContinues} presumably signals that more chunks follow - confirm.
+    */
+   void handleLargeMessageContinuation(byte[] chunk, int flowControlSize, boolean isContinues) throws Exception;
+
+   /**
+    * Performs flow control for {@code messageBytes} consumed bytes.
+    * ClientConsumerImpl passes the message's flow control size and {@code false} for
+    * {@code discountSlowConsumer} when the message is a large message.
+    */
+   void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;
+
+   /**
+    * Clears this consumer.
+    * NOTE(review): {@code waitForOnMessage} presumably controls whether a running onMessage is awaited - confirm.
+    */
+   void clear(boolean waitForOnMessage) throws HornetQException;
+
+   /**
+    * To be called by things like MDBs during shutdown of the server
+    *
+    * @param future the latch handed to the consumer (NOTE(review): presumably run once pending work is done - confirm)
+    * @return a thread (NOTE(review): presumably the thread currently inside onMessage, if any - confirm)
+    * @throws HornetQException
+    */
+   Thread prepareForClose(FutureLatch future) throws HornetQException;
+
+   /** Clears this consumer's state when a failover happens. */
+   void clearAtFailover();
+
+   /** @return the client window size; ClientConsumerImpl treats {@code 0} as a slow consumer */
+   int getClientWindowSize();
+
+   /** @return the size of this consumer's buffer */
+   int getBufferSize();
+
+   /** Cleans up this consumer. */
+   void cleanUp() throws HornetQException;
+
+   /** Acknowledges the given message; ClientMessageImpl calls this from its own {@code acknowledge()}. */
+   void acknowledge(ClientMessage message) throws HornetQException;
+
+   /** Individually acknowledges the given message; ClientMessageImpl calls this from its own {@code individualAcknowledge()}. */
+   void individualAcknowledge(ClientMessage message) throws HornetQException;
+
+   /** Flushes any acknowledgements that have not been sent yet. */
+   void flushAcks() throws HornetQException;
+
+   /**
+    * Stops this consumer.
+    * NOTE(review): {@code waitForOnMessage} presumably controls whether a running onMessage is awaited - confirm.
+    */
+   void stop(boolean waitForOnMessage) throws HornetQException;
+
+   /** Starts this consumer. */
+   void start();
+
+   /** @return the queue query information associated with this consumer */
+   ClientSession.QueueQuery getQueueInfo();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java
new file mode 100644
index 0000000..49f4d89
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.apache.activemq6.utils.DataConstants;
+
+/**
+ * ClientLargeMessageImpl is only created when receiving large messages.
+ * <p>
+ * At the time of sending a regular Message is sent as we won't know the message is considered large
+ * until the buffer is filled up or the user set a streaming.
+ * <p>
+ * While {@code bodyBuffer} is {@code null} the body is still held by the
+ * {@link LargeMessageController}; once the body has been rebuilt into {@code bodyBuffer} the
+ * message behaves like a regular {@link ClientMessageImpl}.
+ * @author clebertsuconic
+ */
+public final class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal
+{
+
+   // Used only when receiving large messages
+   private LargeMessageController largeMessageController;
+
+   private long largeMessageSize;
+
+   /**
+    * @param largeMessageSize the largeMessageSize to set
+    */
+   public void setLargeMessageSize(long largeMessageSize)
+   {
+      this.largeMessageSize = largeMessageSize;
+   }
+
+   /**
+    * @return the size of the large message body as set on this message
+    */
+   public long getLargeMessageSize()
+   {
+      return this.largeMessageSize;
+   }
+
+   // we only need this constructor as this is only used at decoding large messages on client
+   public ClientLargeMessageImpl()
+   {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the regular encode size once the body has been rebuilt locally, otherwise two ints
+    *         plus the size of the headers and properties
+    */
+   @Override
+   public int getEncodeSize()
+   {
+      if (bodyBuffer != null)
+      {
+         return super.getEncodeSize();
+      }
+      else
+      {
+         return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
+      }
+   }
+
+   /**
+    * @return the largeMessage
+    */
+   @Override
+   public boolean isLargeMessage()
+   {
+      return true;
+   }
+
+   /**
+    * @param controller the controller that holds the body until it is rebuilt or streamed
+    */
+   public void setLargeMessageController(final LargeMessageController controller)
+   {
+      largeMessageController = controller;
+   }
+
+   /**
+    * Makes sure the body has been rebuilt into the local body buffer.
+    *
+    * @throws HornetQException if the body cannot be saved into the buffer
+    */
+   public void checkCompletion() throws HornetQException
+   {
+      checkBuffer();
+   }
+
+   /**
+    * Returns the body buffer, rebuilding it from the large message controller on first use.
+    *
+    * @return the body buffer
+    * @throws RuntimeException wrapping the {@link HornetQException} raised while rebuilding the body
+    */
+   @Override
+   public HornetQBuffer getBodyBuffer()
+   {
+
+      try
+      {
+         checkBuffer();
+      }
+      catch (HornetQException e)
+      {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+
+      return bodyBuffer;
+   }
+
+   /**
+    * @return the value of the {@code HDR_LARGE_BODY_SIZE} property narrowed to an {@code int}
+    */
+   @Override
+   public int getBodySize()
+   {
+      return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
+   }
+
+   /**
+    * @return the controller set through {@link #setLargeMessageController(LargeMessageController)}
+    */
+   public LargeMessageController getLargeMessageController()
+   {
+      return largeMessageController;
+   }
+
+   /**
+    * Writes the body to the given stream, either from the rebuilt body buffer or through the
+    * large message controller.
+    *
+    * @param out the stream that receives the body
+    * @throws HornetQException if the body cannot be saved
+    */
+   @Override
+   public void saveToOutputStream(final OutputStream out) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         // The body was rebuilt on the client, so we need to behave as a regular message on this case
+         super.saveToOutputStream(out);
+      }
+      else
+      {
+         largeMessageController.saveBuffer(out);
+      }
+   }
+
+   /**
+    * Sets the stream that receives the body, delegating to the large message controller while the
+    * body has not been rebuilt locally.
+    *
+    * @param out the stream that receives the body
+    * @return this message
+    * @throws HornetQException if the stream cannot be set
+    */
+   @Override
+   public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         super.setOutputStream(out);
+      }
+      else
+      {
+         largeMessageController.setOutputStream(out);
+      }
+
+      return this;
+   }
+
+   /**
+    * Waits for the body to be written to the stream set by {@link #setOutputStream(OutputStream)}.
+    *
+    * @param timeMilliseconds the time passed to the wait call
+    * @return the result of the superclass or of the large message controller's wait
+    * @throws HornetQException if waiting fails
+    */
+   @Override
+   public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
+   {
+      if (bodyBuffer != null)
+      {
+         return super.waitOutputStreamCompletion(timeMilliseconds);
+      }
+      else
+      {
+         return largeMessageController.waitCompletion(timeMilliseconds);
+      }
+   }
+
+   /**
+    * Discards the packets still held by the large message controller, unless the body has already
+    * been rebuilt locally.
+    */
+   @Override
+   public void discardBody()
+   {
+      if (bodyBuffer != null)
+      {
+         super.discardBody();
+      }
+      else
+      {
+         largeMessageController.discardUnusedPackets();
+      }
+   }
+
+   /**
+    * Rebuilds the body into a local buffer if that has not happened yet.
+    *
+    * @throws HornetQException if the large message controller fails to save the body
+    */
+   private void checkBuffer() throws HornetQException
+   {
+      if (bodyBuffer == null)
+      {
+
+         // The requested buffer size is capped at Integer.MAX_VALUE
+         long bodySize = this.largeMessageSize + BODY_OFFSET;
+         if (bodySize > Integer.MAX_VALUE)
+         {
+            bodySize = Integer.MAX_VALUE;
+         }
+         createBody((int)bodySize);
+
+         bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
+
+         largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer));
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Output stream that appends everything written to it to a {@link HornetQBuffer}.
+    */
+   private static class HornetQOutputStream extends OutputStream
+   {
+      private final HornetQBuffer bufferOut;
+
+      HornetQOutputStream(HornetQBuffer out)
+      {
+         this.bufferOut = out;
+      }
+
+      @Override
+      public void write(int b) throws IOException
+      {
+         bufferOut.writeByte((byte)(b & 0xff));
+      }
+
+      /**
+       * Writes a whole range in one call. Without this override {@link OutputStream} falls back to
+       * calling {@link #write(int)} once per byte.
+       */
+      @Override
+      public void write(final byte[] bytes, final int offset, final int length) throws IOException
+      {
+         if (bytes == null)
+         {
+            throw new NullPointerException();
+         }
+         if (offset < 0 || length < 0 || length > bytes.length - offset)
+         {
+            throw new IndexOutOfBoundsException();
+         }
+         if (length == 0)
+         {
+            return;
+         }
+
+         bufferOut.writeBytes(bytes, offset, length);
+      }
+   }
+
+   /**
+    * Copies the headers, properties and flow control data of an already received message into
+    * this one. The properties object is shared with {@code clMessage}, not copied.
+    *
+    * @param clMessage the message to take the data from; it must carry {@code HDR_LARGE_BODY_SIZE}
+    */
+   public void retrieveExistingData(ClientMessageInternal clMessage)
+   {
+      this.messageID = clMessage.getMessageID();
+      this.address = clMessage.getAddress();
+      this.setUserID(clMessage.getUserID());
+      this.setFlowControlSize(clMessage.getFlowControlSize());
+      this.setDeliveryCount(clMessage.getDeliveryCount());
+      this.type = clMessage.getType();
+      this.durable = clMessage.isDurable();
+      this.setExpiration(clMessage.getExpiration());
+      this.timestamp = clMessage.getTimestamp();
+      this.priority = clMessage.getPriority();
+      this.properties = clMessage.getProperties();
+      this.largeMessageSize = clMessage.getLongProperty(HDR_LARGE_BODY_SIZE);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java
new file mode 100644
index 0000000..61fdd67
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+
+/**
+ * Internal view of a large message received on the client: a {@link ClientMessageInternal}
+ * whose body is managed by a {@link LargeMessageController}.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface ClientLargeMessageInternal extends ClientMessageInternal
+{
+
+   /** Sets the controller that manages the body of this large message. */
+   void setLargeMessageController(LargeMessageController controller);
+
+   /** @return the controller previously set with {@link #setLargeMessageController(LargeMessageController)} */
+   LargeMessageController getLargeMessageController();
+
+   /** Sets the size of the large message body. */
+   void setLargeMessageSize(long size);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java
new file mode 100644
index 0000000..58d29b6
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQPropertyConversionException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.message.BodyEncoder;
+import org.apache.activemq6.core.message.impl.MessageImpl;
+import org.apache.activemq6.reader.MessageUtil;
+
+/**
+ *
+ * A ClientMessageImpl
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ *
+ */
+public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal
+{
+   // added this constant here so that the client package have no dependency on JMS
+   public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
+
+
+   private int deliveryCount;
+
+   private ClientConsumerInternal consumer;
+
+   private int flowControlSize = -1;
+
+   /** Used on LargeMessages only */
+   private InputStream bodyInputStream;
+
+   /*
+    * Constructor for when reading from remoting
+    */
+   public ClientMessageImpl()
+   {
+   }
+
+   /*
+    * Construct messages before sending
+    */
+   public ClientMessageImpl(final byte type,
+                            final boolean durable,
+                            final long expiration,
+                            final long timestamp,
+                            final byte priority,
+                            final int initialMessageBufferSize)
+   {
+      super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
+   }
+
+   @Override
+   public boolean isServerMessage()
+   {
+      return false;
+   }
+
+   @Override
+   public void onReceipt(final ClientConsumerInternal consumer)
+   {
+      this.consumer = consumer;
+   }
+
+   @Override
+   public ClientMessageImpl setDeliveryCount(final int deliveryCount)
+   {
+      this.deliveryCount = deliveryCount;
+      return this;
+   }
+
+   @Override
+   public int getDeliveryCount()
+   {
+      return deliveryCount;
+   }
+
+   @Override
+   public ClientMessageImpl acknowledge() throws HornetQException
+   {
+      if (consumer != null)
+      {
+         consumer.acknowledge(this);
+      }
+
+      return this;
+   }
+
+   @Override
+   public ClientMessageImpl individualAcknowledge() throws HornetQException
+   {
+      if (consumer != null)
+      {
+         consumer.individualAcknowledge(this);
+      }
+
+      return this;
+   }
+
+   @Override
+   public int getFlowControlSize()
+   {
+      if (flowControlSize < 0)
+      {
+         throw new IllegalStateException("Flow Control hasn't been set");
+      }
+      return flowControlSize;
+   }
+
+   @Override
+   public void setFlowControlSize(final int flowControlSize)
+   {
+      this.flowControlSize = flowControlSize;
+   }
+
+   /**
+    * @return the largeMessage
+    */
+   @Override
+   public boolean isLargeMessage()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isCompressed()
+   {
+      return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+   }
+
+   @Override
+   public int getBodySize()
+   {
+      return buffer.writerIndex() - buffer.readerIndex();
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress()  + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+   }
+
+   @Override
+   public void saveToOutputStream(final OutputStream out) throws HornetQException
+   {
+      try
+      {
+         byte[] readBuffer = new byte[getBodySize()];
+         getBodyBuffer().readBytes(readBuffer);
+         out.write(readBuffer);
+         out.flush();
+      }
+      catch (IOException e)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.errorSavingBody(e);
+      }
+   }
+
+   @Override
+   public ClientMessageImpl setOutputStream(final OutputStream out) throws HornetQException
+   {
+      saveToOutputStream(out);
+      return this;
+   }
+
+   @Override
+   public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException
+   {
+      return true;
+   }
+
+   @Override
+   public void discardBody()
+   {
+   }
+
+   /**
+    * @return the bodyInputStream
+    */
+   @Override
+   public InputStream getBodyInputStream()
+   {
+      return bodyInputStream;
+   }
+
+   /**
+    * @param bodyInputStream the bodyInputStream to set
+    */
+   @Override
+   public ClientMessageImpl setBodyInputStream(final InputStream bodyInputStream)
+   {
+      this.bodyInputStream = bodyInputStream;
+      return this;
+   }
+
+   @Override
+   public BodyEncoder getBodyEncoder() throws HornetQException
+   {
+      return new DecodingContext();
+   }
+
+   @Override
+   public ClientMessageImpl putBooleanProperty(final SimpleString key, final boolean value)
+   {
+      return (ClientMessageImpl) super.putBooleanProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putByteProperty(final SimpleString key, final byte value)
+   {
+      return (ClientMessageImpl) super.putByteProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putBytesProperty(final SimpleString key, final byte[] value)
+   {
+      return (ClientMessageImpl) super.putBytesProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putCharProperty(SimpleString key, char value)
+   {
+      return (ClientMessageImpl) super.putCharProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putCharProperty(String key, char value)
+   {
+      return (ClientMessageImpl) super.putCharProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putShortProperty(final SimpleString key, final short value)
+   {
+      return (ClientMessageImpl) super.putShortProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putIntProperty(final SimpleString key, final int value)
+   {
+      return (ClientMessageImpl) super.putIntProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putLongProperty(final SimpleString key, final long value)
+   {
+      return (ClientMessageImpl) super.putLongProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putFloatProperty(final SimpleString key, final float value)
+   {
+      return (ClientMessageImpl) super.putFloatProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putDoubleProperty(final SimpleString key, final double value)
+   {
+      return (ClientMessageImpl) super.putDoubleProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putStringProperty(final SimpleString key, final SimpleString value)
+   {
+      return (ClientMessageImpl) super.putStringProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws HornetQPropertyConversionException
+   {
+      return (ClientMessageImpl) super.putObjectProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putObjectProperty(final String key, final Object value) throws HornetQPropertyConversionException
+   {
+      return (ClientMessageImpl) super.putObjectProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putBooleanProperty(final String key, final boolean value)
+   {
+      return (ClientMessageImpl) super.putBooleanProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putByteProperty(final String key, final byte value)
+   {
+      return (ClientMessageImpl) super.putByteProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putBytesProperty(final String key, final byte[] value)
+   {
+      return (ClientMessageImpl) super.putBytesProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putShortProperty(final String key, final short value)
+   {
+      return (ClientMessageImpl) super.putShortProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putIntProperty(final String key, final int value)
+   {
+      return (ClientMessageImpl) super.putIntProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putLongProperty(final String key, final long value)
+   {
+      return (ClientMessageImpl) super.putLongProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putFloatProperty(final String key, final float value)
+   {
+      return (ClientMessageImpl) super.putFloatProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putDoubleProperty(final String key, final double value)
+   {
+      return (ClientMessageImpl) super.putDoubleProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl putStringProperty(final String key, final String value)
+   {
+      return (ClientMessageImpl) super.putStringProperty(key, value);
+   }
+
+   @Override
+   public ClientMessageImpl writeBodyBufferBytes(byte[] bytes)
+   {
+      return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
+   }
+
+   @Override
+   public ClientMessageImpl writeBodyBufferString(String string)
+   {
+      return (ClientMessageImpl) super.writeBodyBufferString(string);
+   }
+
+   private final class DecodingContext implements BodyEncoder
+   {
+      public DecodingContext()
+      {
+      }
+
+      @Override
+      public void open()
+      {
+         getBodyBuffer().readerIndex(0);
+      }
+
+      @Override
+      public void close()
+      {
+      }
+
+      @Override
+      public long getLargeBodySize()
+      {
+         if (isLargeMessage())
+         {
+            return getBodyBuffer().writerIndex();
+         }
+         else
+         {
+            return getBodyBuffer().writerIndex() - BODY_OFFSET;
+         }
+      }
+
+      @Override
+      public int encode(final ByteBuffer bufferRead) throws HornetQException
+      {
+         HornetQBuffer buffer1 = HornetQBuffers.wrappedBuffer(bufferRead);
+         return encode(buffer1, bufferRead.capacity());
+      }
+
+      @Override
+      public int encode(final HornetQBuffer bufferOut, final int size)
+      {
+         byte[] bytes = new byte[size];
+         getWholeBuffer().readBytes(bytes);
+         bufferOut.writeBytes(bytes, 0, size);
+         return size;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java
new file mode 100644
index 0000000..ab55b88
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.ClientMessage;
+import org.apache.activemq6.utils.TypedProperties;
+
+/**
+ * Internal view of a {@link ClientMessage}: adds the operations the client implementation needs
+ * for flow control, receipt tracking and large message handling.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface ClientMessageInternal extends ClientMessage
+{
+
+   /** @return the properties object of this message */
+   TypedProperties getProperties();
+
+   /** Size used for FlowControl */
+   int getFlowControlSize();
+
+   /** Size used for FlowControl */
+   void setFlowControlSize(int flowControlSize);
+
+   /**
+    * Sets the address of this message.
+    * NOTE(review): "transient" presumably means the change is not reflected in the encoded message - confirm.
+    */
+   void setAddressTransient(SimpleString address);
+
+   /** Tells the message which consumer received it; ClientMessageImpl uses that consumer to acknowledge. */
+   void onReceipt(ClientConsumerInternal consumer);
+
+   /**
+    * Discard unused packets (used on large-message)
+    */
+   void discardBody();
+
+   /** @return whether the message body is compressed; ClientMessageImpl reads the {@code HDR_LARGE_COMPRESSED} property */
+   boolean isCompressed();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java
new file mode 100644
index 0000000..83360b7
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+
+/**
+ * Manages the producer flow-control credits of a session, keyed by address.
+ * <p>
+ * The behaviour notes below describe {@link ClientProducerCreditManagerImpl}, the implementation
+ * that accompanies this interface.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ClientProducerCreditManager
+{
+   /**
+    * Returns the credits object for an address, creating it when none exists yet.
+    *
+    * @param address the address the producer sends to
+    * @param anon    when {@code false} the credits are reference counted; when {@code true} they
+    *                are placed in the bounded cache of unreferenced credits
+    * @param context session context passed to {@link ClientProducerCredits#init(SessionContext)}
+    *                for newly created credits
+    * @return the credits for {@code address}
+    */
+   ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context);
+
+   /**
+    * Releases one reference to the credits of an address; once the reference count reaches zero
+    * the credits move to the cache of unreferenced credits.
+    *
+    * @param address the address whose credits are being returned
+    */
+   void returnCredits(SimpleString address);
+
+   /**
+    * Hands credits that arrived for an address to the matching credits object, if there is one.
+    *
+    * @param address the address the credits belong to
+    * @param credits the number of credits received
+    */
+   void receiveCredits(SimpleString address, int credits);
+
+   /**
+    * Same as {@link #receiveCredits(SimpleString, int)} but delivered through
+    * {@link ClientProducerCredits#receiveFailCredits(int)}.
+    *
+    * @param address the address the credits belong to
+    * @param credits the number of credits received
+    */
+   void receiveFailCredits(SimpleString address, int credits);
+
+   /**
+    * Resets every managed credits object.
+    */
+   void reset();
+
+   /**
+    * Closes every managed credits object and forgets all of them.
+    */
+   void close();
+
+   /**
+    * @return the number of addresses that currently have a credits object
+    */
+   int creditsMapSize();
+
+   /**
+    * @return the number of entries in the cache of unreferenced credits
+    */
+   int unReferencedCreditsSize();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java
new file mode 100644
index 0000000..c0fd364
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+
+/**
+ * Default {@link ClientProducerCreditManager}.
+ * <p>
+ * Keeps one {@link ClientProducerCredits} per address in {@code producerCredits}. Credits that
+ * are not (or no longer) reference counted are additionally tracked in {@code unReferencedCredits},
+ * a cache bounded by {@link #MAX_UNREFERENCED_CREDITS_CACHE_SIZE}; when it overflows the oldest
+ * entry is evicted and closed.
+ * <p>
+ * All access to the two maps and to {@code windowSize} is guarded by the monitor of this object.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager
+{
+   /** Maximum number of entries kept in the unreferenced cache before the oldest one is evicted. */
+   public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;
+
+   private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
+
+   private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
+
+   private final ClientSessionInternal session;
+
+   // -1 disables flow control; close() also sets it to -1
+   private int windowSize;
+
+   /**
+    * @param session    session handed to every {@link ClientProducerCreditsImpl} created here
+    * @param windowSize producer window size, or {@code -1} for no flow control
+    */
+   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize)
+   {
+      this.session = session;
+
+      this.windowSize = windowSize;
+   }
+
+   /**
+    * Returns the credits for {@code address}, creating and registering them on first use.
+    * <p>
+    * The method itself is deliberately not {@code synchronized}: only the map bookkeeping runs
+    * under the lock, so that {@link ClientProducerCredits#init(SessionContext)} is really executed
+    * without holding it. Holding the lock during init would block {@link #receiveCredits} and
+    * {@link #receiveFailCredits} while init is still sending requests, which can dead lock.
+    *
+    * @param address the address the producer sends to
+    * @param anon    {@code false} to take a counted reference, {@code true} to only cache the
+    *                credits as unreferenced
+    * @param context session context used to initialise newly created credits
+    * @return the shared no-flow-control instance when the window size is {@code -1}, otherwise the
+    *         credits registered for {@code address}
+    */
+   public ClientProducerCredits getCredits(final SimpleString address, final boolean anon, SessionContext context)
+   {
+      boolean needInit = false;
+      ClientProducerCredits credits;
+
+      synchronized (this)
+      {
+         // windowSize is written by close() under the same lock, so read it here
+         if (windowSize == -1)
+         {
+            return ClientProducerCreditsNoFlowControl.instance;
+         }
+
+         credits = producerCredits.get(address);
+
+         if (credits == null)
+         {
+            // Doesn't need to be fair since session is single threaded
+            credits = new ClientProducerCreditsImpl(session, address, windowSize);
+            needInit = true;
+
+            producerCredits.put(address, credits);
+         }
+
+         if (!anon)
+         {
+            credits.incrementRefCount();
+
+            // Remove from anon credits (if there)
+            unReferencedCredits.remove(address);
+         }
+         else
+         {
+            addToUnReferencedCache(address, credits);
+         }
+      }
+
+      // The init is done outside of the lock
+      // otherwise packages may arrive with flow control
+      // while this is still sending requests causing a dead lock
+      if (needInit)
+      {
+         credits.init(context);
+      }
+
+      return credits;
+   }
+
+   /**
+    * Drops one reference to the credits of {@code address}; when the count reaches zero the
+    * credits are moved to the unreferenced cache. Unknown addresses are ignored.
+    */
+   public synchronized void returnCredits(final SimpleString address)
+   {
+      ClientProducerCredits credits = producerCredits.get(address);
+
+      if (credits != null && credits.decrementRefCount() == 0)
+      {
+         addToUnReferencedCache(address, credits);
+      }
+   }
+
+   /**
+    * Forwards received credits to the credits object of {@code address}, if one is registered.
+    */
+   public synchronized void receiveCredits(final SimpleString address, final int credits)
+   {
+      ClientProducerCredits cr = producerCredits.get(address);
+
+      if (cr != null)
+      {
+         cr.receiveCredits(credits);
+      }
+   }
+
+   /**
+    * Forwards received "fail" credits to the credits object of {@code address}, if one is
+    * registered.
+    */
+   public synchronized void receiveFailCredits(final SimpleString address, int credits)
+   {
+      ClientProducerCredits cr = producerCredits.get(address);
+
+      if (cr != null)
+      {
+         cr.receiveFailCredits(credits);
+      }
+   }
+
+   /**
+    * Resets every registered credits object.
+    */
+   public synchronized void reset()
+   {
+      for (ClientProducerCredits credits : producerCredits.values())
+      {
+         credits.reset();
+      }
+   }
+
+   /**
+    * Disables flow control for later {@link #getCredits} calls, closes every registered credits
+    * object and clears both maps.
+    */
+   public synchronized void close()
+   {
+      windowSize = -1;
+
+      for (ClientProducerCredits credits : producerCredits.values())
+      {
+         credits.close();
+      }
+
+      producerCredits.clear();
+
+      unReferencedCredits.clear();
+   }
+
+   /**
+    * @return number of addresses that currently have a credits object
+    */
+   public synchronized int creditsMapSize()
+   {
+      return producerCredits.size();
+   }
+
+   /**
+    * @return number of entries in the unreferenced cache
+    */
+   public synchronized int unReferencedCreditsSize()
+   {
+      return unReferencedCredits.size();
+   }
+
+   /**
+    * Adds the credits to the unreferenced cache and, when the cache grows beyond
+    * {@link #MAX_UNREFERENCED_CREDITS_CACHE_SIZE}, evicts the first entry in iteration order.
+    * Callers must hold the lock of this manager.
+    */
+   private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits)
+   {
+      unReferencedCredits.put(address, credits);
+
+      if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE)
+      {
+         // Remove the oldest entry
+
+         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
+
+         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
+
+         iter.remove();
+
+         removeEntry(oldest.getKey(), oldest.getValue());
+      }
+   }
+
+   /**
+    * Unregisters the credits of {@code address}, drains their outstanding permits and closes them.
+    */
+   private void removeEntry(final SimpleString address, final ClientProducerCredits credits)
+   {
+      producerCredits.remove(address);
+
+      credits.releaseOutstanding();
+
+      credits.close();
+   }
+
+
+   /**
+    * {@link ClientProducerCredits} used when flow control is disabled: every operation is a no-op
+    * and the producer is never reported as blocked.
+    */
+   static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
+   {
+      static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
+
+      public void acquireCredits(int credits) throws InterruptedException
+      {
+      }
+
+      public void receiveCredits(int credits)
+      {
+      }
+
+      public void receiveFailCredits(int credits)
+      {
+      }
+
+      public boolean isBlocked()
+      {
+         return false;
+      }
+
+      public void init(SessionContext ctx)
+      {
+      }
+
+      public void reset()
+      {
+      }
+
+      public void close()
+      {
+      }
+
+      public void incrementRefCount()
+      {
+      }
+
+      // Always 1, so a caller testing for 0 never treats this instance as unreferenced
+      public int decrementRefCount()
+      {
+         return 1;
+      }
+
+      public void releaseOutstanding()
+      {
+      }
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java
new file mode 100644
index 0000000..7ef4916
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+
+/**
+ * Producer-side flow-control credits for one address.
+ * <p>
+ * The behaviour notes below describe {@link ClientProducerCreditsImpl}; the no-flow-control
+ * implementation nested in {@link ClientProducerCreditManagerImpl} implements every method as a
+ * no-op.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface ClientProducerCredits
+{
+   /**
+    * Takes {@code credits} credits, waiting for them to arrive if not enough are available.
+    *
+    * @param credits number of credits needed
+    * @throws InterruptedException if the thread is interrupted while waiting
+    * @throws HornetQException     if the server answered an earlier request with "fail" credits
+    */
+   void acquireCredits(int credits) throws InterruptedException, HornetQException;
+
+   /**
+    * Makes {@code credits} additional credits available to senders.
+    *
+    * @param credits number of credits received
+    */
+   void receiveCredits(int credits);
+
+   /**
+    * Records that the server responded with a failure and then adds the credits like
+    * {@link #receiveCredits(int)}, so that a waiting sender is not left blocked.
+    *
+    * @param credits number of credits received
+    */
+   void receiveFailCredits(int credits);
+
+   /**
+    * @return whether a sender is currently waiting for credits
+    */
+   boolean isBlocked();
+
+   /**
+    * Requests the initial credits and links this object to the given session context.
+    *
+    * @param sessionContext the context to link the flow control to
+    */
+   void init(SessionContext sessionContext);
+
+   /**
+    * Discards the current credits and requests them again.
+    */
+   void reset();
+
+   /**
+    * Marks the credits as closed and releases any sender waiting in {@link #acquireCredits(int)}.
+    */
+   void close();
+
+   /**
+    * Adds one reference to these credits.
+    */
+   void incrementRefCount();
+
+   /**
+    * Removes one reference from these credits.
+    *
+    * @return the reference count after the decrement
+    */
+   int decrementRefCount();
+
+   /**
+    * Drops all credits that are currently available.
+    */
+   void releaseOutstanding();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java
new file mode 100644
index 0000000..2b420d5
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.spi.core.remoting.SessionContext;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A ClientProducerCreditsImpl
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class ClientProducerCreditsImpl implements ClientProducerCredits
+{
+   private final Semaphore semaphore;
+
+   private final int windowSize;
+
+   private volatile boolean closed;
+
+   private boolean blocked;
+
+   private final SimpleString address;
+
+   private final ClientSessionInternal session;
+
+   private int pendingCredits;
+
+   private int arriving;
+
+   private int refCount;
+
+   private boolean serverRespondedWithFail;
+
+   private SessionContext sessionContext;
+
+   public ClientProducerCreditsImpl(final ClientSessionInternal session,
+                                    final SimpleString address,
+                                    final int windowSize)
+   {
+      this.session = session;
+
+      this.address = address;
+
+      this.windowSize = windowSize / 2;
+
+      // Doesn't need to be fair since session is single threaded
+
+      semaphore = new Semaphore(0, false);
+   }
+
+   public void init(SessionContext sessionContext)
+   {
+      // We initial request twice as many credits as we request in subsequent requests
+      // This allows the producer to keep sending as more arrive, minimising pauses
+      checkCredits(windowSize);
+
+      this.sessionContext = sessionContext;
+
+      this.sessionContext.linkFlowControl(address, this);
+   }
+
+   public void acquireCredits(final int credits) throws InterruptedException, HornetQException
+   {
+      checkCredits(credits);
+
+
+      boolean tryAcquire;
+
+      synchronized (this)
+      {
+         tryAcquire = semaphore.tryAcquire(credits);
+      }
+
+      if (!tryAcquire)
+      {
+         if (!closed)
+         {
+            this.blocked = true;
+            try
+            {
+               while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS))
+               {
+                  // I'm using string concatenation here in case address is null
+                  // better getting a "null" string than a NPE
+                  HornetQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
+               }
+            }
+            finally
+            {
+               this.blocked = false;
+            }
+         }
+      }
+
+
+      synchronized (this)
+      {
+         pendingCredits -= credits;
+      }
+
+      // check to see if the blocking mode is FAIL on the server
+      synchronized (this)
+      {
+         if (serverRespondedWithFail)
+         {
+            serverRespondedWithFail = false;
+
+            // remove existing credits to force the client to ask the server for more on the next send
+            semaphore.drainPermits();
+            pendingCredits = 0;
+            arriving = 0;
+
+            throw HornetQClientMessageBundle.BUNDLE.addressIsFull(address.toString(), credits);
+         }
+      }
+   }
+
+   public boolean isBlocked()
+   {
+      return blocked;
+   }
+
+   public int getBalance()
+   {
+      return semaphore.availablePermits();
+   }
+
+   public void receiveCredits(final int credits)
+   {
+      synchronized (this)
+      {
+         arriving -= credits;
+      }
+
+      semaphore.release(credits);
+   }
+
+   public void receiveFailCredits(final int credits)
+   {
+      serverRespondedWithFail = true;
+      // receive credits like normal to keep the sender from blocking
+      receiveCredits(credits);
+   }
+
+   public synchronized void reset()
+   {
+      // Any pendingCredits credits from before failover won't arrive, so we re-initialise
+
+      semaphore.drainPermits();
+
+      int beforeFailure = pendingCredits;
+
+      pendingCredits = 0;
+      arriving = 0;
+
+      // If we are waiting for more credits than what's configured, then we need to use what we tried before
+      // otherwise the client may starve as the credit will never arrive
+      checkCredits(Math.max(windowSize * 2, beforeFailure));
+   }
+
+   public void close()
+   {
+      // Closing a producer that is blocking should make it return
+      closed = true;
+
+      semaphore.release(Integer.MAX_VALUE / 2);
+   }
+
+   public synchronized void incrementRefCount()
+   {
+      refCount++;
+   }
+
+   public synchronized int decrementRefCount()
+   {
+      return --refCount;
+   }
+
+   public synchronized void releaseOutstanding()
+   {
+      semaphore.drainPermits();
+   }
+
+   private void checkCredits(final int credits)
+   {
+      int needed = Math.max(credits, windowSize);
+
+      int toRequest = -1;
+
+      synchronized (this)
+      {
+         if (semaphore.availablePermits() + arriving < needed)
+         {
+            toRequest = needed - arriving;
+
+            pendingCredits += toRequest;
+            arriving += toRequest;
+         }
+      }
+
+      if (toRequest != -1)
+      {
+         requestCredits(toRequest);
+      }
+   }
+
+   private void requestCredits(final int credits)
+   {
+      session.sendProducerCreditsMessage(credits, address);
+   }
+}
\ No newline at end of file


Mime
View raw message