activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [18/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:48 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java
index 0980ec1..98e84e2 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForReplication.java
@@ -29,7 +29,7 @@ import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum;
  * NamedLiveNodeLocatorForReplication looks for a live server in the cluster with a specific backupGroupName
  *
  * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @see org.hornetq.core.server.cluster.ha.HAPolicy#getBackupGroupName()
+ * @see org.hornetq.core.server.cluster.ha.HAPolicy#getGroupName()
  */
 public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator
 {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java
index ced98fe..8fba846 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/PostOfficeJournalLoader.java
@@ -122,7 +122,7 @@ public class PostOfficeJournalLoader implements JournalLoader
 
             if (isTopicIdentification)
             {
-               long tx = storageManager.generateUniqueID();
+               long tx = storageManager.generateID();
                storageManager.deleteQueueBinding(tx, queueBindingInfo.getId());
                storageManager.commitBindings(tx);
                continue;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
index 030c596..f4e8fe9 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
@@ -14,6 +14,7 @@ package org.hornetq.core.server.impl;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,10 +34,13 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
+import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.message.impl.MessageImpl;
@@ -47,8 +51,11 @@ import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.HornetQMessageBundle;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.MessageReference;
@@ -58,14 +65,17 @@ import org.hornetq.core.server.ScheduledDeliveryHandler;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
 import org.hornetq.core.server.cluster.impl.Redistributor;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.SlowConsumerPolicy;
 import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.BindingsTransactionImpl;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.FutureLatch;
 import org.hornetq.utils.LinkedListIterator;
@@ -73,6 +83,7 @@ import org.hornetq.utils.PriorityLinkedList;
 import org.hornetq.utils.PriorityLinkedListImpl;
 import org.hornetq.utils.ReferenceCounter;
 import org.hornetq.utils.ReusableLatch;
+import org.hornetq.utils.TypedProperties;
 
 /**
  * Implementation of a Queue
@@ -149,6 +160,8 @@ public class QueueImpl implements Queue
 
    private long messagesAdded;
 
+   private long messagesAcknowledged;
+
    protected final AtomicInteger deliveringCount = new AtomicInteger(0);
 
    private boolean paused;
@@ -204,6 +217,14 @@ public class QueueImpl implements Queue
 
    private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
 
+   private AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
+
+   private AtomicLong messagesAddedSnapshot = new AtomicLong(0);
+
+   private ScheduledFuture slowConsumerReaperFuture;
+
+   private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -982,21 +1003,6 @@ public class QueueImpl implements Queue
 
    public long getMessageCount()
    {
-      return getMessageCount(FLUSH_TIMEOUT);
-   }
-
-   public long getMessageCount(final long timeout)
-   {
-      if (timeout > 0)
-      {
-         internalFlushExecutor(timeout);
-      }
-      return getInstantMessageCount();
-   }
-
-
-   public long getInstantMessageCount()
-   {
       synchronized (this)
       {
          if (pageSubscription != null)
@@ -1068,6 +1074,8 @@ public class QueueImpl implements Queue
          postAcknowledge(ref);
       }
 
+      messagesAcknowledged++;
+
    }
 
    public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception
@@ -1093,6 +1101,8 @@ public class QueueImpl implements Queue
 
          getRefsOperation(tx).addAck(ref);
       }
+
+      messagesAcknowledged++;
    }
 
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception
@@ -1108,6 +1118,8 @@ public class QueueImpl implements Queue
 
       // https://issues.jboss.org/browse/HORNETQ-609
       incDelivering();
+
+      messagesAcknowledged++;
    }
 
    private RefsOperation getRefsOperation(final Transaction tx)
@@ -1123,7 +1135,7 @@ public class QueueImpl implements Queue
 
          if (oper == null)
          {
-            oper = new RefsOperation();
+            oper = tx.createRefsOperation(this);
 
             tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
 
@@ -1209,17 +1221,6 @@ public class QueueImpl implements Queue
 
    public long getMessagesAdded()
    {
-      return getMessagesAdded(FLUSH_TIMEOUT);
-   }
-
-   public long getMessagesAdded(final long timeout)
-   {
-      if (timeout > 0) internalFlushExecutor(timeout);
-      return getInstantMessagesAdded();
-   }
-
-   public synchronized long getInstantMessagesAdded()
-   {
       if (pageSubscription != null)
       {
          return messagesAdded + pageSubscription.getCounter().getValue() - pagedReferences.get();
@@ -1230,6 +1231,10 @@ public class QueueImpl implements Queue
       }
    }
 
+   public long getMessagesAcknowledged()
+   {
+      return messagesAcknowledged;
+   }
 
    public int deleteAllReferences() throws Exception
    {
@@ -1410,6 +1415,12 @@ public class QueueImpl implements Queue
             }
          }
 
+         if (!deleted)
+         {
+            // Look in scheduled deliveries
+            deleted = scheduledDeliveryHandler.removeReferenceWithID(messageID) != null ? true : false;
+         }
+
          tx.commit();
 
          return deleted;
@@ -1456,6 +1467,11 @@ public class QueueImpl implements Queue
             tx.setContainsPersistent();
          }
 
+         if (slowConsumerReaperFuture != null)
+         {
+            slowConsumerReaperFuture.cancel(false);
+         }
+
          tx.commit();
       }
       catch (Exception e)
@@ -2545,7 +2561,7 @@ public class QueueImpl implements Queue
        and original message id
       */
 
-      long newID = storageManager.generateUniqueID();
+      long newID = storageManager.generateID();
 
       ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, ref, expiry, copyOriginalHeaders);
 
@@ -2578,7 +2594,7 @@ public class QueueImpl implements Queue
    }
 
 
-   private void sendToDeadLetterAddress(final MessageReference ref) throws Exception
+   public void sendToDeadLetterAddress(final MessageReference ref) throws Exception
    {
       sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
    }
@@ -2804,8 +2820,7 @@ public class QueueImpl implements Queue
       return consumerListClone;
    }
 
-   // Protected as testcases may change this behaviour
-   protected void postAcknowledge(final MessageReference ref)
+   public void postAcknowledge(final MessageReference ref)
    {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
@@ -2889,6 +2904,21 @@ public class QueueImpl implements Queue
       messagesAdded = 0;
    }
 
+   public synchronized void resetMessagesAcknowledged()
+   {
+      messagesAcknowledged = 0;
+   }
+
+   public float getRate()
+   {
+      float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
+      if (timeSlice == 0)
+      {
+         messagesAddedSnapshot.getAndSet(messagesAdded);
+         return 0.0f;
+      }
+      return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
+   }
 
    // Inner classes
    // --------------------------------------------------------------------------
@@ -2906,189 +2936,6 @@ public class QueueImpl implements Queue
 
    }
 
-   public final class RefsOperation extends TransactionOperationAbstract
-   {
-      List<MessageReference> refsToAck = new ArrayList<MessageReference>();
-
-      List<ServerMessage> pagedMessagesToPostACK = null;
-
-      /**
-       * It will ignore redelivery check, which is used during consumer.close
-       * to not perform reschedule redelivery check
-       */
-      protected boolean ignoreRedeliveryCheck = false;
-
-
-      // once turned on, we shouldn't turn it off, that's why no parameters
-      public void setIgnoreRedeliveryCheck()
-      {
-         ignoreRedeliveryCheck = true;
-      }
-
-      synchronized void addAck(final MessageReference ref)
-      {
-         refsToAck.add(ref);
-         if (ref.isPaged())
-         {
-            if (pagedMessagesToPostACK == null)
-            {
-               pagedMessagesToPostACK = new ArrayList<ServerMessage>();
-            }
-            pagedMessagesToPostACK.add(ref.getMessage());
-         }
-      }
-
-      @Override
-      public void afterRollback(final Transaction tx)
-      {
-         Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<QueueImpl, LinkedList<MessageReference>>();
-
-         long timeBase = System.currentTimeMillis();
-
-         //add any already acked refs, this means that they have been transferred via a producer.send() and have no
-         // previous state persisted.
-         List<MessageReference> ackedRefs = new ArrayList<>();
-
-         for (MessageReference ref : refsToAck)
-         {
-            ref.setConsumerId(null);
-
-            if (HornetQServerLogger.LOGGER.isTraceEnabled())
-            {
-               HornetQServerLogger.LOGGER.trace("rolling back " + ref);
-            }
-            try
-            {
-               if (ref.isAlreadyAcked())
-               {
-                  ackedRefs.add(ref);
-               }
-               // if ignore redelivery check, we just perform redelivery straight
-               if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck))
-               {
-                  LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
-
-                  if (toCancel == null)
-                  {
-                     toCancel = new LinkedList<MessageReference>();
-
-                     queueMap.put((QueueImpl) ref.getQueue(), toCancel);
-                  }
-
-                  toCancel.addFirst(ref);
-               }
-            }
-            catch (Exception e)
-            {
-               HornetQServerLogger.LOGGER.errorCheckingDLQ(e);
-            }
-         }
-
-         for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry : queueMap.entrySet())
-         {
-            LinkedList<MessageReference> refs = entry.getValue();
-
-            QueueImpl queue = entry.getKey();
-
-            synchronized (queue)
-            {
-               queue.postRollback(refs);
-            }
-         }
-
-         if (!ackedRefs.isEmpty())
-         {
-            //since pre acked refs have no previous state we need to actually create this by storing the message and the
-            //message references
-            try
-            {
-               Transaction ackedTX = new TransactionImpl(storageManager);
-               for (MessageReference ref : ackedRefs)
-               {
-                  ServerMessage message = ref.getMessage();
-                  if (message.isDurable())
-                  {
-                     int durableRefCount = message.incrementDurableRefCount();
-
-                     if (durableRefCount == 1)
-                     {
-                        storageManager.storeMessageTransactional(ackedTX.getID(), message);
-                     }
-                     Queue queue = ref.getQueue();
-
-                     storageManager.storeReferenceTransactional(ackedTX.getID(), queue.getID(), message.getMessageID());
-
-                     ackedTX.setContainsPersistent();
-                  }
-
-                  message.incrementRefCount();
-               }
-               ackedTX.commit(true);
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      }
-
-      @Override
-      public void afterCommit(final Transaction tx)
-      {
-         for (MessageReference ref : refsToAck)
-         {
-            synchronized (ref.getQueue())
-            {
-               postAcknowledge(ref);
-            }
-         }
-
-         if (pagedMessagesToPostACK != null)
-         {
-            for (ServerMessage msg : pagedMessagesToPostACK)
-            {
-               try
-               {
-                  msg.decrementRefCount();
-               }
-               catch (Exception e)
-               {
-                  HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
-               }
-            }
-         }
-      }
-
-      @Override
-      public synchronized List<MessageReference> getRelatedMessageReferences()
-      {
-         List<MessageReference> listRet = new LinkedList<MessageReference>();
-         listRet.addAll(listRet);
-         return listRet;
-      }
-
-      @Override
-      public synchronized List<MessageReference> getListOnConsumer(long consumerID)
-      {
-         List<MessageReference> list = new LinkedList<MessageReference>();
-         for (MessageReference ref : refsToAck)
-         {
-            if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID))
-            {
-               list.add(ref);
-            }
-         }
-
-         return list;
-      }
-
-      public List<MessageReference> getReferencesToAcknowledge()
-      {
-         return refsToAck;
-      }
-
-   }
-
    private class DelayedAddRedistributor implements Runnable
    {
       private final Executor executor1;
@@ -3309,16 +3156,66 @@ public class QueueImpl implements Queue
       return deliveringCount.incrementAndGet();
    }
 
-   private void decDelivering()
+   public void decDelivering()
    {
       deliveringCount.decrementAndGet();
    }
 
-   private void configureExpiry(final SimpleString expiryAddressArgument)
+   private void configureExpiry(final AddressSettings settings)
    {
-      this.expiryAddress = expiryAddressArgument;
+      this.expiryAddress = settings == null ? null : settings.getExpiryAddress();
    }
 
+   private void configureSlowConsumerReaper(final AddressSettings settings)
+   {
+      if (settings == null || settings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD)
+      {
+         if (slowConsumerReaperFuture != null)
+         {
+            slowConsumerReaperFuture.cancel(false);
+            slowConsumerReaperFuture = null;
+            slowConsumerReaperRunnable = null;
+            if (HornetQServerLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQServerLogger.LOGGER.debug("Cancelled slow-consumer-reaper thread for queue \"" + getName() + "\"");
+            }
+         }
+      }
+      else
+      {
+         if (slowConsumerReaperRunnable == null)
+         {
+            scheduleSlowConsumerReaper(settings);
+         }
+         else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() ||
+            slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() ||
+            !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy()))
+         {
+            slowConsumerReaperFuture.cancel(false);
+            scheduleSlowConsumerReaper(settings);
+         }
+      }
+   }
+
+   void scheduleSlowConsumerReaper(AddressSettings settings)
+   {
+      slowConsumerReaperRunnable = new SlowConsumerReaperRunnable(settings.getSlowConsumerCheckPeriod(),
+                                                                  settings.getSlowConsumerThreshold(),
+                                                                  settings.getSlowConsumerPolicy());
+
+      slowConsumerReaperFuture = scheduledExecutor.scheduleWithFixedDelay(slowConsumerReaperRunnable,
+                                                                          settings.getSlowConsumerCheckPeriod(),
+                                                                          settings.getSlowConsumerCheckPeriod(),
+                                                                          TimeUnit.SECONDS);
+
+      if (HornetQServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQServerLogger.LOGGER.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() +
+                                             "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() +
+                                             ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() +
+                                             ", slow-consumer-policy=" + settings.getSlowConsumerPolicy());
+      }
+   }
 
    private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener
    {
@@ -3326,13 +3223,103 @@ public class QueueImpl implements Queue
       public void onChange()
       {
          AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
-         if (settings == null)
+         configureExpiry(settings);
+         configureSlowConsumerReaper(settings);
+      }
+   }
+
+   private final class SlowConsumerReaperRunnable implements Runnable
+   {
+      private SlowConsumerPolicy policy;
+      private float threshold;
+      private long checkPeriod;
+
+      public SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy)
+      {
+         this.checkPeriod = checkPeriod;
+         this.policy = policy;
+         this.threshold = threshold;
+      }
+
+      @Override
+      public void run()
+      {
+         float queueRate = getRate();
+         if (HornetQServerLogger.LOGGER.isDebugEnabled())
          {
-            configureExpiry(null);
+            HornetQServerLogger.LOGGER.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
          }
-         else
+         for (Consumer consumer : getConsumers())
          {
-            configureExpiry(settings.getExpiryAddress());
+            if (consumer instanceof ServerConsumerImpl)
+            {
+               ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+               float consumerRate = serverConsumer.getRate();
+               if (queueRate < threshold)
+               {
+                  if (HornetQServerLogger.LOGGER.isDebugEnabled())
+                  {
+                     HornetQServerLogger.LOGGER.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
+                  }
+               }
+               else if (consumerRate < threshold)
+               {
+                  RemotingConnection connection = null;
+                  RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService();
+
+                  for (RemotingConnection potentialConnection : remotingService.getConnections())
+                  {
+                     if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID()))
+                     {
+                        connection = potentialConnection;
+                     }
+                  }
+
+                  if (connection != null)
+                  {
+                     HornetQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
+                     if (policy.equals(SlowConsumerPolicy.KILL))
+                     {
+                        remotingService.removeConnection(connection.getID());
+                        connection.fail(HornetQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
+                     }
+                     else if (policy.equals(SlowConsumerPolicy.NOTIFY))
+                     {
+                        TypedProperties props = new TypedProperties();
+
+                        props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount());
+
+                        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
+
+                        if (connection != null)
+                        {
+                           props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress()));
+
+                           if (connection.getID() != null)
+                           {
+                              props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString()));
+                           }
+                        }
+
+                        props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, serverConsumer.getID());
+
+                        props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(serverConsumer.getSessionID()));
+
+                        Notification notification = new Notification(null, CoreNotificationType.CONSUMER_SLOW, props);
+
+                        ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService();
+                        try
+                        {
+                           managementService.sendNotification(notification);
+                        }
+                        catch (Exception e)
+                        {
+                           HornetQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e);
+                        }
+                     }
+                  }
+               }
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java
new file mode 100644
index 0000000..8a7a87c
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/RefsOperation.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class RefsOperation extends TransactionOperationAbstract
+{
+   private final StorageManager storageManager;
+   private Queue queue;
+   List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+
+   List<ServerMessage> pagedMessagesToPostACK = null;
+
+   /**
+    * It will ignore redelivery check, which is used during consumer.close
+    * to not perform reschedule redelivery check
+    */
+   protected boolean ignoreRedeliveryCheck = false;
+
+   public RefsOperation(Queue queue, StorageManager storageManager)
+   {
+      this.queue = queue;
+      this.storageManager = storageManager;
+   }
+
+   // once turned on, we shouldn't turn it off, that's why no parameters
+   public void setIgnoreRedeliveryCheck()
+   {
+      ignoreRedeliveryCheck = true;
+   }
+
+   synchronized void addAck(final MessageReference ref)
+   {
+      refsToAck.add(ref);
+      if (ref.isPaged())
+      {
+         if (pagedMessagesToPostACK == null)
+         {
+            pagedMessagesToPostACK = new ArrayList<ServerMessage>();
+         }
+         pagedMessagesToPostACK.add(ref.getMessage());
+      }
+   }
+
+   @Override
+   public void afterRollback(final Transaction tx)
+   {
+      Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<QueueImpl, LinkedList<MessageReference>>();
+
+      long timeBase = System.currentTimeMillis();
+
+      //add any already acked refs, this means that they have been transferred via a producer.send() and have no
+      // previous state persisted.
+      List<MessageReference> ackedRefs = new ArrayList<>();
+
+      for (MessageReference ref : refsToAck)
+      {
+         ref.setConsumerId(null);
+
+         if (HornetQServerLogger.LOGGER.isTraceEnabled())
+         {
+            HornetQServerLogger.LOGGER.trace("rolling back " + ref);
+         }
+         try
+         {
+            if (ref.isAlreadyAcked())
+            {
+               ackedRefs.add(ref);
+            }
+            // if ignore redelivery check, we just perform redelivery straight
+            if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck))
+            {
+               LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
+
+               if (toCancel == null)
+               {
+                  toCancel = new LinkedList<MessageReference>();
+
+                  queueMap.put((QueueImpl) ref.getQueue(), toCancel);
+               }
+
+               toCancel.addFirst(ref);
+            }
+         }
+         catch (Exception e)
+         {
+            HornetQServerLogger.LOGGER.errorCheckingDLQ(e);
+         }
+      }
+
+      for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry : queueMap.entrySet())
+      {
+         LinkedList<MessageReference> refs = entry.getValue();
+
+         QueueImpl queue = entry.getKey();
+
+         synchronized (queue)
+         {
+            queue.postRollback(refs);
+         }
+      }
+
+      if (!ackedRefs.isEmpty())
+      {
+         //since pre acked refs have no previous state we need to actually create this by storing the message and the
+         //message references
+         try
+         {
+            Transaction ackedTX = new TransactionImpl(storageManager);
+            for (MessageReference ref : ackedRefs)
+            {
+               ServerMessage message = ref.getMessage();
+               if (message.isDurable())
+               {
+                  int durableRefCount = message.incrementDurableRefCount();
+
+                  if (durableRefCount == 1)
+                  {
+                     storageManager.storeMessageTransactional(ackedTX.getID(), message);
+                  }
+                  Queue queue = ref.getQueue();
+
+                  storageManager.storeReferenceTransactional(ackedTX.getID(), queue.getID(), message.getMessageID());
+
+                  ackedTX.setContainsPersistent();
+               }
+
+               message.incrementRefCount();
+            }
+            ackedTX.commit(true);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+
+   @Override
+   public void afterCommit(final Transaction tx)
+   {
+      for (MessageReference ref : refsToAck)
+      {
+         synchronized (ref.getQueue())
+         {
+            queue.postAcknowledge(ref);
+         }
+      }
+
+      if (pagedMessagesToPostACK != null)
+      {
+         for (ServerMessage msg : pagedMessagesToPostACK)
+         {
+            try
+            {
+               msg.decrementRefCount();
+            }
+            catch (Exception e)
+            {
+               HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
+            }
+         }
+      }
+   }
+
+   @Override
+   public synchronized List<MessageReference> getRelatedMessageReferences()
+   {
+      List<MessageReference> listRet = new LinkedList<MessageReference>();
+      listRet.addAll(listRet);
+      return listRet;
+   }
+
+   @Override
+   public synchronized List<MessageReference> getListOnConsumer(long consumerID)
+   {
+      List<MessageReference> list = new LinkedList<MessageReference>();
+      for (MessageReference ref : refsToAck)
+      {
+         if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID))
+         {
+            list.add(ref);
+         }
+      }
+
+      return list;
+   }
+
+   public List<MessageReference> getReferencesToAcknowledge()
+   {
+      return refsToAck;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
index f7625e6..7bcb040 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
@@ -30,7 +30,7 @@ import org.hornetq.spi.core.protocol.RemotingConnection;
  * Stops the backup in case of an error at the start of Replication.
  * <p>
  * Using an interceptor for the task to avoid a server reference inside of the 'basic' channel-0
- * handler at {@link ClientSessionFactoryImpl#Channel0Handler}. As {@link ClientSessionFactoryImpl}
+ * handler at {@link org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager.Channel0Handler}. As {@link org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager}
  * is also shipped in the HQ-client JAR (which does not include {@link HornetQServer}).
  */
 final class ReplicationError implements Interceptor

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java
index dd166f9..c7ce890 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java
@@ -80,6 +80,7 @@ public class ScaleDownHandler
       ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
       clusterControl.authorize();
       long num = scaleDownMessages(sessionFactory, targetNodeId);
+      HornetQServerLogger.LOGGER.info("Scaled down " + num + " messages total.");
       scaleDownTransactions(sessionFactory, resourceManager);
       scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress);
       clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
@@ -107,11 +108,8 @@ public class ScaleDownHandler
             boolean storeAndForward = false;
             if (address.toString().startsWith("sf."))
             {
-               if (address.toString().endsWith(targetNodeId))
-               {
-                  // send messages in this queue to the original address
-                  storeAndForward = true;
-               }
+               // these get special treatment later
+               storeAndForward = true;
             }
 
             // this means we haven't inspected this address before
@@ -159,10 +157,72 @@ public class ScaleDownHandler
                      // loop through every message of this queue
                      while (bigLoopMessageIterator.hasNext())
                      {
+                        MessageReference bigLoopRef = bigLoopMessageIterator.next();
+                        Message message = bigLoopRef.getMessage().copy();
+
                         if (storeAndForward)
                         {
-                           MessageReference bigLoopRef = bigLoopMessageIterator.next();
-                           Message message = bigLoopRef.getMessage();
+                           if (address.toString().endsWith(targetNodeId))
+                           {
+                              /* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding
+                               * address on the scale-down target server.  However, we have to take the existing _HQ_ROUTE_TOsf.*
+                               * property and put its value into the _HQ_ROUTE_TO property so the message is routed properly.
+                               */
+
+                              byte[] oldRouteToIDs = null;
+
+                              List<SimpleString> propertiesToRemove = new ArrayList<>();
+                              message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+                              for (SimpleString propName : message.getPropertyNames())
+                              {
+                                 if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+                                 {
+                                    if (propName.toString().endsWith(targetNodeId))
+                                    {
+                                       oldRouteToIDs = message.getBytesProperty(propName);
+                                    }
+                                    propertiesToRemove.add(propName);
+                                 }
+                              }
+
+                              for (SimpleString propertyToRemove : propertiesToRemove)
+                              {
+                                 message.removeProperty(propertyToRemove);
+                              }
+
+                              message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs);
+                           }
+                           else
+                           {
+                              /* Here we are taking messages out of a store-and-forward queue and sending them to the corresponding
+                               * store-and-forward address on the scale-down target server.  In this case we use a special property
+                               * for the queue ID so that the scale-down target server can route it appropriately.
+                               */
+                              byte[] oldRouteToIDs = null;
+
+                              List<SimpleString> propertiesToRemove = new ArrayList<>();
+                              message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+                              for (SimpleString propName : message.getPropertyNames())
+                              {
+                                 if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+                                 {
+                                    if (propName.toString().endsWith(address.toString().substring(address.toString().lastIndexOf("."))))
+                                    {
+                                       oldRouteToIDs = message.getBytesProperty(propName);
+                                    }
+                                    propertiesToRemove.add(propName);
+                                 }
+                              }
+
+                              for (SimpleString propertyToRemove : propertiesToRemove)
+                              {
+                                 message.removeProperty(propertyToRemove);
+                              }
+
+                              message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs);
+                           }
+
+                           HornetQServerLogger.LOGGER.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId);
                            producer.send(message.getAddress(), message);
                            messageCount++;
                            bigLoopQueue.deleteReference(message.getMessageID());
@@ -171,8 +231,7 @@ public class ScaleDownHandler
                         {
                            List<Queue> queuesWithMessage = new ArrayList<>();
                            queuesWithMessage.add(bigLoopQueue);
-                           MessageReference bigLoopRef = bigLoopMessageIterator.next();
-                           long messageId = bigLoopRef.getMessage().getMessageID();
+                           long messageId = message.getMessageID();
 
                            getQueuesWithMessage(store, queues, queueIterators, checkedQueues, bigLoopQueue, queuesWithMessage, bigLoopRef, messageId);
 
@@ -200,9 +259,8 @@ public class ScaleDownHandler
                            }
 
                            logMessage.delete(logMessage.length() - 2, logMessage.length());  // trim off the trailing comma and space
-                           HornetQServerLogger.LOGGER.info(logMessage.append(" on address ").append(address));
+                           HornetQServerLogger.LOGGER.debug(logMessage.append(" on address ").append(address));
 
-                           Message message = bigLoopRef.getMessage();
                            message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
                            //we need this incase we are sending back to the source server of the message, this basically
                            //acts like the bridge and ignores dup detection
@@ -258,6 +316,7 @@ public class ScaleDownHandler
       Map<String, Long> queueIDs = new HashMap<>();
       for (Xid xid : preparedTransactions)
       {
+         HornetQServerLogger.LOGGER.debug("Scaling down transaction: " + xid);
          Transaction transaction = resourceManager.getTransaction(xid);
          session.start(xid, XAResource.TMNOFLAGS);
          List<TransactionOperation> allOperations = transaction.getAllOperations();
@@ -293,9 +352,9 @@ public class ScaleDownHandler
                   queueIds.getA().add(queueID);
                }
             }
-            else if (operation instanceof QueueImpl.RefsOperation)
+            else if (operation instanceof RefsOperation)
             {
-               QueueImpl.RefsOperation refsOperation = (QueueImpl.RefsOperation) operation;
+               RefsOperation refsOperation = (RefsOperation) operation;
                List<MessageReference> refs = refsOperation.getReferencesToAcknowledge();
                for (MessageReference ref : refs)
                {
@@ -456,6 +515,7 @@ public class ScaleDownHandler
       if (queueID == -1)
       {
          session.createQueue(addressName, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable());
+         HornetQServerLogger.LOGGER.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]");
          queueID = getQueueID(session, queue.getName());
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
index 2bc2e26..2d1f8a3 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
@@ -12,12 +12,14 @@
  */
 package org.hornetq.core.server.impl;
 
+import java.math.BigDecimal;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -26,8 +28,8 @@ import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.HornetQIllegalStateException;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientConsumerImpl;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.message.BodyEncoder;
@@ -73,7 +75,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
    private final long id;
 
-   private final Queue messageQueue;
+   protected final Queue messageQueue;
 
    private final Filter filter;
 
@@ -85,6 +87,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
    private final boolean supportLargeMessage;
 
+   private Object protocolContext;
+
    /**
     * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
     * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@@ -109,13 +113,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
     */
    private final boolean browseOnly;
 
-   private BrowserDeliverer browserDeliverer;
+   protected BrowserDeliverer browserDeliverer;
 
    private final boolean strictUpdateDeliveryCount;
 
    private final StorageManager storageManager;
 
-   private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
+   protected final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
 
    private final SessionCallback callback;
 
@@ -135,6 +139,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
    private final long creationTime;
 
+   private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis());
+
+   private AtomicLong messageConsumedSnapshot = new AtomicLong(0);
+
+   private long acks;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
@@ -223,6 +233,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
    // ServerConsumer implementation
    // ----------------------------------------------------------------------
 
+   public Object getProtocolContext()
+   {
+      return protocolContext;
+   }
+
+   public void setProtocolContext(Object protocolContext)
+   {
+      this.protocolContext = protocolContext;
+   }
+
    public long getID()
    {
       return id;
@@ -266,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
-      if (availableCredits != null && availableCredits.get() <= 0)
+      if (callback != null && !callback.hasCredits(this) || availableCredits != null && availableCredits.get() <= 0)
       {
          if (HornetQServerLogger.LOGGER.isDebugEnabled())
          {
@@ -408,6 +428,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
       return filter;
    }
 
+   @Override
    public void close(final boolean failed) throws Exception
    {
       callback.removeReadyListener(this);
@@ -421,16 +442,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
          del.finish();
       }
 
-      if (browseOnly)
-      {
-         browserDeliverer.close();
-      }
-      else
-      {
-         messageQueue.removeConsumer(this);
-      }
-
-      session.removeConsumer(id);
+      removeItself();
 
       LinkedList<MessageReference> refs = cancelRefs(failed, false, null);
 
@@ -472,12 +484,27 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
          props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName()));
 
-         Notification notification = new Notification(null, NotificationType.CONSUMER_CLOSED, props);
+         Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props);
 
          managementService.sendNotification(notification);
       }
    }
 
+   @Override
+   public void removeItself() throws Exception
+   {
+      if (browseOnly)
+      {
+         browserDeliverer.close();
+      }
+      else
+      {
+         messageQueue.removeConsumer(this);
+      }
+
+      session.removeConsumer(id);
+   }
+
    /**
     * Prompt delivery and send a "forced delivery" message to the consumer.
     * <p/>
@@ -513,12 +540,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
                   }
                   else
                   {
-                     ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+                     ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
 
                      forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
                      forcedDeliveryMessage.setAddress(messageQueue.getName());
 
-                     callback.sendMessage(forcedDeliveryMessage, id, 0);
+                     callback.sendMessage(forcedDeliveryMessage, ServerConsumerImpl.this, 0);
                   }
                }
             }
@@ -565,7 +592,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
             }
             if (performACK)
             {
-               acknowledge(false, tx, ref.getMessage().getMessageID());
+               acknowledge(tx, ref.getMessage().getMessageID());
 
                performACK = false;
             }
@@ -655,7 +682,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
       }
    }
 
-   public void receiveCredits(final int credits) throws Exception
+   public void receiveCredits(final int credits)
    {
       if (credits == -1)
       {
@@ -705,7 +732,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
       return messageQueue;
    }
 
-   public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long messageID) throws Exception
+   public void acknowledge(Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
@@ -720,7 +747,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
       boolean startedTransaction = false;
 
-      if (tx == null || autoCommitAcks)
+      if (tx == null)
       {
          startedTransaction = true;
          tx = new TransactionImpl(storageManager);
@@ -745,6 +772,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
             }
 
             ref.getQueue().acknowledge(tx, ref);
+            acks++;
          }
          while (ref.getMessage().getMessageID() != messageID);
 
@@ -781,7 +809,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
       }
    }
 
-   public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+   public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
@@ -795,7 +823,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
          throw new IllegalStateException("Cannot find ref to ack " + messageID);
       }
 
-      if (autoCommitAcks)
+      if (tx == null)
       {
          ref.getQueue().acknowledge(ref);
       }
@@ -803,6 +831,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
       {
          ref.getQueue().acknowledge(tx, ref);
       }
+      acks++;
    }
 
    public void individualCancel(final long messageID, boolean failed) throws Exception
@@ -896,12 +925,23 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
    @Override
    public void disconnect()
    {
-      callback.disconnect(id, getQueue().getName().toString());
+      callback.disconnect(this, getQueue().getName().toString());
+   }
+
+   public float getRate()
+   {
+      float timeSlice = ((System.currentTimeMillis() - consumerRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
+      if (timeSlice == 0)
+      {
+         messageConsumedSnapshot.getAndSet(acks);
+         return 0.0f;
+      }
+      return BigDecimal.valueOf((acks - messageConsumedSnapshot.getAndSet(acks)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
    }
 
    // Private --------------------------------------------------------------------------------------
 
-   private void promptDelivery()
+   public void promptDelivery()
    {
       // largeMessageDeliverer is always set inside a lock
       // if we don't acquire a lock, we will have NPE eventually
@@ -938,7 +978,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
     */
    private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
    {
-      int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
+      int packetSize = callback.sendMessage(message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
       if (availableCredits != null)
       {
@@ -1040,7 +1080,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
                sentInitialPacket = true;
 
                int packetSize = callback.sendLargeMessage(largeMessage,
-                                                          id,
+                                                          ServerConsumerImpl.this,
                                                           context.getLargeBodySize(),
                                                           ref.getDeliveryCount());
 
@@ -1088,7 +1128,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
                byte[] body = bodyBuffer.toByteBuffer().array();
 
-               int packetSize = callback.sendLargeMessageContinuation(id,
+               int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this,
                                                                       body,
                                                                       positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
                                                                       false);
@@ -1166,16 +1206,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
       }
    }
 
-   private class BrowserDeliverer implements Runnable
+   protected class BrowserDeliverer implements Runnable
    {
-      private MessageReference current = null;
+      protected MessageReference current = null;
 
       public BrowserDeliverer(final LinkedListIterator<MessageReference> iterator)
       {
          this.iterator = iterator;
       }
 
-      private final LinkedListIterator<MessageReference> iterator;
+      public final LinkedListIterator<MessageReference> iterator;
 
       public synchronized void close()
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java
index 54899ad..e780957 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerMessageImpl.java
@@ -102,9 +102,10 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage
       return true;
    }
 
-   public void setMessageID(final long id)
+   public ServerMessageImpl setMessageID(final long id)
    {
       messageID = id;
+      return this;
    }
 
    public MessageReference createReference(final Queue queue)
@@ -247,30 +248,27 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage
    @Override
    public void setOriginalHeaders(final ServerMessage other, final MessageReference originalReference, final boolean expiry)
    {
+      SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
+
+      if (originalQueue != null)
+      {
+         putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
+      }
+      else if (originalReference != null)
+      {
+         putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName());
+      }
+
       if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
       {
          putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
 
-         SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
-
-         if (originalQueue != null)
-         {
-            putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
-         }
-
          putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
       }
       else
       {
          putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress());
 
-         /**
-          * This could be null in some DLA cases since the message wasn't routed yet
-          */
-         if (originalReference != null)
-         {
-            putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName());
-         }
 
          putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID());
       }
@@ -328,9 +326,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage
          ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
    }
 
-   // FIXME - this is stuff that is only used in large messages
-
-   // This is only valid on the client side - why is it here?
    public InputStream getBodyInputStream()
    {
       return null;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
index fb9a637..1083dfb 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
@@ -26,6 +26,7 @@ package org.hornetq.core.server.impl;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,6 +43,7 @@ import org.hornetq.api.core.HornetQNonExistentQueueException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.exception.HornetQXAException;
@@ -78,6 +80,7 @@ import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionFactory;
 import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -88,8 +91,6 @@ import org.hornetq.utils.UUID;
 import org.hornetq.utils.json.JSONArray;
 import org.hornetq.utils.json.JSONObject;
 
-import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-
 /**
  * Server side Session implementation
  *
@@ -108,29 +109,29 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
    // Attributes ----------------------------------------------------------------------------
 
-   private final String username;
+   protected final String username;
 
-   private final String password;
+   protected final String password;
 
    private final int minLargeMessageSize;
 
-   private final boolean autoCommitSends;
+   protected boolean autoCommitSends;
 
-   private final boolean autoCommitAcks;
+   protected boolean autoCommitAcks;
 
-   private final boolean preAcknowledge;
+   protected final boolean preAcknowledge;
 
-   private final boolean strictUpdateDeliveryCount;
+   protected final boolean strictUpdateDeliveryCount;
 
-   private final RemotingConnection remotingConnection;
+   protected final RemotingConnection remotingConnection;
 
-   private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
+   protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
-   private Transaction tx;
+   protected Transaction tx;
 
-   private final boolean xa;
+   protected boolean xa;
 
-   private final StorageManager storageManager;
+   protected final StorageManager storageManager;
 
    private final ResourceManager resourceManager;
 
@@ -138,24 +139,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
    private final SecurityStore securityStore;
 
-   private final ManagementService managementService;
+   protected final ManagementService managementService;
 
-   private volatile boolean started = false;
+   protected volatile boolean started = false;
 
-   private final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>();
+   protected final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>();
 
-   private final String name;
+   protected final String name;
 
-   private final HornetQServer server;
+   protected final HornetQServer server;
 
    private final SimpleString managementAddress;
 
    // The current currentLargeMessage being processed
    private volatile LargeServerMessage currentLargeMessage;
 
-   private final RoutingContext routingContext = new RoutingContextImpl(null);
+   protected final RoutingContext routingContext = new RoutingContextImpl(null);
 
-   private final SessionCallback callback;
+   protected final SessionCallback callback;
 
    private volatile SimpleString defaultAddress;
 
@@ -166,7 +167,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
    private final OperationContext context;
 
    // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
-   private final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+   protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
 
    private final long creationTime = System.currentTimeMillis();
 
@@ -178,8 +179,34 @@ public class ServerSessionImpl implements ServerSession, FailureListener
    // concurrently.
    private volatile boolean closed = false;
 
+   private final TransactionFactory transactionFactory;
+
    // Constructors ---------------------------------------------------------------------------------
 
+   //create an 'empty' session. Only used by AMQServerSession
+   //in order to check username and password
+   protected ServerSessionImpl(String username, String password)
+   {
+      this.username = username;
+      this.password = password;
+
+      this.transactionFactory = null;
+      this.strictUpdateDeliveryCount = false;
+      this.storageManager = null;
+      this.server = null;
+      this.securityStore = null;
+      this.resourceManager = null;
+      this.remotingConnection = null;
+      this.preAcknowledge = false;
+      this.postOffice = null;
+      this.name = null;
+      this.minLargeMessageSize = 0;
+      this.managementService = null;
+      this.managementAddress = null;
+      this.context = null;
+      this.callback = null;
+   }
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -201,6 +228,36 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                             final SessionCallback callback,
                             final OperationContext context) throws Exception
    {
+      this(name, username, password, minLargeMessageSize,
+         autoCommitSends, autoCommitAcks, preAcknowledge,
+         strictUpdateDeliveryCount, xa, remotingConnection,
+         storageManager, postOffice, resourceManager, securityStore,
+         managementService, server, managementAddress, defaultAddress,
+         callback, context, null);
+   }
+
+   public ServerSessionImpl(final String name,
+                            final String username,
+                            final String password,
+                            final int minLargeMessageSize,
+                            final boolean autoCommitSends,
+                            final boolean autoCommitAcks,
+                            final boolean preAcknowledge,
+                            final boolean strictUpdateDeliveryCount,
+                            final boolean xa,
+                            final RemotingConnection remotingConnection,
+                            final StorageManager storageManager,
+                            final PostOffice postOffice,
+                            final ResourceManager resourceManager,
+                            final SecurityStore securityStore,
+                            final ManagementService managementService,
+                            final HornetQServer server,
+                            final SimpleString managementAddress,
+                            final SimpleString defaultAddress,
+                            final SessionCallback callback,
+                            final OperationContext context,
+                            TransactionFactory transactionFactory) throws Exception
+   {
       this.username = username;
 
       this.password = password;
@@ -242,6 +299,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
       remotingConnection.addFailureListener(this);
       this.context = context;
+
+      if (transactionFactory == null)
+      {
+         this.transactionFactory = new DefaultTransactionFactory();
+      }
+      else
+      {
+         this.transactionFactory = transactionFactory;
+      }
+
       if (!xa)
       {
          tx = newTransaction();
@@ -289,15 +356,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       return Collections.unmodifiableSet(consumersClone);
    }
 
-   public void removeConsumer(final long consumerID) throws Exception
+   public boolean removeConsumer(final long consumerID) throws Exception
    {
-      if (consumers.remove(consumerID) == null)
-      {
-         throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
-      }
+      return consumers.remove(consumerID) != null;
    }
 
-   private void doClose(final boolean failed) throws Exception
+   protected void doClose(final boolean failed) throws Exception
    {
       synchronized (this)
       {
@@ -316,14 +380,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
             }
          }
-
-         server.removeSession(name);
-
-         remotingConnection.removeFailureListener(this);
-
-         callback.closed();
-
-         closed = true;
       }
 
       //putting closing of consumers outside the sync block
@@ -332,7 +388,22 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
       for (ServerConsumer consumer : consumersClone)
       {
-         consumer.close(failed);
+         try
+         {
+            consumer.close(failed);
+         }
+         catch (Throwable e)
+         {
+            HornetQServerLogger.LOGGER.warn(e.getMessage(), e);
+            try
+            {
+               consumer.removeItself();
+            }
+            catch (Throwable e2)
+            {
+               HornetQServerLogger.LOGGER.warn(e2.getMessage(), e2);
+            }
+         }
       }
 
       consumers.clear();
@@ -348,22 +419,34 @@ public class ServerSessionImpl implements ServerSession, FailureListener
             HornetQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
          }
       }
+
+
+      synchronized (this)
+      {
+         server.removeSession(name);
+
+         remotingConnection.removeFailureListener(this);
+
+         callback.closed();
+
+         closed = true;
+      }
    }
 
-   public void createConsumer(final long consumerID,
-                              final SimpleString queueName,
-                              final SimpleString filterString,
-                              final boolean browseOnly) throws Exception
+   public ServerConsumer createConsumer(final long consumerID,
+                                        final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final boolean browseOnly) throws Exception
    {
-      this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null);
+      return this.createConsumer(consumerID, queueName, filterString, browseOnly, true, null);
    }
 
-   public void createConsumer(final long consumerID,
-                              final SimpleString queueName,
-                              final SimpleString filterString,
-                              final boolean browseOnly,
-                              final boolean supportLargeMessage,
-                              final Integer credits) throws Exception
+   public ServerConsumer createConsumer(final long consumerID,
+                                        final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final boolean browseOnly,
+                                        final boolean supportLargeMessage,
+                                        final Integer credits) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
 
@@ -376,7 +459,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
       Filter filter = FilterImpl.createFilter(filterString);
 
-      ServerConsumer consumer = new ServerConsumerImpl(consumerID,
+      ServerConsumer consumer = newConsumer(consumerID,
                                                        this,
                                                        (QueueBinding) binding,
                                                        filter,
@@ -419,7 +502,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
             props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
          }
 
-         Notification notification = new Notification(null, CONSUMER_CREATED, props);
+         Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props);
 
          if (HornetQServerLogger.LOGGER.isDebugEnabled())
          {
@@ -431,6 +514,31 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
          managementService.sendNotification(notification);
       }
+
+      return consumer;
+   }
+
+   protected ServerConsumer newConsumer(long consumerID,
+         ServerSessionImpl serverSessionImpl, QueueBinding binding,
+         Filter filter, boolean started2, boolean browseOnly,
+         StorageManager storageManager2, SessionCallback callback2,
+         boolean preAcknowledge2, boolean strictUpdateDeliveryCount2,
+         ManagementService managementService2, boolean supportLargeMessage,
+         Integer credits) throws Exception
+   {
+      return new ServerConsumerImpl(consumerID,
+            this,
+            (QueueBinding) binding,
+            filter,
+            started,
+            browseOnly,
+            storageManager,
+            callback,
+            preAcknowledge,
+            strictUpdateDeliveryCount,
+            managementService,
+            supportLargeMessage,
+            credits);
    }
 
    public void createQueue(final SimpleString address,
@@ -492,13 +600,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       return remotingConnection;
    }
 
-   private static class TempQueueCleanerUpper implements CloseListener, FailureListener
+   public static class TempQueueCleanerUpper implements CloseListener, FailureListener
    {
       private final SimpleString bindingName;
 
       private final HornetQServer server;
 
-      TempQueueCleanerUpper(final HornetQServer server, final SimpleString bindingName)
+      public TempQueueCleanerUpper(final HornetQServer server, final SimpleString bindingName)
       {
          this.server = server;
 
@@ -599,7 +707,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                                          queue.isTemporary(),
                                          filterString,
                                          queue.getConsumerCount(),
-                                         queue.getMessageCount(QueueImpl.DELIVERY_TIMEOUT));
+                                         queue.getMessageCount());
       }
       // make an exception for the management address (see HORNETQ-29)
       else if (name.equals(managementAddress))
@@ -668,12 +776,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener
          // have these messages to be stuck on the limbo until the server is restarted
          // The tx has already timed out, so we need to ack and rollback immediately
          Transaction newTX = newTransaction();
-         consumer.acknowledge(autoCommitAcks, newTX, messageID);
+         consumer.acknowledge(newTX, messageID);
          newTX.rollback();
       }
       else
       {
-         consumer.acknowledge(autoCommitAcks, tx, messageID);
+         consumer.acknowledge(autoCommitAcks ? null : tx, messageID);
       }
    }
 
@@ -681,23 +789,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
    {
       ServerConsumer consumer = consumers.get(consumerID);
 
-      if (this.xa && tx == null)
-      {
-         throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
-      }
-
       if (tx != null && tx.getState() == State.ROLLEDBACK)
       {
          // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just
          // have these messages to be stuck on the limbo until the server is restarted
          // The tx has already timed out, so we need to ack and rollback immediately
          Transaction newTX = newTransaction();
-         consumer.individualAcknowledge(autoCommitAcks, tx, messageID);
+         consumer.individualAcknowledge(tx, messageID);
          newTX.rollback();
       }
       else
       {
-         consumer.individualAcknowledge(autoCommitAcks, tx, messageID);
+         consumer.individualAcknowledge(autoCommitAcks ? null : tx, messageID);
       }
 
    }
@@ -732,11 +835,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       }
       try
       {
-         tx.commit();
+         if (tx != null)
+         {
+            tx.commit();
+         }
       }
       finally
       {
-         tx = newTransaction();
+         if (xa)
+         {
+            tx = null;
+         }
+         else
+         {
+            tx = newTransaction();
+         }
       }
    }
 
@@ -774,18 +887,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
    /**
     * @return
     */
-   private TransactionImpl newTransaction()
+   protected Transaction newTransaction()
    {
-      return new TransactionImpl(storageManager, timeoutSeconds);
+      return transactionFactory.newTransaction(null, storageManager, timeoutSeconds);
    }
 
    /**
     * @param xid
     * @return
     */
-   private TransactionImpl newTransaction(final Xid xid)
+   private Transaction newTransaction(final Xid xid)
    {
-      return new TransactionImpl(xid, storageManager, timeoutSeconds);
+      return transactionFactory.newTransaction(xid, storageManager, timeoutSeconds);
    }
 
    public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception
@@ -1306,13 +1419,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener
    @Override
    public Transaction getCurrentTransaction()
    {
+      if (tx == null)
+      {
+         tx = newTransaction();
+      }
       return tx;
    }
 
    public void sendLarge(final MessageInternal message) throws Exception
    {
       // need to create the LargeMessage before continue
-      long id = storageManager.generateUniqueID();
+      long id = storageManager.generateID();
 
       LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
 
@@ -1335,7 +1452,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       //case the id header already generated.
       if (!message.isLargeMessage())
       {
-         long id = storageManager.generateUniqueID();
+         long id = storageManager.generateID();
 
          message.setMessageID(id);
          message.encodeMessageIDToBuffer();
@@ -1658,6 +1775,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener
          toCancel.addAll(consumer.cancelRefs(clientFailed, lastMessageAsDelived, theTx));
       }
 
+      //we need to check this before we cancel the refs and add them to the tx, any delivering refs will have been delivered
+      //after the last tx was rolled back so we should handle them separately. if not they
+      //will end up added to the tx but never ever handled even tho they were removed from the consumers delivering refs.
+      //we add them to a new tx and roll them back as the calling client will assume that this has happened.
+      if (theTx.getState() == State.ROLLEDBACK)
+      {
+         Transaction newTX = newTransaction();
+         cancelAndRollback(clientFailed, newTX, wasStarted, toCancel);
+         throw new IllegalStateException("Transaction has already been rolled back");
+      }
+      cancelAndRollback(clientFailed, theTx, wasStarted, toCancel);
+   }
+
+   private void cancelAndRollback(boolean clientFailed, Transaction theTx, boolean wasStarted, List<MessageReference> toCancel) throws Exception
+   {
       for (MessageReference ref : toCancel)
       {
          ref.getQueue().cancel(theTx, ref);
@@ -1683,7 +1815,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       theTx.rollback();
    }
 
-   private void doSend(final ServerMessage msg, final boolean direct) throws Exception
+   protected void doSend(final ServerMessage msg, final boolean direct) throws Exception
    {
       // check the user has write access to this address.
       try
@@ -1692,7 +1824,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       }
       catch (HornetQException e)
       {
-         if (!autoCommitSends)
+         if (!autoCommitSends && tx != null)
          {
             tx.markAsRollbackOnly(e);
          }
@@ -1735,7 +1867,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
    {
       if (this.tx != null)
       {
-         QueueImpl.RefsOperation oper = (QueueImpl.RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+         RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
 
          if (oper == null)
          {
@@ -1752,4 +1884,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       }
    }
 
+   private static class DefaultTransactionFactory implements TransactionFactory
+   {
+      @Override
+      public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds)
+      {
+         return new TransactionImpl(xid, storageManager, timeoutSeconds);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java
new file mode 100644
index 0000000..0e0e809
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ServiceRegistry.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.config.ConnectorServiceConfiguration;
+import org.hornetq.core.server.ConnectorServiceFactory;
+
+/**
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+
+public class ServiceRegistry
+{
+   private ExecutorService executorService;
+
+   private ScheduledExecutorService scheduledExecutorService;
+
+   /* We are using a List rather than HashMap here as HornetQ allows multiple instances of the same class to be added
+   * to the interceptor list
+   */
+   private Map<String, Interceptor> incomingInterceptors;
+
+   private Map<String, Interceptor> outgoingInterceptors;
+
+   private Map<String, Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServices;
+
+   public ServiceRegistry()
+   {
+      this.incomingInterceptors = new ConcurrentHashMap<String, Interceptor>();
+      this.outgoingInterceptors = new ConcurrentHashMap<String, Interceptor>();
+      this.connectorServices = new ConcurrentHashMap<String, Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>>();
+   }
+
+   public ExecutorService getExecutorService()
+   {
+      return executorService;
+   }
+
+   public void setExecutorService(ExecutorService executorService)
+   {
+      this.executorService = executorService;
+   }
+
+   public ScheduledExecutorService getScheduledExecutorService()
+   {
+      return scheduledExecutorService;
+   }
+
+   public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
+   {
+      this.scheduledExecutorService = scheduledExecutorService;
+   }
+
+   public void addConnectorService(ConnectorServiceFactory connectorServiceFactory, ConnectorServiceConfiguration configuration)
+   {
+      connectorServices.put(configuration.getConnectorName(), new Pair<>(connectorServiceFactory, configuration));
+   }
+
+   public void removeConnectorService(ConnectorServiceConfiguration configuration)
+   {
+      connectorServices.remove(configuration.getConnectorName());
+   }
+
+   public Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices()
+   {
+      return connectorServices.values();
+   }
+
+   public void addIncomingInterceptor(String name, Interceptor interceptor)
+   {
+      incomingInterceptors.put(name, interceptor);
+   }
+
+   public void removeIncomingInterceptor(String name)
+   {
+      incomingInterceptors.remove(name);
+   }
+
+   public Collection<Interceptor> getIncomingInterceptors()
+   {
+      return Collections.unmodifiableCollection(incomingInterceptors.values());
+   }
+
+   public Interceptor getIncomingInterceptor(String name)
+   {
+      return incomingInterceptors.get(name);
+   }
+
+   public void addOutgoingInterceptor(String name, Interceptor interceptor)
+   {
+      outgoingInterceptors.put(name, interceptor);
+   }
+
+   public Interceptor getOutgoingInterceptor(String name)
+   {
+      return outgoingInterceptors.get(name);
+   }
+
+   public void removeOutgoingInterceptor(String name)
+   {
+      outgoingInterceptors.remove(name);
+   }
+
+   public Collection<Interceptor> getOutgoingInterceptors()
+   {
+      return Collections.unmodifiableCollection(outgoingInterceptors.values());
+   }
+}


Mime
View raw message