qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1671184 [4/5] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/...
Date Fri, 03 Apr 2015 22:21:06 GMT
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Fri Apr  3 22:21:05 2015
@@ -36,6 +36,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class StandardQueueTest extends AbstractQueueTestBase
@@ -59,7 +60,7 @@ public class StandardQueueTest extends A
                                        EnumSet.of(ConsumerImpl.Option.ACQUIRES,
                                                   ConsumerImpl.Option.SEES_REQUEUES));
 
-        getQueue().enqueue(message, null);
+        getQueue().enqueue(message, null, null);
         consumer.close();
         assertTrue("Queue was not deleted when consumer was removed",
                    getQueue().isDeleted());
@@ -290,7 +291,8 @@ public class StandardQueueTest extends A
          * Entries with even message id are considered
          * dequeued!
          */
-        protected DequeuedQueueEntry createQueueEntry(final ServerMessage message)
+        protected DequeuedQueueEntry createQueueEntry(final ServerMessage message,
+                                                      final MessageEnqueueRecord enqueueRecord)
         {
             return new DequeuedQueueEntry(this, message);
         }
@@ -311,7 +313,7 @@ public class StandardQueueTest extends A
 
         public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message)
         {
-            super(list, message);
+            super(list, message, null);
             _message = message;
         }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Apr  3 22:21:05 2015
@@ -114,9 +114,10 @@ public abstract class MessageStoreQuotaE
     protected EnqueueableMessage addMessage(long id)
     {
         StorableMessageMetaData metaData = createMetaData(id, MESSAGE_DATA.length);
-        StoredMessage<?> handle = _store.addMessage(metaData);
-        handle.addContent(0, ByteBuffer.wrap(MESSAGE_DATA));
-        TestMessage message = new TestMessage(id, handle);
+        MessageHandle<?> handle = _store.addMessage(metaData);
+        handle.addContent(ByteBuffer.wrap(MESSAGE_DATA));
+        StoredMessage<? extends StorableMessageMetaData> storedMessage = handle.allContentAdded();
+        TestMessage message = new TestMessage(id, storedMessage);
         return message;
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java Fri Apr  3 22:21:05 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -40,7 +41,7 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
@@ -50,6 +51,7 @@ public abstract class MessageStoreTestCa
 {
     private MessageStore _store;
     private ConfiguredObject<?> _parent;
+    private MessageStore.MessageStoreReader _storeReader;
 
     public void setUp() throws Exception
     {
@@ -60,6 +62,7 @@ public abstract class MessageStoreTestCa
         _store = createMessageStore();
 
         _store.openMessageStore(_parent);
+        _storeReader = _store.newMessageStoreReader();
     }
 
     protected abstract VirtualHost createVirtualHost();
@@ -73,52 +76,56 @@ public abstract class MessageStoreTestCa
 
     protected void reopenStore() throws Exception
     {
+        _storeReader.close();
         _store.closeMessageStore();
 
         _store = createMessageStore();
         _store.openMessageStore(_parent);
+        _storeReader = _store.newMessageStoreReader();
+
     }
 
     public void testAddAndRemoveRecordXid() throws Exception
     {
         long format = 1l;
-        Record enqueueRecord = getTestRecord(1);
-        Record dequeueRecord = getTestRecord(2);
-        Record[] enqueues = { enqueueRecord };
-        Record[] dequeues = { dequeueRecord };
+        EnqueueRecord enqueueRecord = getTestRecord(1);
+        TestRecord dequeueRecord = getTestRecord(2);
+        EnqueueRecord[] enqueues = { enqueueRecord };
+        TestRecord[] dequeues = { dequeueRecord };
         byte[] globalId = new byte[] { 1 };
         byte[] branchId = new byte[] { 2 };
 
         Transaction transaction = _store.newTransaction();
-        transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+        final Transaction.StoredXidRecord record =
+                transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
         transaction.commitTran();
 
         reopenStore();
 
         DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
-        _store.visitDistributedTransactions(handler);
-        verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+        _storeReader.visitDistributedTransactions(handler);
+        verify(handler, times(1)).handle(eq(record), argThat(new RecordMatcher(enqueues)), argThat(new DequeueRecordMatcher(dequeues)));
 
         transaction = _store.newTransaction();
-        transaction.removeXid(1l, globalId, branchId);
+        transaction.removeXid(record);
         transaction.commitTran();
 
         reopenStore();
 
         handler = mock(DistributedTransactionHandler.class);
-        _store.visitDistributedTransactions(handler);
-        verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues);
+        _storeReader.visitDistributedTransactions(handler);
+        verify(handler, never()).handle(eq(record), argThat(new RecordMatcher(enqueues)), argThat(new DequeueRecordMatcher(dequeues)));
     }
 
     public void testVisitMessages() throws Exception
     {
         long messageId = 1;
         int contentSize = 0;
-        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
-        enqueueMessage(message, "dummyQ");
+        final MessageHandle<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        enqueueMessage(message.allContentAdded(), "dummyQ");
 
         MessageHandler handler = mock(MessageHandler.class);
-        _store.visitMessages(handler);
+        _storeReader.visitMessages(handler);
 
         verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId)));
 
@@ -176,14 +183,14 @@ public abstract class MessageStoreTestCa
         int contentSize = 0;
         for (int i = 0; i < 3; i++)
         {
-            final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
-            enqueueMessage(message, "dummyQ");
+            final MessageHandle<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+            enqueueMessage(message.allContentAdded(), "dummyQ");
         }
 
         MessageHandler handler = mock(MessageHandler.class);
         when(handler.handle(any(StoredMessage.class))).thenReturn(true, false);
 
-        _store.visitMessages(handler);
+        _storeReader.visitMessages(handler);
 
         verify(handler, times(2)).handle(any(StoredMessage.class));
     }
@@ -193,14 +200,14 @@ public abstract class MessageStoreTestCa
         int contentSize = 0;
         for (int i = 0; i < 3; i++)
         {
-            final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+            final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)).allContentAdded();
             enqueueMessage(message, "dummyQ");
 
         }
 
         reopenStore();
 
-        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)).allContentAdded();
 
         enqueueMessage(message, "dummyQ");
 
@@ -212,7 +219,7 @@ public abstract class MessageStoreTestCa
     {
         long messageId = 1;
         int contentSize = 0;
-        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)).allContentAdded();
 
         EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
 
@@ -224,9 +231,8 @@ public abstract class MessageStoreTestCa
         transaction.commitTran();
 
         MessageInstanceHandler handler = mock(MessageInstanceHandler.class);
-        _store.visitMessageInstances(handler);
-
-        verify(handler, times(1)).handle(queueId, messageId);
+        _storeReader.visitMessageInstances(handler);
+        verify(handler, times(1)).handle(argThat(new EnqueueRecordMatcher(queueId, messageId)));
     }
 
     public void testVisitDistributedTransactions() throws Exception
@@ -234,19 +240,22 @@ public abstract class MessageStoreTestCa
         long format = 1l;
         byte[] branchId = new byte[] { 2 };
         byte[] globalId = new byte[] { 1 };
-        Record enqueueRecord = getTestRecord(1);
-        Record dequeueRecord = getTestRecord(2);
-        Record[] enqueues = { enqueueRecord };
-        Record[] dequeues = { dequeueRecord };
+        EnqueueRecord enqueueRecord = getTestRecord(1);
+        TestRecord dequeueRecord = getTestRecord(2);
+        EnqueueRecord[] enqueues = { enqueueRecord };
+        TestRecord[] dequeues = { dequeueRecord };
 
         Transaction transaction = _store.newTransaction();
-        transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+        final Transaction.StoredXidRecord record =
+                transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
         transaction.commitTran();
 
         DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
-        _store.visitDistributedTransactions(handler);
+        _storeReader.visitDistributedTransactions(handler);
 
-        verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+        verify(handler, times(1)).handle(eq(record),
+                                         argThat(new RecordMatcher(enqueues)),
+                                         argThat(new DequeueRecordMatcher(dequeues)));
 
     }
 
@@ -267,7 +276,7 @@ public abstract class MessageStoreTestCa
         txn.commitTran();
 
         QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
-        getStore().visitMessageInstances(filter);
+        _storeReader.visitMessageInstances(filter);
         Set<Long> enqueuedIds = filter.getEnqueuedIds();
 
         assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
@@ -298,7 +307,7 @@ public abstract class MessageStoreTestCa
         txn.commitTran();
 
         QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
-        getStore().visitMessageInstances(filter);
+        _storeReader.visitMessageInstances(filter);
         Set<Long> enqueuedIds = filter.getEnqueuedIds();
 
         assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
@@ -333,7 +342,7 @@ public abstract class MessageStoreTestCa
         txn.commitTran();
 
         QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
-        getStore().visitMessageInstances(filter);
+        _storeReader.visitMessageInstances(filter);
         Set<Long> enqueuedIds = filter.getEnqueuedIds();
 
         assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
@@ -345,10 +354,10 @@ public abstract class MessageStoreTestCa
     {
         long messageId = 1;
         int contentSize = 0;
-        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)).allContentAdded();
 
         MessageHandler handler = mock(MessageHandler.class);
-        _store.visitMessages(handler);
+        _storeReader.visitMessages(handler);
 
         verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId)));
     }
@@ -357,11 +366,11 @@ public abstract class MessageStoreTestCa
     {
         long messageId = 1;
         int contentSize = 0;
-        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)).allContentAdded();
         enqueueMessage(message, "dummyQ");
 
         final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
-        _store.visitMessages(new MessageHandler()
+        _storeReader.visitMessages(new MessageHandler()
         {
 
             @Override
@@ -379,7 +388,7 @@ public abstract class MessageStoreTestCa
         retrievedMessage.remove();
 
         retrievedMessageRef.set(null);
-        _store.visitMessages(new MessageHandler()
+        _storeReader.visitMessages(new MessageHandler()
         {
 
             @Override
@@ -411,7 +420,7 @@ public abstract class MessageStoreTestCa
         return enqueueableMessage;
     }
 
-    private Record getTestRecord(long messageNumber)
+    private TestRecord getTestRecord(long messageNumber)
     {
         UUID queueId1 = UUIDGenerator.generateRandomUUID();
         TransactionLogResource queue1 = mock(TransactionLogResource.class);
@@ -422,13 +431,13 @@ public abstract class MessageStoreTestCa
         final StoredMessage<?> storedMessage = mock(StoredMessage.class);
         when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
         when(message1.getStoredMessage()).thenReturn(storedMessage);
-        Record enqueueRecord = new TestRecord(queue1, message1);
+        TestRecord enqueueRecord = new TestRecord(queue1, message1);
         return enqueueRecord;
     }
 
     private EnqueueableMessage createEnqueueableMessage(long messageId1)
     {
-        final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
+        final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)).allContentAdded();
         EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
         return enqueueableMessage1;
     }
@@ -468,9 +477,10 @@ public abstract class MessageStoreTestCa
         }
 
         @Override
-        public boolean handle(UUID queueId, long messageId)
+        public boolean handle(final MessageEnqueueRecord record)
         {
-            if (queueId.equals(_queueId))
+            long messageId = record.getMessageNumber();
+            if (record.getQueueId().equals(_queueId))
             {
                 if (_enqueuedIds.contains(messageId))
                 {
@@ -487,4 +497,103 @@ public abstract class MessageStoreTestCa
         }
     }
 
+    private class EnqueueRecordMatcher extends ArgumentMatcher<MessageEnqueueRecord>
+    {
+        private final UUID _queueId;
+        private final long _messageId;
+
+        public EnqueueRecordMatcher(final UUID queueId, final long messageId)
+        {
+            _queueId = queueId;
+            _messageId = messageId;
+        }
+
+        @Override
+        public boolean matches(final Object argument)
+        {
+            if(argument instanceof MessageEnqueueRecord)
+            {
+                MessageEnqueueRecord record = (MessageEnqueueRecord)argument;
+                return record.getQueueId().equals(_queueId) && record.getMessageNumber() == _messageId;
+            }
+            return false;
+        }
+    }
+
+
+    private class RecordMatcher extends ArgumentMatcher<Transaction.EnqueueRecord[]>
+    {
+
+        private final EnqueueRecord[] _expect;
+
+        public RecordMatcher(Transaction.EnqueueRecord[] expect)
+        {
+            _expect = expect;
+        }
+
+        @Override
+        public boolean matches(final Object argument)
+        {
+            if(argument.getClass().isArray() && Transaction.EnqueueRecord.class.isAssignableFrom(argument.getClass().getComponentType()))
+            {
+                Transaction.EnqueueRecord[] actual = (Transaction.EnqueueRecord[]) argument;
+                if(actual.length == _expect.length)
+                {
+                    for(int i = 0; i < actual.length; i++)
+                    {
+                        if(!actual[i].getResource().getId().equals(_expect[i].getResource().getId())
+                                || actual[i].getMessage().getMessageNumber() != _expect[i].getMessage().getMessageNumber())
+                        {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
+                else
+                {
+                    return false;
+                }
+
+            }
+            return false;
+        }
+    }
+
+    private class DequeueRecordMatcher extends ArgumentMatcher<Transaction.DequeueRecord[]>
+    {
+
+        private final Transaction.DequeueRecord[] _expect;
+
+        public DequeueRecordMatcher(Transaction.DequeueRecord[] expect)
+        {
+            _expect = expect;
+        }
+
+        @Override
+        public boolean matches(final Object argument)
+        {
+            if(argument.getClass().isArray() && Transaction.DequeueRecord.class.isAssignableFrom(argument.getClass().getComponentType()))
+            {
+                Transaction.DequeueRecord[] actual = (Transaction.DequeueRecord[]) argument;
+                if(actual.length == _expect.length)
+                {
+                    for(int i = 0; i < actual.length; i++)
+                    {
+                        if(!actual[i].getEnqueueRecord().getQueueId().equals(_expect[i].getEnqueueRecord().getQueueId())
+                           || actual[i].getEnqueueRecord().getMessageNumber() != _expect[i].getEnqueueRecord().getMessageNumber())
+                        {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
+                else
+                {
+                    return false;
+                }
+
+            }
+            return false;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java Fri Apr  3 22:21:05 2015
@@ -35,7 +35,7 @@ public class TestMemoryMessageStore exte
     public int getMessageCount()
     {
         final AtomicInteger counter = new AtomicInteger();
-        visitMessages(new MessageHandler()
+        newMessageStoreReader().visitMessages(new MessageHandler()
                         {
                             @Override
                             public boolean handle(StoredMessage<?> storedMessage)

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java Fri Apr  3 22:21:05 2015
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.UUID;
+
 import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
 
-public class TestRecord implements Record
+public class TestRecord implements EnqueueRecord, Transaction.DequeueRecord, MessageEnqueueRecord
 {
     private TransactionLogResource _queue;
     private EnqueueableMessage _message;
@@ -68,11 +70,11 @@ public class TestRecord implements Recor
         {
             return false;
         }
-        if (!(obj instanceof Record))
+        if (!(obj instanceof EnqueueRecord))
         {
             return false;
         }
-        Record other = (Record) obj;
+        EnqueueRecord other = (EnqueueRecord) obj;
         if (_message == null && other.getMessage() != null)
         {
             return false;
@@ -88,4 +90,21 @@ public class TestRecord implements Recor
         return _queue.getId().equals(other.getResource().getId());
     }
 
-}
\ No newline at end of file
+    @Override
+    public MessageEnqueueRecord getEnqueueRecord()
+    {
+        return this;
+    }
+
+    @Override
+    public UUID getQueueId()
+    {
+        return _queue.getId();
+    }
+
+    @Override
+    public long getMessageNumber()
+    {
+        return _message.getMessageNumber();
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Fri Apr  3 22:21:05 2015
@@ -26,6 +26,7 @@ import java.util.Collections;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.Transaction;
@@ -42,7 +43,7 @@ public class AsyncAutoCommitTransactionT
     private BaseQueue _queue = mock(BaseQueue.class);
     private MessageStore _messageStore = mock(MessageStore.class);
     private Transaction _storeTransaction = mock(Transaction.class);
-    private Action _postTransactionAction = mock(Action.class);
+    private ServerTransaction.EnqueueAction _postTransactionAction = mock(ServerTransaction.EnqueueAction.class);
     private FutureResult _future = mock(FutureResult.class);
 
 
@@ -70,7 +71,7 @@ public class AsyncAutoCommitTransactionT
         asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
 
         verify(_storeTransaction).enqueueMessage(_queue, _message);
-        verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+        verify(_futureRecorder).recordFuture(eq(_future), any(Action.class));
         verifyZeroInteractions(_postTransactionAction);
     }
 
@@ -87,7 +88,7 @@ public class AsyncAutoCommitTransactionT
         asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction);
 
         verify(_storeTransaction).enqueueMessage(_queue, _message);
-        verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+        verify(_futureRecorder).recordFuture(eq(_future), any(Action.class));
         verifyZeroInteractions(_postTransactionAction);
     }
 
@@ -104,7 +105,7 @@ public class AsyncAutoCommitTransactionT
         asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
 
         verify(_storeTransaction).enqueueMessage(_queue, _message);
-        verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+        verify(_futureRecorder).recordFuture(eq(_future), any(Action.class));
         verifyZeroInteractions(_postTransactionAction);
     }
 
@@ -120,7 +121,7 @@ public class AsyncAutoCommitTransactionT
         asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
 
         verifyZeroInteractions(_storeTransaction);
-        verify(_postTransactionAction).postCommit();
+        verify(_postTransactionAction).postCommit((MessageEnqueueRecord)null);
         verifyZeroInteractions(_futureRecorder);
     }
 
@@ -136,7 +137,7 @@ public class AsyncAutoCommitTransactionT
         asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
 
         verifyZeroInteractions(_storeTransaction);
-        verify(_futureRecorder).recordFuture(FutureResult.IMMEDIATE_FUTURE, _postTransactionAction);
+        verify(_futureRecorder).recordFuture(eq(FutureResult.IMMEDIATE_FUTURE), any(Action.class));
         verifyZeroInteractions(_postTransactionAction);
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Fri Apr  3 22:21:05 2015
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.MockMessageInstance;
 import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
@@ -225,7 +226,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(false);
         _queue = createTestAMQQueue(false);
         
-        _transaction.dequeue(_queue, _message, _action);
+        _transaction.dequeue((MessageEnqueueRecord)null, _action);
 
         assertEquals("Dequeue of non-persistent message must not cause message to be dequeued", 0, _storeTransaction.getNumberOfDequeuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -234,23 +235,6 @@ public class AutoCommitTransactionTest e
         
     }
 
-    /**
-     * Tests the dequeue of a persistent message from a single non durable queue.
-     * Asserts that a store transaction has not been started and post commit
-     * action fired.
-     */
-    public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception
-    {
-        _message = createTestMessage(true);
-        _queue = createTestAMQQueue(true);
-        
-        _transaction.dequeue(_queue, _message, _action);
-
-        assertEquals("Dequeue of persistent message to durable queue must cause message to be dequeued",1, _storeTransaction.getNumberOfDequeuedMessages());
-        assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
-        assertFalse("Rollback action must not be fired", _action.isRollbackActionFired());
-        assertTrue("Post commit action must be fired", _action.isPostCommitActionFired());
-    }
 
     /**
      * Tests the case where the store operation throws an exception.
@@ -268,7 +252,7 @@ public class AutoCommitTransactionTest e
         
         try
         {
-            _transaction.dequeue(_queue, _message, _action);
+            _transaction.dequeue(mock(MessageEnqueueRecord.class), _action);
             fail("Exception not thrown");
         }
         catch (RuntimeException re)
@@ -387,7 +371,7 @@ public class AutoCommitTransactionTest e
         {
             final BaseQueue queue = createTestAMQQueue(queueDurableFlags[i]);
             final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
-            
+            final boolean hasRecord = queueDurableFlags[i] && messagePersistentFlags[i];
             queueEntries.add(new MockMessageInstance()
             {
 
@@ -402,7 +386,19 @@ public class AutoCommitTransactionTest e
                 {
                     return queue;
                 }
-                
+
+                @Override
+                public MessageEnqueueRecord getEnqueueRecord()
+                {
+                    if(hasRecord)
+                    {
+                        return mock(MessageEnqueueRecord.class);
+                    }
+                    else
+                    {
+                        return null;
+                    }
+                }
             });
         }
         

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Fri Apr  3 22:21:05 2015
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.MockMessageInstance;
 import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
@@ -222,7 +223,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(false);
         _queue = createQueue(false);
 
-        _transaction.dequeue(_queue, _message, _action1);
+        _transaction.dequeue((MessageEnqueueRecord)null, _action1);
 
         assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -239,7 +240,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(true);
         _queue = createQueue(true);
         
-        _transaction.dequeue(_queue, _message, _action1);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
 
         assertEquals("Dequeue of non-persistent message must cause message to be dequeued", 1, _storeTransaction.getNumberOfDequeuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -261,7 +262,7 @@ public class LocalTransactionTest extend
         
         try
         {
-            _transaction.dequeue(_queue, _message, _action1);
+            _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
             fail("Exception not thrown");
         }
         catch (RuntimeException re)
@@ -404,7 +405,7 @@ public class LocalTransactionTest extend
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
         assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired());
         
-        _transaction.dequeue(_queue, _message, _action1);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
         assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
         assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired());
         
@@ -428,7 +429,7 @@ public class LocalTransactionTest extend
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
         assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired());
 
-        _transaction.dequeue(_queue, _message, _action1);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
         
         assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
         assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired());
@@ -451,7 +452,7 @@ public class LocalTransactionTest extend
         _queue = createQueue(true);
         
         _transaction.addPostTransactionAction(_action1);
-        _transaction.dequeue(_queue, _message, _action2);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2);
         _transaction.commit();
         
         assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -473,7 +474,7 @@ public class LocalTransactionTest extend
         _queue = createQueue(true);
         
         _transaction.addPostTransactionAction(_action1);
-        _transaction.dequeue(_queue, _message, _action2);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2);
         _transaction.rollback();
         
         assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState());
@@ -532,7 +533,7 @@ public class LocalTransactionTest extend
         _queue = createQueue(true);
 
         long startTime = System.currentTimeMillis();
-        _transaction.dequeue(_queue, _message, _action1);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
 
         assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
         assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
@@ -552,7 +553,7 @@ public class LocalTransactionTest extend
         final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
 
         Thread.sleep(1);
-        _transaction.dequeue(_queue, _message, _action2);
+        _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2);
 
         final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime();
         final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime();
@@ -611,7 +612,7 @@ public class LocalTransactionTest extend
         {
             final TransactionLogResource queue = createQueue(queueDurableFlags[i]);
             final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
-            
+            final boolean hasRecord = queueDurableFlags[i] && messagePersistentFlags[i];
             queueEntries.add(new MockMessageInstance()
             {
 
@@ -626,7 +627,12 @@ public class LocalTransactionTest extend
                 {
                     return queue;
                 }
-                
+
+                @Override
+                public MessageEnqueueRecord getEnqueueRecord()
+                {
+                    return hasRecord ? mock(MessageEnqueueRecord.class) : null;
+                }
             });
         }
         

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java Fri Apr  3 22:21:05 2015
@@ -20,18 +20,24 @@
  */
 package org.apache.qpid.server.txn;
 
-import org.apache.qpid.server.txn.ServerTransaction.Action;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 
 /** 
  * Mock implementation of a ServerTransaction Action
  * allowing its state to be observed.
  * 
  */
-class MockAction implements Action
+class MockAction implements ServerTransaction.EnqueueAction, ServerTransaction.Action
 {
     private boolean _rollbackFired = false;
     private boolean _postCommitFired = false;
 
+    public void postCommit(MessageEnqueueRecord... records)
+    {
+        _postCommitFired = true;
+    }
+
+    @Override
     public void postCommit()
     {
         _postCommitFired = true;
@@ -51,4 +57,4 @@ class MockAction implements Action
     {
         return _postCommitFired;
     }
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java Fri Apr  3 22:21:05 2015
@@ -20,8 +20,11 @@
  */
 package org.apache.qpid.server.txn;
 
+import java.util.UUID;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.NullMessageStore;
 import org.apache.qpid.server.util.FutureResult;
@@ -60,7 +63,7 @@ class MockStoreTransaction implements Tr
         return _state;
     }
 
-    public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+    public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
     {
         if (_throwExceptionOnQueueOp)
         {
@@ -69,6 +72,7 @@ class MockStoreTransaction implements Tr
         }
 
         _numberOfEnqueuedMessages++;
+        return new MockEnqueueRecord(queue.getId(), message.getMessageNumber());
     }
 
     public int getNumberOfDequeuedMessages()
@@ -81,7 +85,8 @@ class MockStoreTransaction implements Tr
         return _numberOfEnqueuedMessages;
     }
 
-    public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+    @Override
+    public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
     {
         if (_throwExceptionOnQueueOp)
         {
@@ -110,8 +115,19 @@ class MockStoreTransaction implements Tr
     {
     }
 
-    public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+    @Override
+    public void removeXid(final StoredXidRecord record)
+    {
+
+    }
+
+    public StoredXidRecord recordXid(long format,
+                                     byte[] globalId,
+                                     byte[] branchId,
+                                     EnqueueRecord[] enqueues,
+                                     DequeueRecord[] dequeues)
     {
+        return null;
     }
 
     public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
@@ -126,4 +142,27 @@ class MockStoreTransaction implements Tr
             }
        };
     }
+
+    private static class MockEnqueueRecord implements MessageEnqueueRecord
+    {
+        private final UUID _queueId;
+        private final long _messageNumber;
+
+        public MockEnqueueRecord(final UUID queueId,
+                                 final long messageNumber)
+        {
+            _queueId = queueId;
+            _messageNumber = messageNumber;
+        }
+
+        public UUID getQueueId()
+        {
+            return _queueId;
+        }
+
+        public long getMessageNumber()
+        {
+            return _messageNumber;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java Fri Apr  3 22:21:05 2015
@@ -161,6 +161,8 @@ public class AbstractVirtualHostTest ext
     {
         Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName());
         final MessageStore store = mock(MessageStore.class);
+        when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class));
+
         AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node)
         {
             @Override
@@ -200,7 +202,8 @@ public class AbstractVirtualHostTest ext
 
     public void testActivateInErrorStateAfterOpen() throws Exception
     {
-        Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName());
+        Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME,
+                                                                                 getTestName());
         final MessageStore store = mock(MessageStore.class);
         doThrow(new RuntimeException("Cannot open store")).when(store).openMessageStore(any(ConfiguredObject.class));
         AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node)
@@ -216,6 +219,7 @@ public class AbstractVirtualHostTest ext
         assertEquals("Unexpected state", State.ERRORED, host.getState());
 
         doNothing().when(store).openMessageStore(any(ConfiguredObject.class));
+        when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class));
 
         host.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE));
         assertEquals("Unexpected state", State.ACTIVE, host.getState());
@@ -239,6 +243,7 @@ public class AbstractVirtualHostTest ext
         assertEquals("Unexpected state", State.ERRORED, host.getState());
 
         doNothing().when(store).openMessageStore(any(ConfiguredObject.class));
+        when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class));
 
         host.start();
         assertEquals("Unexpected state", State.ACTIVE, host.getState());

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Fri Apr  3 22:21:05 2015
@@ -24,7 +24,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
-import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -43,6 +42,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.NullMessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -50,7 +50,7 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestMessageMetaData;
 import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
@@ -93,7 +93,7 @@ public class SynchronousMessageStoreReco
             @Override
             public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
             {
-                handler.handle(queue.getId(), messageId);
+                handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId));
             }
         };
 
@@ -104,7 +104,7 @@ public class SynchronousMessageStoreReco
         recoverer.recover(_virtualHost);
 
         ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
-        verify(queue, times(1)).recover(eq(message));
+        verify(queue, times(1)).recover(eq(message), any(MessageEnqueueRecord.class));
     }
 
     @SuppressWarnings("unchecked")
@@ -126,7 +126,7 @@ public class SynchronousMessageStoreReco
             @Override
             public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
             {
-                handler.handle(queue.getId(), messageId);
+                handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId));
             }
 
             @Override
@@ -142,8 +142,8 @@ public class SynchronousMessageStoreReco
                 recoverer = new SynchronousMessageStoreRecoverer();
         recoverer.recover(_virtualHost);
 
-        verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class));
-        verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId)));
+        verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class), any(MessageEnqueueRecord.class));
+        verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queue.getId(), messageId)));
         verify(transaction, times(1)).commitTranAsync();
     }
 
@@ -165,7 +165,7 @@ public class SynchronousMessageStoreReco
             @Override
             public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
             {
-                handler.handle(queueId, messageId);
+                handler.handle(new TestMessageEnqueueRecord(queueId, messageId));
             }
 
             @Override
@@ -181,7 +181,7 @@ public class SynchronousMessageStoreReco
                 recoverer = new SynchronousMessageStoreRecoverer();
         recoverer.recover(_virtualHost);
 
-        verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId)));
+        verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queueId,messageId)));
         verify(transaction, times(1)).commitTranAsync();
     }
 
@@ -226,13 +226,13 @@ public class SynchronousMessageStoreReco
         long messageId = storedMessage.getMessageNumber();
 
         EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
-        Record enqueueRecord = createMockRecord(queue, enqueueableMessage);
+        EnqueueRecord enqueueRecord = createMockRecord(queue, enqueueableMessage);
 
         final long format = 1;
         final byte[] globalId = new byte[] {0};
         final byte[] branchId = new byte[] {0};
-        final Record[] enqueues = { enqueueRecord };
-        final Record[] dequeues = {};
+        final EnqueueRecord[] enqueues = { enqueueRecord };
+        final Transaction.DequeueRecord[] dequeues = {};
 
         MessageStore store = new NullMessageStore()
         {
@@ -251,7 +251,26 @@ public class SynchronousMessageStoreReco
             @Override
             public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
             {
-                handler.handle(format, globalId, branchId, enqueues, dequeues);
+                handler.handle(new Transaction.StoredXidRecord()
+                {
+                    @Override
+                    public long getFormat()
+                    {
+                        return format;
+                    }
+
+                    @Override
+                    public byte[] getGlobalId()
+                    {
+                        return globalId;
+                    }
+
+                    @Override
+                    public byte[] getBranchId()
+                    {
+                        return branchId;
+                    }
+                }, enqueues, dequeues);
             }
 
             @Override
@@ -275,22 +294,21 @@ public class SynchronousMessageStoreReco
         branch.commit();
 
         ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
-        verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+        verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull(), any(MessageEnqueueRecord.class));
         verify(transaction).commitTran();
     }
 
     public void testRecoveryOfSingleDequeueWithDistributedTransaction()
     {
-        final AMQQueue<?> queue = createRegisteredMockQueue();
+        final UUID queueId = UUID.randomUUID();
+        final AMQQueue<?> queue = createRegisteredMockQueue(queueId);
 
 
         final Transaction transaction = mock(Transaction.class);
-
         final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
         final long messageId = storedMessage.getMessageNumber();
 
-        EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
-        Record dequeueRecord = createMockRecord(queue, enqueueableMessage);
+        Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId);
 
         QueueEntry queueEntry = mock(QueueEntry.class);
         when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
@@ -298,8 +316,8 @@ public class SynchronousMessageStoreReco
         final long format = 1;
         final byte[] globalId = new byte[] {0};
         final byte[] branchId = new byte[] {0};
-        final Record[] enqueues = {};
-        final Record[] dequeues = { dequeueRecord };
+        final EnqueueRecord[] enqueues = {};
+        final Transaction.DequeueRecord[] dequeues = { dequeueRecord };
 
         MessageStore store = new NullMessageStore()
         {
@@ -314,13 +332,32 @@ public class SynchronousMessageStoreReco
             {
                 // We need the message to be enqueued onto the queue so that later the distributed transaction
                 // can dequeue it.
-                handler.handle(queue.getId(), messageId);
+                handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId));
             }
 
             @Override
             public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
             {
-                handler.handle(format, globalId, branchId, enqueues, dequeues);
+                handler.handle(new Transaction.StoredXidRecord()
+                {
+                    @Override
+                    public long getFormat()
+                    {
+                        return format;
+                    }
+
+                    @Override
+                    public byte[] getGlobalId()
+                    {
+                        return globalId;
+                    }
+
+                    @Override
+                    public byte[] getBranchId()
+                    {
+                        return branchId;
+                    }
+                }, enqueues, dequeues);
             }
 
             @Override
@@ -348,14 +385,25 @@ public class SynchronousMessageStoreReco
     }
 
 
-    protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
+    protected EnqueueRecord createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
     {
-        Record enqueueRecord = mock(Record.class);
+        EnqueueRecord enqueueRecord = mock(EnqueueRecord.class);
         when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage);
         when(enqueueRecord.getResource()).thenReturn(queue);
         return enqueueRecord;
     }
 
+
+    protected Transaction.DequeueRecord createMockDequeueRecord(UUID queueId, long messageNumber)
+    {
+        Transaction.DequeueRecord dequeueRecord = mock(Transaction.DequeueRecord.class);
+        MessageEnqueueRecord enqueueRecord = mock(MessageEnqueueRecord.class);
+        when(enqueueRecord.getMessageNumber()).thenReturn(messageNumber);
+        when(enqueueRecord.getQueueId()).thenReturn(queueId);
+        when(dequeueRecord.getEnqueueRecord()).thenReturn(enqueueRecord);
+        return dequeueRecord;
+    }
+
     protected EnqueueableMessage createMockEnqueueableMessage(long messageId,
             final StoredMessage<StorableMessageMetaData> storedMessage)
     {
@@ -378,8 +426,12 @@ public class SynchronousMessageStoreReco
 
     private AMQQueue<?> createRegisteredMockQueue()
     {
+        return createRegisteredMockQueue(UUID.randomUUID());
+    }
+
+    private AMQQueue<?> createRegisteredMockQueue(UUID queueId)
+    {
         AMQQueue<?> queue = mock(AMQQueue.class);
-        final UUID queueId = UUID.randomUUID();
         when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
         when(queue.getId()).thenReturn(queueId);
         when(queue.getName()).thenReturn("test-queue");
@@ -404,19 +456,47 @@ public class SynchronousMessageStoreReco
         }
     }
 
-    private final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage>
+    private final class MessageEnqueueRecordMatcher extends ArgumentMatcher<MessageEnqueueRecord>
     {
         private final long _messageId;
+        private final UUID _queueId;
 
-        private MessageNumberMatcher(long messageId)
+        private MessageEnqueueRecordMatcher(UUID queueId, long messageId)
         {
             _messageId = messageId;
+            _queueId = queueId;
         }
 
         @Override
         public boolean matches(Object argument)
         {
-            return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId;
+            return argument instanceof MessageEnqueueRecord
+                    && ((MessageEnqueueRecord)argument).getMessageNumber() == _messageId
+                    && ((MessageEnqueueRecord)argument).getQueueId().equals(_queueId);
+        }
+    }
+
+    private class TestMessageEnqueueRecord implements MessageEnqueueRecord
+    {
+        private final UUID _queueId;
+        private final long _messageId;
+
+        public TestMessageEnqueueRecord(final UUID queueId, final long messageId)
+        {
+            _queueId = queueId;
+            _messageId = messageId;
+        }
+
+        @Override
+        public UUID getQueueId()
+        {
+            return _queueId;
+        }
+
+        @Override
+        public long getMessageNumber()
+        {
+            return _messageId;
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Apr  3 22:21:05 2015
@@ -377,7 +377,7 @@ public class ConsumerTarget_0_10 extends
     private void forceDequeue(final MessageInstance entry, final boolean restoreCredit)
     {
         AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
-        dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(),
+        dequeueTxn.dequeue(entry.getEnqueueRecord(),
                            new ServerTransaction.Action()
                            {
                                public void postCommit()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Fri Apr  3 22:21:05 2015
@@ -76,12 +76,6 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public void addContent(int offsetInMessage, ByteBuffer src)
-                    {
-                        throw new UnsupportedOperationException();
-                    }
-
-                    @Override
                     public int getContent(int offsetInMessage, ByteBuffer dst)
                     {
                         int size = messageContent.length - offsetInMessage;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Fri Apr  3 22:21:05 2015
@@ -83,12 +83,6 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public void addContent(int offsetInMessage, ByteBuffer src)
-                    {
-                        throw new UnsupportedOperationException();
-                    }
-
-                    @Override
                     public int getContent(int offsetInMessage, ByteBuffer dst)
                     {
                         return serverMsg.getContent(dst, offsetInMessage);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Apr  3 22:21:05 2015
@@ -514,7 +514,7 @@ public class ServerSession extends Sessi
 
     public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
     {
-        _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
+        _transaction.dequeue(entry.getEnqueueRecord(),
                              new ServerTransaction.Action()
                              {
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Apr  3 22:21:05 2015
@@ -40,6 +40,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
@@ -493,13 +494,14 @@ public class ServerSessionDelegate exten
     private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
                                                                    final MessageMetaData_0_10 messageMetaData, final MessageStore store)
     {
-        final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+        final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData);
         ByteBuffer body = xfr.getBody();
         if(body != null)
         {
-            storeMessage.addContent(0, body);
+            addedMessage.addContent(body);
         }
-        return storeMessage;
+        final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded();
+        return storedMessage;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Apr  3 22:21:05 2015
@@ -96,6 +96,7 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -429,22 +430,24 @@ public class AMQChannel
                                             contentHeader,
                                             getConnection().getLastReceivedTime());
 
-                final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
-                final AMQMessage amqMessage = createAMQMessage(handle);
-                MessageReference reference = amqMessage.newReference();
-                try
+                final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+                int bodyCount = _currentMessage.getBodyCount();
+                if(bodyCount > 0)
                 {
-                    int bodyCount = _currentMessage.getBodyCount();
-                    if(bodyCount > 0)
+                    long bodyLengthReceived = 0;
+                    for(int i = 0 ; i < bodyCount ; i++)
                     {
-                        long bodyLengthReceived = 0;
-                        for(int i = 0 ; i < bodyCount ; i++)
-                        {
-                            ContentBody contentChunk = _currentMessage.getContentChunk(i);
-                            handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
-                            bodyLengthReceived += contentChunk.getSize();
-                        }
+                        ContentBody contentChunk = _currentMessage.getContentChunk(i);
+                        handle.addContent(ByteBuffer.wrap(contentChunk.getPayload()));
+                        bodyLengthReceived += contentChunk.getSize();
                     }
+                }
+                final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+
+                final AMQMessage amqMessage = createAMQMessage(storedMessage);
+                MessageReference reference = amqMessage.newReference();
+                try
+                {
 
                     _currentMessage = null;
 
@@ -500,7 +503,7 @@ public class AMQChannel
                                         .createBasicAckBody(_confirmedMessageCounter, false);
                                 _connection.writeFrame(responseBody.generateFrame(_channelId));
                             }
-                            incrementUncommittedMessageSize(handle);
+                            incrementUncommittedMessageSize(storedMessage);
                             incrementOutstandingTxnsIfNecessary();
                         }
                     }
@@ -1512,7 +1515,7 @@ public class AMQChannel
                 try
                 {
                     entry.delete();
-                    txn.dequeue(queue, message,
+                    txn.dequeue(entry.getEnqueueRecord(),
                                 new ServerTransaction.Action()
                                 {
                                     @Override

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Apr  3 22:21:05 2015
@@ -189,7 +189,7 @@ public abstract class ConsumerTarget_0_8
 
             // The send may of course still fail, in which case, as
             // the message is unacked, it will be lost.
-            _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP);
+            _txn.dequeue(entry.getEnqueueRecord(), NOOP);
 
             ServerMessage message = entry.getMessage();
             MessageReference ref = message.newReference();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Apr  3 22:21:05 2015
@@ -88,12 +88,6 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public void addContent(int offsetInMessage, ByteBuffer src)
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
             public int getContent(int offsetInMessage, ByteBuffer dst)
             {
                 int size = messageContent.length - offsetInMessage;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Apr  3 22:21:05 2015
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.MessagePu
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -105,22 +106,21 @@ public class AckTest extends QpidTestCas
 
             final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
 
-            final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd);
+            final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd).allContentAdded();
 
             final StoredMessage storedMessage = result;
             final AMQMessage message = new AMQMessage(storedMessage);
             ServerTransaction txn = new AutoCommitTransaction(_messageStore);
             txn.enqueue(_queue, message,
-                        new ServerTransaction.Action()
+                        new ServerTransaction.EnqueueAction()
                         {
-                            public void postCommit()
+                            public void postCommit(MessageEnqueueRecord... records)
                             {
-                                _queue.enqueue(message,null);
+                                _queue.enqueue(message,null, null);
                             }
 
                             public void onRollback()
                             {
-                                //To change body of implemented methods use File | Settings | File Templates.
                             }
                         });
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Apr  3 22:21:05 2015
@@ -178,7 +178,7 @@ public class AcknowledgeTest extends Qpi
     private void checkStoreContents(int messageCount)
     {
         MessageCounter counter = new MessageCounter();
-        _messageStore.visitMessages(counter);
+        _messageStore.newMessageStoreReader().visitMessages(counter);
         assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount());
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Apr  3 22:21:05 2015
@@ -26,9 +26,10 @@ import org.apache.qpid.framing.BasicCont
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.StoredMessage;
 
-public class MockStoredMessage implements StoredMessage<MessageMetaData>
+public class MockStoredMessage implements StoredMessage<MessageMetaData>, MessageHandle<MessageMetaData>
 {
     private long _messageId;
     private MessageMetaData _metaData;
@@ -72,12 +73,17 @@ public class MockStoredMessage implement
         return _messageId;
     }
 
-    public void addContent(int offsetInMessage, ByteBuffer src)
+    public void addContent(ByteBuffer src)
     {
         src = src.duplicate();
-        ByteBuffer dst = _content.duplicate();
-        dst.position(offsetInMessage);
-        dst.put(src);
+        _content.put(src);
+    }
+
+    @Override
+    public StoredMessage<MessageMetaData> allContentAdded()
+    {
+        _content.flip();
+        return this;
     }
 
     public int getContent(int offset, ByteBuffer dst)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Fri Apr  3 22:21:05 2015
@@ -135,7 +135,7 @@ public class QueueBrowserUsesNoAckTest e
     private void checkStoreContents(int messageCount)
     {
         MessageCounter counter = new MessageCounter();
-        _messageStore.visitMessages(counter);
+        _messageStore.newMessageStoreReader().visitMessages(counter);
 
         assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount());
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Fri Apr  3 22:21:05 2015
@@ -60,7 +60,7 @@ public class ReferenceCountingTest exten
 
         final MessageMetaData mmd = new MessageMetaData(info, chb);
 
-        StoredMessage storedMessage = _store.addMessage(mmd);
+        StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded();
         Transaction txn = _store.newTransaction();
         txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
         txn.commitTran();
@@ -78,7 +78,7 @@ public class ReferenceCountingTest exten
     private int getStoreMessageCount()
     {
         MessageCounter counter = new MessageCounter();
-        _store.visitMessages(counter);
+        _store.newMessageStoreReader().visitMessages(counter);
         return counter.getCount();
     }
 
@@ -99,7 +99,7 @@ public class ReferenceCountingTest exten
 
         final MessageMetaData mmd = new MessageMetaData(info, chb);
 
-        StoredMessage storedMessage = _store.addMessage(mmd);
+        StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded();
         Transaction txn = _store.newTransaction();
         txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
         txn.commitTran();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Apr  3 22:21:05 2015
@@ -390,7 +390,7 @@ class ConsumerTarget_1_0 extends Abstrac
             if(outcome instanceof Accepted)
             {
                 _queueEntry.lockAcquisition();
-                txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
+                txn.dequeue(_queueEntry.getEnqueueRecord(),
                         new ServerTransaction.Action()
                         {
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Apr  3 22:21:05 2015
@@ -235,12 +235,6 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public void addContent(int offsetInMessage, ByteBuffer src)
-                        {
-                            throw new UnsupportedOperationException();
-                        }
-
-                        @Override
                         public int getContent(int offsetInMessage, ByteBuffer dst)
                         {
                             ByteBuffer buf = allData.duplicate();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Fri Apr  3 22:21:05 2015
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.type.Out
 import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
@@ -45,16 +46,16 @@ public class QueueDestination extends Me
     public Outcome send(final Message_1_0 message, ServerTransaction txn)
     {
 
-        txn.enqueue(getQueue(),message, new ServerTransaction.Action()
+        txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction()
         {
             MessageReference _reference = message.newReference();
 
 
-            public void postCommit()
+            public void postCommit(MessageEnqueueRecord... records)
             {
                 try
                 {
-                    getQueue().enqueue(message, null);
+                    getQueue().enqueue(message, null, records[0]);
                 }
                 finally
                 {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Apr  3 22:21:05 2015
@@ -42,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -150,17 +151,17 @@ public class ReceivingLink_1_0 implement
                     _sectionDecoder,
                     immutableSections);
 
-            StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
+            MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
 
             boolean skipping = true;
             int offset = 0;
 
             for(ByteBuffer bareMessageBuf : immutableSections)
             {
-                storedMessage.addContent(offset, bareMessageBuf.duplicate());
+                handle.addContent(bareMessageBuf.duplicate());
                 offset += bareMessageBuf.remaining();
             }
-
+            final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
             Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
             MessageReference<Message_1_0> reference = message.newReference();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message