qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1671184 [3/5] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/...
Date Fri, 03 Apr 2015 22:21:06 GMT
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Fri Apr  3 22:21:05 2015
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
 import org.slf4j.Logger;
@@ -32,10 +33,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.Xid;
 
@@ -48,14 +53,15 @@ public class DtxBranch
     private       State                          _state = State.ACTIVE;
     private long _timeout;
     private Map<AMQSessionModel, State> _associatedSessions = new HashMap<AMQSessionModel, State>();
-    private final List<Record> _enqueueRecords = new ArrayList<Record>();
-    private final List<Record> _dequeueRecords = new ArrayList<Record>();
+    private final List<EnqueueRecord> _enqueueRecords = new ArrayList<>();
+    private final List<DequeueRecord> _dequeueRecords = new ArrayList<>();
 
     private Transaction _transaction;
     private long _expiration;
     private VirtualHostImpl _vhost;
     private ScheduledFuture<?> _timeoutFuture;
     private MessageStore _store;
+    private Transaction.StoredXidRecord _storedXidRecord;
 
 
     public enum State
@@ -77,6 +83,12 @@ public class DtxBranch
         _vhost = vhost;
     }
 
+    public DtxBranch(Transaction.StoredXidRecord storedXidRecord, MessageStore store, VirtualHostImpl vhost)
+    {
+        this(new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId()), store, vhost);
+        _storedXidRecord = storedXidRecord;
+    }
+
     public Xid getXid()
     {
         return _xid;
@@ -227,11 +239,11 @@ public class DtxBranch
         }
 
         Transaction txn = _store.newTransaction();
-        txn.recordXid(_xid.getFormat(),
+        _storedXidRecord = txn.recordXid(_xid.getFormat(),
                       _xid.getGlobalId(),
                       _xid.getBranchId(),
-                      _enqueueRecords.toArray(new Record[_enqueueRecords.size()]),
-                      _dequeueRecords.toArray(new Record[_dequeueRecords.size()]));
+                      _enqueueRecords.toArray(new EnqueueRecord[_enqueueRecords.size()]),
+                      _dequeueRecords.toArray(new DequeueRecord[_dequeueRecords.size()]));
         txn.commitTran();
 
         prePrepareTransaction();
@@ -266,7 +278,7 @@ public class DtxBranch
             // prepare has previously been called
 
             Transaction txn = _store.newTransaction();
-            txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
+            txn.removeXid(_storedXidRecord);
             txn.commitTran();
 
             _transaction.abortTran();
@@ -309,7 +321,7 @@ public class DtxBranch
         }
         else
         {
-            _transaction.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
+            _transaction.removeXid(_storedXidRecord);
         }
         _transaction.commitTran();
 
@@ -324,21 +336,25 @@ public class DtxBranch
     {
         _transaction = _store.newTransaction();
 
-        for(Record enqueue : _enqueueRecords)
+        for(final EnqueueRecord enqueue : _enqueueRecords)
         {
+            final MessageEnqueueRecord record;
             if(enqueue.isDurable())
             {
-                _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
+                record = _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
+
             }
+            else
+            {
+                record = null;
+            }
+            enqueue.getEnqueueAction().performAction(record);
         }
 
 
-        for(Record enqueue : _dequeueRecords)
+        for(DequeueRecord dequeue : _dequeueRecords)
         {
-            if(enqueue.isDurable())
-            {
-                _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage());
-            }
+            _transaction.dequeueMessage(dequeue.getEnqueueRecord());
         }
     }
 
@@ -349,28 +365,58 @@ public class DtxBranch
     }
 
 
-    public void dequeue(TransactionLogResource resource, EnqueueableMessage message)
+    public void dequeue(MessageEnqueueRecord record)
     {
-        _dequeueRecords.add(new Record(resource, message));
+        if(record != null)
+        {
+            _dequeueRecords.add(new DequeueRecord(record));
+        }
     }
 
+    public void enqueue(TransactionLogResource queue,
+                        EnqueueableMessage message,
+                        final Action<MessageEnqueueRecord> enqueueAction)
+    {
+        _enqueueRecords.add(new EnqueueRecord(queue, message, enqueueAction));
+    }
 
-    public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
+    private static class DequeueRecord implements Transaction.DequeueRecord
     {
-        _enqueueRecords.add(new Record(queue, message));
+        private final MessageEnqueueRecord _enqueueRecord;
+
+        public DequeueRecord(MessageEnqueueRecord enqueueRecord)
+        {
+            _enqueueRecord = enqueueRecord;
+        }
+
+        public MessageEnqueueRecord getEnqueueRecord()
+        {
+            return _enqueueRecord;
+        }
+
+
     }
 
-    private static final class Record implements Transaction.Record
+    private static class EnqueueRecord implements Transaction.EnqueueRecord
     {
         private final TransactionLogResource _resource;
         private final EnqueueableMessage _message;
 
-        public Record(TransactionLogResource resource, EnqueueableMessage message)
+        private final Action<MessageEnqueueRecord> _enqueueAction;
+
+        public EnqueueRecord(final TransactionLogResource resource,
+                             final EnqueueableMessage message,
+                             final Action<MessageEnqueueRecord> enqueueAction)
         {
             _resource = resource;
             _message = message;
+            _enqueueAction = enqueueAction;
         }
 
+        public Action<MessageEnqueueRecord> getEnqueueAction()
+        {
+            return _enqueueAction;
+        }
         public TransactionLogResource getResource()
         {
             return _resource;
@@ -385,6 +431,7 @@ public class DtxBranch
         {
             return _resource.getMessageDurability().persist(_message.isPersistent());
         }
+
     }
 
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Apr  3 22:21:05 2015
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.Transaction;
@@ -92,23 +92,23 @@ public class LocalTransaction implements
         _postTransactionActions.add(postTransactionAction);
     }
 
-    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
-        if(queue.getMessageDurability().persist(message.isPersistent()))
+        if(record != null)
         {
             try
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+                    _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
                 }
 
                 beginTranIfNecessary();
-                _transaction.dequeueMessage(queue, message);
+                _transaction.dequeueMessage(record);
             }
             catch(RuntimeException e)
             {
@@ -127,18 +127,16 @@ public class LocalTransaction implements
         {
             for(MessageInstance entry : queueEntries)
             {
-                ServerMessage message = entry.getMessage();
-                TransactionLogResource queue = entry.getOwningResource();
-
-                if(queue.getMessageDurability().persist(message.isPersistent()))
+                final MessageEnqueueRecord record = entry.getEnqueueRecord();
+                if(record != null)
                 {
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+                        _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
                     }
 
                     beginTranIfNecessary();
-                    _transaction.dequeueMessage(queue, message);
+                    _transaction.dequeueMessage(record);
                 }
             }
 
@@ -181,10 +179,9 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         sync();
-        _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(queue.getMessageDurability().persist(message.isPersistent()))
@@ -197,23 +194,83 @@ public class LocalTransaction implements
                 }
 
                 beginTranIfNecessary();
-                _transaction.enqueueMessage(queue, message);
+                final MessageEnqueueRecord record = _transaction.enqueueMessage(queue, message);
+                if(postTransactionAction != null)
+                {
+                    final EnqueueAction underlying = postTransactionAction;
+
+                    _postTransactionActions.add(new Action()
+                    {
+                        @Override
+                        public void postCommit()
+                        {
+                            underlying.postCommit(record);
+                        }
+
+                        @Override
+                        public void onRollback()
+                        {
+                            underlying.onRollback();
+                        }
+                    });
+                }
             }
             catch(RuntimeException e)
             {
+                if(postTransactionAction != null)
+                {
+                    final EnqueueAction underlying = postTransactionAction;
+
+                    _postTransactionActions.add(new Action()
+                    {
+                        @Override
+                        public void postCommit()
+                        {
+
+                        }
+
+                        @Override
+                        public void onRollback()
+                        {
+                            underlying.onRollback();
+                        }
+                    });
+                }
                 tidyUpOnError(e);
             }
         }
+        else
+        {
+            if(postTransactionAction != null)
+            {
+                final EnqueueAction underlying = postTransactionAction;
+                _postTransactionActions.add(new Action()
+                {
+                    @Override
+                    public void postCommit()
+                    {
+                        underlying.postCommit((MessageEnqueueRecord)null);
+                    }
+
+                    @Override
+                    public void onRollback()
+                    {
+                        underlying.onRollback();
+                    }
+                });
+            }
+        }
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         sync();
-        _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         try
         {
+            final MessageEnqueueRecord[] records = new MessageEnqueueRecord[queues.size()];
+            int i = 0;
             for(BaseQueue queue : queues)
             {
                 if(queue.getMessageDurability().persist(message.isPersistent()))
@@ -224,13 +281,53 @@ public class LocalTransaction implements
                     }
 
                     beginTranIfNecessary();
-                    _transaction.enqueueMessage(queue, message);
+                    records[i] = _transaction.enqueueMessage(queue, message);
 
                 }
+                i++;
+            }
+            if(postTransactionAction != null)
+            {
+                final EnqueueAction underlying = postTransactionAction;
+
+                _postTransactionActions.add(new Action()
+                {
+                    @Override
+                    public void postCommit()
+                    {
+                        underlying.postCommit(records);
+                    }
+
+                    @Override
+                    public void onRollback()
+                    {
+                        underlying.onRollback();
+                    }
+                });
+                postTransactionAction = null;
             }
         }
         catch(RuntimeException e)
         {
+            if(postTransactionAction != null)
+            {
+                final EnqueueAction underlying = postTransactionAction;
+
+                _postTransactionActions.add(new Action()
+                {
+                    @Override
+                    public void postCommit()
+                    {
+
+                    }
+
+                    @Override
+                    public void onRollback()
+                    {
+                        underlying.onRollback();
+                    }
+                });
+            }
             tidyUpOnError(e);
         }
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Fri Apr  3 22:21:05 2015
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 
 
@@ -55,6 +56,15 @@ public interface ServerTransaction
         public void onRollback();
     }
 
+    public static interface EnqueueAction
+    {
+        public void postCommit(MessageEnqueueRecord... records);
+
+        public void onRollback();
+    }
+
+
+
     /**
      * Return the time the current transaction started.
      *
@@ -75,12 +85,12 @@ public interface ServerTransaction
      */
     void addPostTransactionAction(Action postTransactionAction);
 
-    /** 
+    /**
      * Dequeue a message from a queue registering a post transaction action.
-     * 
-     * A store operation will result only for a persistent message on a durable queue.
+     *
+     * A store operation will result only for a if the record is not null.
      */
-    void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
+    void dequeue(MessageEnqueueRecord record, Action postTransactionAction);
 
     /** 
      * Dequeue a message(s) from queue(s) registering a post transaction action.
@@ -94,14 +104,14 @@ public interface ServerTransaction
      * 
      * A store operation will result only for a persistent message on a durable queue.
      */
-    void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
+    void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction);
 
     /** 
      * Enqueue a message(s) to queue(s) registering a post transaction action.
      * 
      * Store operations will result only for a persistent messages on durable queues.
      */
-    void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction);
+    void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction);
 
     /** 
      * Commit the transaction represented by this object.

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Apr  3 22:21:05 2015
@@ -86,6 +86,7 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.store.GenericRecoverer;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreProvider;
 import org.apache.qpid.server.store.StoreException;
@@ -1182,7 +1183,7 @@ public abstract class AbstractVirtualHos
                 }
                 if(acquired)
                 {
-                    txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
+                    txn.dequeue(messageInstance.getEnqueueRecord(), new ServerTransaction.Action()
                     {
                         public void postCommit()
                         {
@@ -1201,11 +1202,11 @@ public abstract class AbstractVirtualHos
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = (AMQQueue)queue;
 
-                txn.enqueue(toQueue, message, new ServerTransaction.Action()
+                txn.enqueue(toQueue, message, new ServerTransaction.EnqueueAction()
                 {
-                    public void postCommit()
+                    public void postCommit(MessageEnqueueRecord... records)
                     {
-                        toQueue.enqueue(message, null);
+                        toQueue.enqueue(message, null, records[0]);
                     }
 
                     public void onRollback()
@@ -1222,12 +1223,12 @@ public abstract class AbstractVirtualHos
                 if(entry.acquire())
                 {
                     txn.enqueue(toQueue, message,
-                                new ServerTransaction.Action()
+                                new ServerTransaction.EnqueueAction()
                                 {
 
-                                    public void postCommit()
+                                    public void postCommit(MessageEnqueueRecord... records)
                                     {
-                                        toQueue.enqueue(message, null);
+                                        toQueue.enqueue(message, null, records[0]);
                                     }
 
                                     public void onRollback()
@@ -1235,20 +1236,23 @@ public abstract class AbstractVirtualHos
                                         entry.release();
                                     }
                                 });
-                    txn.dequeue(entry.getOwningResource(), message,
-                                new ServerTransaction.Action()
-                                {
-
-                                    public void postCommit()
+                    if(entry instanceof QueueEntry)
+                    {
+                        txn.dequeue(entry.getEnqueueRecord(),
+                                    new ServerTransaction.Action()
                                     {
-                                        entry.delete();
-                                    }
 
-                                    public void onRollback()
-                                    {
+                                        public void postCommit()
+                                        {
+                                            entry.delete();
+                                        }
 
-                                    }
-                                });
+                                        public void onRollback()
+                                        {
+
+                                        }
+                                    });
+                    }
                 }
             }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Fri Apr  3 22:21:05 2015
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -38,12 +37,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -54,6 +53,7 @@ import org.apache.qpid.server.store.hand
 import org.apache.qpid.server.txn.DtxBranch;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 
@@ -91,6 +91,7 @@ public class AsynchronousMessageStoreRec
         private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
         private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
         private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool();
+        private final MessageStore.MessageStoreReader _storeReader;
         private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
 
         private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost)
@@ -98,6 +99,7 @@ public class AsynchronousMessageStoreRec
             _virtualHost = virtualHost;
             _eventLogger = virtualHost.getEventLogger();
             _store = virtualHost.getMessageStore();
+            _storeReader = _store.newMessageStoreReader();
             _logSubject = new MessageStoreLogSubject(virtualHost.getName(), _store.getClass().getSimpleName());
 
             _maxMessageId = _store.getNextMessageId();
@@ -107,7 +109,7 @@ public class AsynchronousMessageStoreRec
 
         public void recover()
         {
-            getStore().visitDistributedTransactions(new DistributedTransactionVisitor());
+            getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
 
             for(AMQQueue<?> queue : _recoveringQueues)
             {
@@ -125,9 +127,9 @@ public class AsynchronousMessageStoreRec
             return _eventLogger;
         }
 
-        public MessageStore getStore()
+        public MessageStore.MessageStoreReader getStoreReader()
         {
-            return _store;
+            return _storeReader;
         }
 
         public MessageStoreLogSubject getLogSubject()
@@ -143,7 +145,7 @@ public class AsynchronousMessageStoreRec
         private void recoverQueue(AMQQueue<?> queue)
         {
             MessageInstanceVisitor handler = new MessageInstanceVisitor(queue);
-            _store.visitMessageInstances(queue, handler);
+            _storeReader.visitMessageInstances(queue, handler);
 
             getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(handler.getRecoveredCount(), queue.getName()));
             getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
@@ -165,18 +167,18 @@ public class AsynchronousMessageStoreRec
                 entry.setValue(null); // free up any memory associated with the reference object
             }
             final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
-            getStore().visitMessages(new MessageHandler()
+            getStoreReader().visitMessages(new MessageHandler()
             {
                 @Override
                 public boolean handle(final StoredMessage<?> storedMessage)
                 {
 
                     long messageNumber = storedMessage.getMessageNumber();
-                    if(!_recoveredMessages.containsKey(messageNumber))
+                    if (!_recoveredMessages.containsKey(messageNumber))
                     {
                         messagesToDelete.add(storedMessage);
                     }
-                    return _continueRecovery.get() && messageNumber <_maxMessageId-1;
+                    return _continueRecovery.get() && messageNumber < _maxMessageId - 1;
                 }
             });
             for(StoredMessage<?> storedMessage : messagesToDelete)
@@ -192,6 +194,7 @@ public class AsynchronousMessageStoreRec
 
             messagesToDelete.clear();
             _recoveredMessages.clear();
+            _storeReader.close();
         }
 
         private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)
@@ -199,7 +202,7 @@ public class AsynchronousMessageStoreRec
             MessageReference<? extends ServerMessage<?>> ref = _recoveredMessages.get(messageId);
             if (ref == null)
             {
-                StoredMessage<?> message = _store.getMessage(messageId);
+                StoredMessage<?> message = _storeReader.getMessage(messageId);
                 if(message != null)
                 {
                     StorableMessageMetaData metaData = message.getMetaData();
@@ -234,6 +237,7 @@ public class AsynchronousMessageStoreRec
             {
                 Thread.currentThread().interrupt();
             }
+            _storeReader.close();
         }
 
 
@@ -241,23 +245,20 @@ public class AsynchronousMessageStoreRec
         {
 
 
-
             @Override
-            public boolean handle(long format,
-                                  byte[] globalId,
-                                  byte[] branchId,
-                                  Transaction.Record[] enqueues,
-                                  Transaction.Record[] dequeues)
+            public boolean handle(final Transaction.StoredXidRecord storedXid,
+                                  final Transaction.EnqueueRecord[] enqueues,
+                                  final Transaction.DequeueRecord[] dequeues)
             {
-                Xid id = new Xid(format, globalId, branchId);
+                Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
                 DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry();
                 DtxBranch branch = dtxRegistry.getBranch(id);
                 if (branch == null)
                 {
-                    branch = new DtxBranch(id, getStore(), getVirtualHost());
+                    branch = new DtxBranch(storedXid, _store, getVirtualHost());
                     dtxRegistry.registerBranch(branch);
                 }
-                for (Transaction.Record record : enqueues)
+                for (Transaction.EnqueueRecord record : enqueues)
                 {
                     final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId());
                     if (queue != null)
@@ -269,22 +270,32 @@ public class AsynchronousMessageStoreRec
                         {
                             final MessageReference<?> ref = message.newReference();
 
-                            branch.enqueue(queue, message);
+                            final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
 
+                            branch.enqueue(queue, message, new Action<MessageEnqueueRecord>()
+                            {
+                                @Override
+                                public void performAction(final MessageEnqueueRecord record)
+                                {
+                                    records[0] = record;
+                                }
+                            });
                             branch.addPostTransactionAction(new ServerTransaction.Action()
                             {
-
+                                @Override
                                 public void postCommit()
                                 {
-                                    queue.enqueue(message, null);
+                                    queue.enqueue(message, null, records[0]);
                                     ref.release();
                                 }
 
+                                @Override
                                 public void onRollback()
                                 {
                                     ref.release();
                                 }
                             });
+
                         }
                         else
                         {
@@ -306,10 +317,10 @@ public class AsynchronousMessageStoreRec
 
                     }
                 }
-                for (Transaction.Record record : dequeues)
+                for (Transaction.DequeueRecord record : dequeues)
                 {
 
-                    final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId());
+                    final AMQQueue<?> queue = getVirtualHost().getQueue(record.getEnqueueRecord().getQueueId());
 
                     if (queue != null)
                     {
@@ -321,7 +332,7 @@ public class AsynchronousMessageStoreRec
                             recoverQueue(queue);
                         }
 
-                        final long messageId = record.getMessage().getMessageNumber();
+                        final long messageId = record.getEnqueueRecord().getMessageNumber();
                         final ServerMessage<?> message = getRecoveredMessage(messageId);
 
                         if (message != null)
@@ -330,7 +341,7 @@ public class AsynchronousMessageStoreRec
 
                             entry.acquire();
 
-                            branch.dequeue(queue, message);
+                            branch.dequeue(entry.getEnqueueRecord());
 
                             branch.addPostTransactionAction(new ServerTransaction.Action()
                             {
@@ -362,8 +373,8 @@ public class AsynchronousMessageStoreRec
                         StringBuilder xidString = xidAsString(id);
                         getEventLogger().message(getLogSubject(),
                                                         TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                                                   record.getResource()
-                                                                                                           .getId()
+                                                                                                   record.getEnqueueRecord()
+                                                                                                           .getQueueId()
                                                                                                            .toString()));
                     }
 
@@ -427,8 +438,9 @@ public class AsynchronousMessageStoreRec
             }
 
             @Override
-            public boolean handle(final UUID queueId, long messageId)
+            public boolean handle(final MessageEnqueueRecord record)
             {
+                long messageId = record.getMessageNumber();
                 String queueName = _queue.getName();
 
                 if(messageId < _maxMessageId)
@@ -442,7 +454,7 @@ public class AsynchronousMessageStoreRec
                             _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
                         }
 
-                        _queue.recover(message);
+                        _queue.recover(message, record);
                         _recoveredCount++;
                     }
                     else
@@ -452,8 +464,8 @@ public class AsynchronousMessageStoreRec
                                      + " referenced in log as enqueued in queue "
                                      + queueName
                                      + " is unknown, entry will be discarded");
-                        Transaction txn = getStore().newTransaction();
-                        txn.dequeueMessage(_queue, new DummyMessage(messageId));
+                        Transaction txn = _store.newTransaction();
+                        txn.dequeueMessage(record);
                         txn.commitTranAsync();
                     }
                     return _continueRecovery.get();
@@ -472,31 +484,5 @@ public class AsynchronousMessageStoreRec
         }
     }
 
-    private static class DummyMessage implements EnqueueableMessage
-    {
-
-        private final long _messageId;
-
-        public DummyMessage(long messageId)
-        {
-            _messageId = messageId;
-        }
-
-        public long getMessageNumber()
-        {
-            return _messageId;
-        }
-
-        public boolean isPersistent()
-        {
-            return true;
-        }
-
-        public StoredMessage getStoredMessage()
-        {
-            return null;
-        }
-    }
-
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Fri Apr  3 22:21:05 2015
@@ -33,25 +33,24 @@ import org.apache.qpid.server.logging.Ev
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.txn.DtxBranch;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 
@@ -64,6 +63,7 @@ public class SynchronousMessageStoreReco
     {
         EventLogger eventLogger = virtualHost.getEventLogger();
         MessageStore store = virtualHost.getMessageStore();
+        MessageStore.MessageStoreReader storeReader = store.newMessageStoreReader();
         MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName());
 
         Map<String, Integer> queueRecoveries = new TreeMap<>();
@@ -73,10 +73,10 @@ public class SynchronousMessageStoreReco
 
         eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START());
 
-        store.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));
+        storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));
 
         eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false));
-        store.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries,
+        storeReader.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries,
                                                                recoveredMessages, unusedMessages));
         for(Map.Entry<String,Integer> entry : queueRecoveries.entrySet())
         {
@@ -95,7 +95,7 @@ public class SynchronousMessageStoreReco
             }
         }
 
-        store.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger,
+        storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger,
                                                                              logSubject, recoveredMessages, unusedMessages));
 
 
@@ -174,8 +174,10 @@ public class SynchronousMessageStoreReco
         }
 
         @Override
-        public boolean handle(final UUID queueId, long messageId)
+        public boolean handle(final MessageEnqueueRecord record)
         {
+            final UUID queueId = record.getQueueId();
+            long messageId = record.getMessageNumber();
             AMQQueue<?> queue = _virtualHost.getQueue(queueId);
             if(queue != null)
             {
@@ -196,7 +198,7 @@ public class SynchronousMessageStoreReco
                         count = 0;
                     }
 
-                    queue.recover(message);
+                    queue.recover(message, record);
 
                     _queueRecoveries.put(queueName, ++count);
                 }
@@ -204,7 +206,7 @@ public class SynchronousMessageStoreReco
                 {
                     _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
                     Transaction txn = _store.newTransaction();
-                    txn.dequeueMessage(queue, new DummyMessage(messageId));
+                    txn.dequeueMessage(record);
                     txn.commitTranAsync();
                 }
             }
@@ -212,28 +214,7 @@ public class SynchronousMessageStoreReco
             {
                 _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
                 Transaction txn = _store.newTransaction();
-                TransactionLogResource mockQueue =
-                        new TransactionLogResource()
-                        {
-                            @Override
-                            public String getName()
-                            {
-                                return "<<UNKNOWN>>";
-                            }
-
-                            @Override
-                            public UUID getId()
-                            {
-                                return queueId;
-                            }
-
-                            @Override
-                            public MessageDurability getMessageDurability()
-                            {
-                                return MessageDurability.DEFAULT;
-                            }
-                        };
-                txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
+                txn.dequeueMessage(record);
                 txn.commitTranAsync();
             }
             return true;
@@ -267,17 +248,19 @@ public class SynchronousMessageStoreReco
         }
 
         @Override
-        public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+        public boolean handle(final Transaction.StoredXidRecord storedXid,
+                              final Transaction.EnqueueRecord[] enqueues,
+                              final Transaction.DequeueRecord[] dequeues)
         {
-            Xid id = new Xid(format, globalId, branchId);
+            Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
             DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
             DtxBranch branch = dtxRegistry.getBranch(id);
             if(branch == null)
             {
-                branch = new DtxBranch(id, _store, _virtualHost);
+                branch = new DtxBranch(storedXid, _store, _virtualHost);
                 dtxRegistry.registerBranch(branch);
             }
-            for(Transaction.Record record : enqueues)
+            for(EnqueueRecord record : enqueues)
             {
                 final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
                 if(queue != null)
@@ -289,23 +272,32 @@ public class SynchronousMessageStoreReco
                     if(message != null)
                     {
                         final MessageReference<?> ref = message.newReference();
+                        final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
 
-                        branch.enqueue(queue,message);
-
+                        branch.enqueue(queue, message, new Action<MessageEnqueueRecord>()
+                        {
+                            @Override
+                            public void performAction(final MessageEnqueueRecord record)
+                            {
+                                records[0] = record;
+                            }
+                        });
                         branch.addPostTransactionAction(new ServerTransaction.Action()
                         {
-
+                            @Override
                             public void postCommit()
                             {
-                                queue.enqueue(message, null);
+                                queue.enqueue(message, null, records[0]);
                                 ref.release();
                             }
 
+                            @Override
                             public void onRollback()
                             {
                                 ref.release();
                             }
                         });
+
                     }
                     else
                     {
@@ -324,12 +316,12 @@ public class SynchronousMessageStoreReco
 
                 }
             }
-            for(Transaction.Record record : dequeues)
+            for(Transaction.DequeueRecord record : dequeues)
             {
-                final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+                final AMQQueue<?> queue = _virtualHost.getQueue(record.getEnqueueRecord().getQueueId());
                 if(queue != null)
                 {
-                    final long messageId = record.getMessage().getMessageNumber();
+                    final long messageId = record.getEnqueueRecord().getMessageNumber();
                     final ServerMessage<?> message = _recoveredMessages.get(messageId);
                     _unusedMessages.remove(messageId);
 
@@ -339,7 +331,7 @@ public class SynchronousMessageStoreReco
 
                         entry.acquire();
 
-                        branch.dequeue(queue, message);
+                        branch.dequeue(entry.getEnqueueRecord());
 
                         branch.addPostTransactionAction(new ServerTransaction.Action()
                         {
@@ -370,7 +362,7 @@ public class SynchronousMessageStoreReco
                     StringBuilder xidString = xidAsString(id);
                     _eventLogger.message(_logSubject,
                                       TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                                 record.getResource().getId().toString()));
+                                                                                 record.getEnqueueRecord().getQueueId().toString()));
                 }
 
             }
@@ -395,30 +387,4 @@ public class SynchronousMessageStoreReco
     }
 
 
-    private static class DummyMessage implements EnqueueableMessage
-    {
-
-        private final long _messageId;
-
-        public DummyMessage(long messageId)
-        {
-            _messageId = messageId;
-        }
-
-        public long getMessageNumber()
-        {
-            return _messageId;
-        }
-
-        public boolean isPersistent()
-        {
-            return true;
-        }
-
-        public StoredMessage getStoredMessage()
-        {
-            return null;
-        }
-    }
-
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Fri Apr  3 22:21:05 2015
@@ -503,7 +503,7 @@ public class TopicExchangeTest extends Q
         when(message.getMessageNumber()).thenReturn(messageNumber);
         for(BaseQueue q : queues)
         {
-            q.enqueue(message, null);
+            q.enqueue(message, null, null);
         }
 
         return queues.size();

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Fri Apr  3 22:21:05 2015
@@ -179,7 +179,7 @@ abstract class AbstractQueueTestBase ext
                      _queue.getConsumerCountWithCredit());
 
         // Check sending a message ends up with the subscriber
-        _queue.enqueue(messageA, null);
+        _queue.enqueue(messageA, null, null);
         try
         {
             Thread.sleep(2000L);
@@ -198,7 +198,7 @@ abstract class AbstractQueueTestBase ext
                     1 == _queue.getConsumerCountWithCredit());
 
         ServerMessage messageB = createMessage(new Long (25));
-        _queue.enqueue(messageB, null);
+        _queue.enqueue(messageB, null, null);
          assertNull(_consumer.getQueueContext());
 
     }
@@ -206,7 +206,7 @@ abstract class AbstractQueueTestBase ext
     public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException
     {
         ServerMessage messageA = createMessage(new Long(24));
-        _queue.enqueue(messageA, null);
+        _queue.enqueue(messageA, null, null);
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                        EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES));
@@ -223,8 +223,8 @@ abstract class AbstractQueueTestBase ext
     {
         ServerMessage messageA = createMessage(new Long(24));
         ServerMessage messageB = createMessage(new Long(25));
-        _queue.enqueue(messageA, null);
-        _queue.enqueue(messageB, null);
+        _queue.enqueue(messageA, null, null);
+        _queue.enqueue(messageB, null, null);
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                        EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES));
@@ -255,9 +255,9 @@ abstract class AbstractQueueTestBase ext
 
         /* Enqueue three messages */
 
-        _queue.enqueue(messageA, postEnqueueAction);
-        _queue.enqueue(messageB, postEnqueueAction);
-        _queue.enqueue(messageC, postEnqueueAction);
+        _queue.enqueue(messageA, postEnqueueAction, null);
+        _queue.enqueue(messageB, postEnqueueAction, null);
+        _queue.enqueue(messageC, postEnqueueAction, null);
 
         Thread.sleep(150);  // Work done by QueueRunner Thread
 
@@ -306,7 +306,7 @@ abstract class AbstractQueueTestBase ext
         final long expiration = System.currentTimeMillis() + messageExpirationOffset;
         when(messageA.getExpiration()).thenReturn(expiration);
 
-        _queue.enqueue(messageA, postEnqueueAction);
+        _queue.enqueue(messageA, postEnqueueAction, null);
 
         int subFlushWaitTime = 150;
         Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
@@ -354,9 +354,9 @@ abstract class AbstractQueueTestBase ext
 
         /* Enqueue three messages */
 
-        _queue.enqueue(messageA, postEnqueueAction);
-        _queue.enqueue(messageB, postEnqueueAction);
-        _queue.enqueue(messageC, postEnqueueAction);
+        _queue.enqueue(messageA, postEnqueueAction, null);
+        _queue.enqueue(messageB, postEnqueueAction, null);
+        _queue.enqueue(messageC, postEnqueueAction, null);
 
         Thread.sleep(150);  // Work done by QueueRunner Thread
 
@@ -412,8 +412,8 @@ abstract class AbstractQueueTestBase ext
 
         /* Enqueue two messages */
 
-        _queue.enqueue(messageA, postEnqueueAction);
-        _queue.enqueue(messageB, postEnqueueAction);
+        _queue.enqueue(messageA, postEnqueueAction, null);
+        _queue.enqueue(messageB, postEnqueueAction, null);
 
         Thread.sleep(150);  // Work done by QueueRunner Thread
 
@@ -450,7 +450,7 @@ abstract class AbstractQueueTestBase ext
                      _queue.getConsumerCountWithCredit());
 
         // Check sending a message ends up with the subscriber
-        _queue.enqueue(messageA, null);
+        _queue.enqueue(messageA, null, null);
         try
         {
             Thread.sleep(2000L);
@@ -517,7 +517,7 @@ abstract class AbstractQueueTestBase ext
                 _consumer.resend(entry);
 
             }
-        });
+        }, null);
 
 
 
@@ -530,7 +530,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage message = createMessage(messageId);
 
         // Put message on queue
-        _queue.enqueue(message, null);
+        _queue.enqueue(message, null, null);
         // Get message id
         Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
 
@@ -546,7 +546,7 @@ abstract class AbstractQueueTestBase ext
             Long messageId = new Long(i);
             ServerMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(message, null);
+            _queue.enqueue(message, null, null);
         }
         // Get message ids
         List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -567,7 +567,7 @@ abstract class AbstractQueueTestBase ext
             Long messageId = new Long(i);
             ServerMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(message, null);
+            _queue.enqueue(message, null, null);
         }
         // Get message ids
         List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -588,7 +588,7 @@ abstract class AbstractQueueTestBase ext
             Long messageId = new Long(i);
             ServerMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(message, null);
+            _queue.enqueue(message, null, null);
         }
 
         // Get non-existent 0th QueueEntry & check returned list was empty
@@ -763,10 +763,10 @@ abstract class AbstractQueueTestBase ext
         _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
                                                                       Integer.valueOf(2)));
 
-        _queue.enqueue(createMessage(new Long(24)), null);
+        _queue.enqueue(createMessage(new Long(24)), null, null);
         verifyZeroInteractions(listener);
 
-        _queue.enqueue(createMessage(new Long(25)), null);
+        _queue.enqueue(createMessage(new Long(25)), null, null);
 
         verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
     }
@@ -775,9 +775,9 @@ abstract class AbstractQueueTestBase ext
     {
         QueueNotificationListener  listener = mock(QueueNotificationListener .class);
 
-        _queue.enqueue(createMessage(new Long(24)), null);
-        _queue.enqueue(createMessage(new Long(25)), null);
-        _queue.enqueue(createMessage(new Long(26)), null);
+        _queue.enqueue(createMessage(new Long(24)), null, null);
+        _queue.enqueue(createMessage(new Long(25)), null, null);
+        _queue.enqueue(createMessage(new Long(26)), null, null);
 
         _queue.setNotificationListener(listener);
         _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
@@ -861,9 +861,9 @@ abstract class AbstractQueueTestBase ext
     public void testOldestMessage()
     {
         AMQQueue<?> queue = getQueue();
-        queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null);
-        queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null);
-        queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null);
+        queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null, null);
+        queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null, null);
+        queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null, null);
 
         assertEquals(10l,queue.getOldestMessageArrivalTime());
     }
@@ -875,7 +875,7 @@ abstract class AbstractQueueTestBase ext
         ServerMessage message = createMessage(1l);
         when(message.getArrivalTime()).thenReturn(arrivalTime);
         when(message.getExpiration()).thenReturn(expiration);
-        queue.enqueue(message,null);
+        queue.enqueue(message,null, null);
         queue.visit(new QueueEntryVisitor()
         {
             @Override
@@ -940,7 +940,7 @@ abstract class AbstractQueueTestBase ext
             message = createMessage((long)i);
 
             // Put message on queue
-            queue.enqueue(message,null);
+            queue.enqueue(message,null, null);
 
         }
         try

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Fri Apr  3 22:21:05 2015
@@ -86,7 +86,7 @@ public class LastValueQueueListTest exte
     {
         ServerMessage message = createTestServerMessage(null);
 
-        _list.add(message);
+        _list.add(message, null);
         int numberOfEntries = countEntries(_list);
         assertEquals(1, numberOfEntries);
     }
@@ -95,7 +95,7 @@ public class LastValueQueueListTest exte
     {
         ServerMessage message = createTestServerMessage(null);
 
-        QueueEntry addedEntry = _list.add(message);
+        QueueEntry addedEntry = _list.add(message, null);
         addedEntry.acquire();
         addedEntry.delete();
 
@@ -107,7 +107,7 @@ public class LastValueQueueListTest exte
     {
         ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
 
-        _list.add(message);
+        _list.add(message, null);
         int numberOfEntries = countEntries(_list);
         assertEquals(1, numberOfEntries);
     }
@@ -116,7 +116,7 @@ public class LastValueQueueListTest exte
     {
         ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
 
-        QueueEntry addedEntry = _list.add(message);
+        QueueEntry addedEntry = _list.add(message, null);
         addedEntry.acquire();
         addedEntry.delete();
 
@@ -129,8 +129,8 @@ public class LastValueQueueListTest exte
         ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1);
         ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2);
 
-        _list.add(message1);
-        _list.add(message2);
+        _list.add(message1, null);
+        _list.add(message2, null);
 
         int numberOfEntries = countEntries(_list);
         assertEquals(2, numberOfEntries);
@@ -141,8 +141,8 @@ public class LastValueQueueListTest exte
         ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE);
         ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE);
 
-        _list.add(message1);
-        _list.add(message2);
+        _list.add(message1, null);
+        _list.add(message2, null);
 
         int numberOfEntries = countEntries(_list);
         assertEquals(1, numberOfEntries);
@@ -153,10 +153,10 @@ public class LastValueQueueListTest exte
         ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE);
         ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE);
 
-        QueueEntry entry1 = _list.add(message1);
+        QueueEntry entry1 = _list.add(message1, null);
         entry1.acquire(); // simulate an in-progress delivery to consumer
 
-        _list.add(message2);
+        _list.add(message2, null);
         assertFalse(entry1.isDeleted());
 
         assertEquals(2, countEntries(_list));
@@ -173,7 +173,7 @@ public class LastValueQueueListTest exte
 
         ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
 
-        QueueEntry addedEntry = _list.add(message);
+        QueueEntry addedEntry = _list.add(message, null);
 
         assertEquals(1, countEntries(_list));
         assertEquals(1, _list.getLatestValuesMap().size());
@@ -193,8 +193,8 @@ public class LastValueQueueListTest exte
         ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1);
         ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2);
 
-        QueueEntry addedEntry1 = _list.add(message1);
-        QueueEntry addedEntry2 = _list.add(message2);
+        QueueEntry addedEntry1 = _list.add(message1, null);
+        QueueEntry addedEntry2 = _list.add(message2, null);
 
         assertEquals(2, countEntries(_list));
         assertEquals(2, _list.getLatestValuesMap().size());

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java Fri Apr  3 22:21:05 2015
@@ -43,11 +43,11 @@ public class LastValueQueueTest extends
     public void testOldestMessage()
     {
         AMQQueue<?> queue = getQueue();
-        queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null);
+        queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null, null);
         assertEquals(10l,queue.getOldestMessageArrivalTime());
-        queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null);
+        queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null, null);
         assertEquals(10l,queue.getOldestMessageArrivalTime());
-        queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null);
+        queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null, null);
         assertEquals(100l,queue.getOldestMessageArrivalTime());
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Fri Apr  3 22:21:05 2015
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -64,6 +65,12 @@ public class MockMessageInstance impleme
     {
         return null;
     }
+
+    @Override
+    public MessageEnqueueRecord getEnqueueRecord()
+    {
+        return null;
+    }
 
     @Override
     public boolean isAcquiredBy(final ConsumerImpl consumer)

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Fri Apr  3 22:21:05 2015
@@ -88,7 +88,7 @@ public class PriorityQueueListTest exten
             when(ref.getMessage()).thenReturn(message);
             when(header.getPriority()).thenReturn(PRIORITIES[i]);
 
-            entries[i] = _list.add(message);
+            entries[i] = _list.add(message, null);
         }
 
         _priority4message1 = entries[0];

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Fri Apr  3 22:21:05 2015
@@ -48,19 +48,19 @@ public class PriorityQueueTest extends A
 
         // Enqueue messages in order
         AbstractQueue queue = (AbstractQueue) getQueue();
-        queue.enqueue(createMessage(1L, (byte) 10), null);
-        queue.enqueue(createMessage(2L, (byte) 4), null);
-        queue.enqueue(createMessage(3L, (byte) 0), null);
+        queue.enqueue(createMessage(1L, (byte) 10), null, null);
+        queue.enqueue(createMessage(2L, (byte) 4), null, null);
+        queue.enqueue(createMessage(3L, (byte) 0), null, null);
 
         // Enqueue messages in reverse order
-        queue.enqueue(createMessage(4L, (byte) 0), null);
-        queue.enqueue(createMessage(5L, (byte) 4), null);
-        queue.enqueue(createMessage(6L, (byte) 10), null);
+        queue.enqueue(createMessage(4L, (byte) 0), null, null);
+        queue.enqueue(createMessage(5L, (byte) 4), null, null);
+        queue.enqueue(createMessage(6L, (byte) 10), null, null);
 
         // Enqueue messages out of order
-        queue.enqueue(createMessage(7L, (byte) 4), null);
-        queue.enqueue(createMessage(8L, (byte) 10), null);
-        queue.enqueue(createMessage(9L, (byte) 0), null);
+        queue.enqueue(createMessage(7L, (byte) 4), null, null);
+        queue.enqueue(createMessage(8L, (byte) 10), null, null);
+        queue.enqueue(createMessage(9L, (byte) 0), null, null);
 
         // Register subscriber
         queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class));

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Apr  3 22:21:05 2015
@@ -263,7 +263,7 @@ public abstract class QueueEntryImplTest
             when(reference.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(reference);
             when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
-            QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message);
+            QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message, null);
             entries[i] = entry;
         }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Fri Apr  3 22:21:05 2015
@@ -56,7 +56,7 @@ public abstract class QueueEntryListTest
     public void testAddSpecificMessage()
     {
         final QueueEntryList list = getTestList();
-        list.add(getTestMessageToAdd());
+        list.add(getTestMessageToAdd(), null);
 
         final QueueEntryIterator iter = list.iterator();
         int count = 0;
@@ -77,7 +77,7 @@ public abstract class QueueEntryListTest
     {
         final QueueEntryList list = getTestList();
         final ServerMessage message = createServerMessage(666l);
-        list.add(message);
+        list.add(message, null);
 
         final QueueEntryIterator iter = list.iterator();
         int count = 0;
@@ -220,8 +220,8 @@ public abstract class QueueEntryListTest
         QueueEntryList list = getTestList(true);
         int i = 0;
 
-        QueueEntry queueEntry1 = list.add(createServerMessage(i++));
-        QueueEntry queueEntry2 = list.add(createServerMessage(i++));
+        QueueEntry queueEntry1 = list.add(createServerMessage(i++), null);
+        QueueEntry queueEntry2 = list.add(createServerMessage(i++), null);
 
         assertSame(queueEntry2, list.next(queueEntry1));
         assertNull(list.next(queueEntry2));

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Fri Apr  3 22:21:05 2015
@@ -28,11 +28,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
@@ -60,13 +62,13 @@ public class QueueMessageRecoveryTest ex
 
         queue.open();
 
-        queue.recover(createMockMessage(0));
-        queue.enqueue(createMockMessage(4), null);
-        queue.enqueue(createMockMessage(5), null);
-        queue.recover(createMockMessage(1));
-        queue.recover(createMockMessage(2));
-        queue.enqueue(createMockMessage(6), null);
-        queue.recover(createMockMessage(3));
+        queue.recover(createMockMessage(0), createEnqueueRecord(0, queue));
+        queue.enqueue(createMockMessage(4), null, null);
+        queue.enqueue(createMockMessage(5), null, null);
+        queue.recover(createMockMessage(1), createEnqueueRecord(1, queue));
+        queue.recover(createMockMessage(2), createEnqueueRecord(2, queue));
+        queue.enqueue(createMockMessage(6), null, null);
+        queue.recover(createMockMessage(3), createEnqueueRecord(3, queue));
 
         assertEquals(4, messageList.size());
         for(int i = 0; i < 4; i++)
@@ -76,7 +78,7 @@ public class QueueMessageRecoveryTest ex
 
         queue.completeRecovery();
 
-        queue.enqueue(createMockMessage(7), null);
+        queue.enqueue(createMockMessage(7), null, null);
 
         assertEquals(8, messageList.size());
 
@@ -123,7 +125,7 @@ public class QueueMessageRecoveryTest ex
                 {
                     for(int i = 0; i < size; i++)
                     {
-                        queue.recover(createMockMessage(i));
+                        queue.recover(createMockMessage(i), createEnqueueRecord(i, queue));
                     }
                     queue.completeRecovery();
                 }
@@ -137,7 +139,7 @@ public class QueueMessageRecoveryTest ex
             {
                 for(int i = 0; i < size; i++)
                 {
-                    queue.enqueue(createMockMessage(size + i), null);
+                    queue.enqueue(createMockMessage(size + i), null, null);
                 }
             }
         }, "publishing thread");
@@ -156,6 +158,24 @@ public class QueueMessageRecoveryTest ex
         }
     }
 
+    private MessageEnqueueRecord createEnqueueRecord(final int messageNumber, final TestQueue queue)
+    {
+        return new MessageEnqueueRecord()
+        {
+            @Override
+            public UUID getQueueId()
+            {
+                return queue.getId();
+            }
+
+            @Override
+            public long getMessageNumber()
+            {
+                return messageNumber;
+            }
+        };
+    }
+
 
     private ServerMessage createMockMessage(final long i)
     {
@@ -189,7 +209,7 @@ public class QueueMessageRecoveryTest ex
         }
 
         @Override
-        protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action)
+        protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
         {
             synchronized(_messageList)
             {

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Fri Apr  3 22:21:05 2015
@@ -23,6 +23,7 @@ import org.junit.Assert;
 
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 
 /**
  * Test extension of SortedQueueEntryList that provides data structure validation tests.
@@ -42,10 +43,10 @@ public class SelfValidatingSortedQueueEn
     }
 
     @Override /** Overridden to automatically check queue properties before and after. */
-    public SortedQueueEntry add(final ServerMessage message)
+    public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
         assertQueueProperties(); //before add
-        final SortedQueueEntry result = super.add(message);
+        final SortedQueueEntry result = super.add(message, enqueueRecord);
         assertQueueProperties(); //after add
         return result;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Fri Apr  3 22:21:05 2015
@@ -76,7 +76,7 @@ public class SimpleQueueEntryImplTest ex
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
         when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
-        return (QueueEntryImpl) queueEntryList.add(message);
+        return (QueueEntryImpl) queueEntryList.add(message, null);
     }
 
     public void testCompareTo()

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Fri Apr  3 22:21:05 2015
@@ -129,7 +129,7 @@ public class SortedQueueEntryListTest ex
         for(final String key : keys)
         {
             final ServerMessage msg = generateTestMessage(messageId++, key);
-            _sqel.add(msg);
+            _sqel.add(msg, null);
         }
 
     }
@@ -224,7 +224,7 @@ public class SortedQueueEntryListTest ex
         while(messageId < 200)
         {
             final ServerMessage msg = generateTestMessage(messageId++, "samekey");
-            _sqel.add(msg);
+            _sqel.add(msg, null);
         }
 
         final QueueEntryIterator iter = getTestList().iterator();
@@ -245,7 +245,7 @@ public class SortedQueueEntryListTest ex
         while(messageId < 200)
         {
             final ServerMessage msg = generateTestMessage(messageId++, null);
-            _sqel.add(msg);
+            _sqel.add(msg, null);
         }
 
         final QueueEntryIterator iter = getTestList().iterator();
@@ -267,7 +267,7 @@ public class SortedQueueEntryListTest ex
         {
             final ServerMessage msg = generateTestMessage(messageId, textKey);
             messageId++;
-            _sqel.add(msg);
+            _sqel.add(msg, null);
         }
 
         final QueueEntryIterator iter = getTestList().iterator();
@@ -290,7 +290,7 @@ public class SortedQueueEntryListTest ex
         {
             final ServerMessage msg = generateTestMessage(messageId, textkeys[i]);
             messageId++;
-            _sqel.add(msg);
+            _sqel.add(msg, null);
         }
 
         final QueueEntryIterator iter = getTestList().iterator();
@@ -308,13 +308,13 @@ public class SortedQueueEntryListTest ex
         _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
 
         ServerMessage msg = generateTestMessage(1, "A");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
 
         msg = generateTestMessage(2, "B");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
@@ -328,13 +328,13 @@ public class SortedQueueEntryListTest ex
         _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
 
         ServerMessage msg = generateTestMessage(1, "B");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "B", 1);
 
         msg = generateTestMessage(2, "A");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 2);
@@ -348,12 +348,12 @@ public class SortedQueueEntryListTest ex
         _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
 
         ServerMessage msg = generateTestMessage(1, "A");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
         SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
 
         msg = generateTestMessage(2, "C");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
@@ -362,7 +362,7 @@ public class SortedQueueEntryListTest ex
         validateEntry(entry, "C", 2);
 
         msg = generateTestMessage(3, "B");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
@@ -379,13 +379,13 @@ public class SortedQueueEntryListTest ex
         _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
 
         ServerMessage msg = generateTestMessage(1, "B");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "B", 1);
 
         msg = generateTestMessage(2, "D");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "B", 1);
@@ -394,7 +394,7 @@ public class SortedQueueEntryListTest ex
         validateEntry(entry, "D", 2);
 
         msg = generateTestMessage(3, "C");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "B", 1);
@@ -406,7 +406,7 @@ public class SortedQueueEntryListTest ex
         validateEntry(entry, "D", 2);
 
         msg = generateTestMessage(4, "A");
-        _sqel.add(msg);
+        _sqel.add(msg, null);
 
         entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 4);

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Fri Apr  3 22:21:05 2015
@@ -103,7 +103,7 @@ public class SortedQueueEntryTest extend
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
         when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
-        return _queueEntryList.add(message);
+        return _queueEntryList.add(message, null);
     }
 
     public void testCompareTo()

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Fri Apr  3 22:21:05 2015
@@ -81,7 +81,7 @@ public class StandardQueueEntryListTest
             when(message.newReference()).thenReturn(ref);
             when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
 
-            final QueueEntry bleh = _sqel.add(message);
+            final QueueEntry bleh = _sqel.add(message, null);
             assertNotNull("QE should not have been null", bleh);
         }
     }
@@ -173,7 +173,7 @@ public class StandardQueueEntryListTest
             when(ref.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(ref);
             when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
-            QueueEntry bleh = sqel.add(message);
+            QueueEntry bleh = sqel.add(message, null);
             assertNotNull("QE should not have been null", bleh);
             entriesMap.put(i,bleh);
         }
@@ -276,7 +276,7 @@ public class StandardQueueEntryListTest
             when(reference.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(reference);
             when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
-            entries[i] = (OrderedQueueEntry) queueEntryList.add(message);
+            entries[i] = (OrderedQueueEntry) queueEntryList.add(message, null);
         }
 
         // test getNext for not acquired entries



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message