qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1671184 [2/5] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/...
Date Fri, 03 Apr 2015 22:21:06 GMT
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Fri Apr  3 22:21:05 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 
 public class StandardQueueEntryList extends OrderedQueueEntryList
 {
@@ -40,9 +41,10 @@ public class StandardQueueEntryList exte
     }
 
 
-    protected StandardQueueEntry createQueueEntry(ServerMessage<?> message)
+    protected StandardQueueEntry createQueueEntry(ServerMessage<?> message,
+                                                  final MessageEnqueueRecord enqueueRecord)
     {
-        return new StandardQueueEntry(this, message);
+        return new StandardQueueEntry(this, message, enqueueRecord);
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Apr  3 22:21:05 2015
@@ -35,14 +35,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -488,11 +485,11 @@ public abstract class AbstractJDBCMessag
     }
 
     @Override
-    public StoredMessage addMessage(StorableMessageMetaData metaData)
+    public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData)
     {
         checkMessageStoreOpen();
 
-        return new StoredJDBCMessage(getNextMessageId(), metaData);
+        return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
 
     }
 
@@ -670,18 +667,18 @@ public abstract class AbstractJDBCMessag
 
     }
 
-    private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource  queue, Long messageId) throws StoreException
+    private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId,
+                                Long messageId) throws StoreException
     {
 
         Connection conn = connWrapper.getConnection();
 
-
         try
         {
             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
             try
             {
-                stmt.setString(1, queue.getId().toString());
+                stmt.setString(1, queueId.toString());
                 stmt.setLong(2, messageId);
                 int results = stmt.executeUpdate();
 
@@ -689,14 +686,13 @@ public abstract class AbstractJDBCMessag
 
                 if(results != 1)
                 {
-                    throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
-                                             + " with id " + queue.getId());
+                    throw new StoreException("Unable to find message with id " + messageId
+                                             + " on queue with id " + queueId);
                 }
 
                 if (getLogger().isDebugEnabled())
                 {
-                    getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
-                                      + " with id " + queue.getId());
+                    getLogger().debug("Dequeuing message " + messageId + " on queue with id " + queueId);
                 }
             }
             finally
@@ -708,8 +704,8 @@ public abstract class AbstractJDBCMessag
         catch (SQLException e)
         {
             getLogger().error("Failed to dequeue: " + e.getMessage(), e);
-            throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
-                                     + " with id " + queue.getId() + " from database", e);
+            throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue with id "
+                                     + queueId + " from database", e);
         }
 
     }
@@ -766,7 +762,7 @@ public abstract class AbstractJDBCMessag
     }
 
     private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
-                                     Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
+                                     Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) throws StoreException
     {
         Connection conn = connWrapper.getConnection();
 
@@ -788,7 +784,7 @@ public abstract class AbstractJDBCMessag
             }
 
             List<Runnable> postActions = new ArrayList<>();
-            for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+            for(Transaction.EnqueueRecord enqueue : enqueues)
             {
                 StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
                 if(storedMessage instanceof StoredJDBCMessage)
@@ -809,7 +805,7 @@ public abstract class AbstractJDBCMessag
                 if(enqueues != null)
                 {
                     stmt.setString(4, "E");
-                    for(Transaction.Record record : enqueues)
+                    for(Transaction.EnqueueRecord record : enqueues)
                     {
                         stmt.setString(5, record.getResource().getId().toString());
                         stmt.setLong(6, record.getMessage().getMessageNumber());
@@ -820,10 +816,10 @@ public abstract class AbstractJDBCMessag
                 if(dequeues != null)
                 {
                     stmt.setString(4, "D");
-                    for(Transaction.Record record : dequeues)
+                    for(Transaction.DequeueRecord record : dequeues)
                     {
-                        stmt.setString(5, record.getResource().getId().toString());
-                        stmt.setLong(6, record.getMessage().getMessageNumber());
+                        stmt.setString(5, record.getEnqueueRecord().getQueueId().toString());
+                        stmt.setLong(6, record.getEnqueueRecord().getMessageNumber());
                         stmt.executeUpdate();
                     }
                 }
@@ -1059,9 +1055,10 @@ public abstract class AbstractJDBCMessag
     }
 
 
-    private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
+    private static class RecordImpl implements Transaction.EnqueueRecord, Transaction.DequeueRecord, TransactionLogResource, EnqueueableMessage
     {
 
+        private final JDBCEnqueueRecord _record;
         private long _messageNumber;
         private UUID _queueId;
 
@@ -1069,6 +1066,13 @@ public abstract class AbstractJDBCMessag
         {
             _messageNumber = messageNumber;
             _queueId = queueId;
+            _record = new JDBCEnqueueRecord(queueId, messageNumber);
+        }
+
+        @Override
+        public MessageEnqueueRecord getEnqueueRecord()
+        {
+            return _record;
         }
 
         @Override
@@ -1323,7 +1327,7 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+        public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
             checkMessageStoreOpen();
 
@@ -1349,15 +1353,17 @@ public abstract class AbstractJDBCMessag
                 });
             }
             AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
-
+            return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
         }
 
         @Override
-        public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+        public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
         {
             checkMessageStoreOpen();
 
-            AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
+            AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper,
+                                                         enqueueRecord.getQueueId(),
+                                                         enqueueRecord.getMessageNumber());
         }
 
         @Override
@@ -1408,22 +1414,93 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public void removeXid(long format, byte[] globalId, byte[] branchId)
+        public void removeXid(final StoredXidRecord record)
         {
             checkMessageStoreOpen();
 
-            AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+            AbstractJDBCMessageStore.this.removeXid(_connWrapper,
+                                                    record.getFormat(),
+                                                    record.getGlobalId(),
+                                                    record.getBranchId());
         }
 
         @Override
-        public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+        public StoredXidRecord recordXid(final long format,
+                                         final byte[] globalId,
+                                         final byte[] branchId,
+                                         EnqueueRecord[] enqueues,
+                                         DequeueRecord[] dequeues)
         {
             checkMessageStoreOpen();
 
             _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues));
+            return new JDBCStoredXidRecord(format, globalId, branchId);
         }
+
+
     }
 
+    private static class JDBCStoredXidRecord implements Transaction.StoredXidRecord
+    {
+        private final long _format;
+        private final byte[] _globalId;
+        private final byte[] _branchId;
+
+        public JDBCStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
+        {
+            _format = format;
+            _globalId = globalId;
+            _branchId = branchId;
+        }
+
+        @Override
+        public long getFormat()
+        {
+            return _format;
+        }
+
+        @Override
+        public byte[] getGlobalId()
+        {
+            return _globalId;
+        }
+
+        @Override
+        public byte[] getBranchId()
+        {
+            return _branchId;
+        }
+
+
+        @Override
+        public boolean equals(final Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            final JDBCStoredXidRecord that = (JDBCStoredXidRecord) o;
+
+            return _format == that._format
+                   && Arrays.equals(_globalId, that._globalId)
+                   && Arrays.equals(_branchId, that._branchId);
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int result = (int) (_format ^ (_format >>> 32));
+            result = 31 * result + Arrays.hashCode(_globalId);
+            result = 31 * result + Arrays.hashCode(_branchId);
+            return result;
+        }
+    }
 
     static interface MessageDataRef<T extends StorableMessageMetaData>
     {
@@ -1540,7 +1617,7 @@ public abstract class AbstractJDBCMessag
         }
     }
 
-    private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+    private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
     {
 
         private final long _messageId;
@@ -1597,7 +1674,7 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public void addContent(int offsetInMessage, ByteBuffer src)
+        public void addContent(ByteBuffer src)
         {
             src = src.slice();
             byte[] data = _messageDataRef.getData();
@@ -1622,6 +1699,12 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
+        public StoredMessage<T> allContentAdded()
+        {
+            return this;
+        }
+
+        @Override
         public int getContent(int offsetInMessage, ByteBuffer dst)
         {
             byte[] data = _messageDataRef.getData();
@@ -1809,257 +1892,233 @@ public abstract class AbstractJDBCMessag
     }
 
     @Override
-    public StoredMessage<?> getMessage(long messageId) throws StoreException
+    public MessageStoreReader newMessageStoreReader()
     {
-        checkMessageStoreOpen();
-
-        Connection conn = null;
-        StoredJDBCMessage message;
-        try
-        {
-            conn = newAutoCommitConnection();
-            try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA))
-            {
-                stmt.setLong(1, messageId);
-                try (ResultSet rs = stmt.executeQuery())
-                {
-                    if (rs.next())
-                    {
-                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-                        buf.position(1);
-                        buf = buf.slice();
-                        MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
-                        StorableMessageMetaData metaData = type.createMetaData(buf);
-                        message = new StoredJDBCMessage(messageId, metaData, true);
-
-                    }
-                    else
-                    {
-                        message = null;
-                    }
-                }
-            }
-            return message;
-        }
-        catch (SQLException e)
-        {
-            throw new StoreException("Error encountered when visiting messages", e);
-        }
-        finally
-        {
-            JdbcUtils.closeConnection(conn, getLogger());
-        }
+        return new JDBCMessageStoreReader();
     }
 
-
-    @Override
-    public void visitMessages(MessageHandler handler) throws StoreException
+    private class JDBCMessageStoreReader implements MessageStoreReader
     {
-        checkMessageStoreOpen();
 
-        Connection conn = null;
-        try
+        @Override
+        public StoredMessage<?> getMessage(long messageId) throws StoreException
         {
-            conn = newAutoCommitConnection();
-            Statement stmt = conn.createStatement();
+            checkMessageStoreOpen();
+
+            Connection conn = null;
+            StoredJDBCMessage message;
             try
             {
-                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
-                try
+                conn = newAutoCommitConnection();
+                try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA))
                 {
-                    while (rs.next())
+                    stmt.setLong(1, messageId);
+                    try (ResultSet rs = stmt.executeQuery())
                     {
-                        long messageId = rs.getLong(1);
-                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
-                        buf.position(1);
-                        buf = buf.slice();
-                        MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
-                        StorableMessageMetaData metaData = type.createMetaData(buf);
-                        StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
-                        if (!handler.handle(message))
+                        if (rs.next())
                         {
-                            break;
+                            byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+                            ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+                            buf.position(1);
+                            buf = buf.slice();
+                            MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+                            StorableMessageMetaData metaData = type.createMetaData(buf);
+                            message = new StoredJDBCMessage(messageId, metaData, true);
+
+                        }
+                        else
+                        {
+                            message = null;
                         }
                     }
                 }
-                finally
-                {
-                    rs.close();
-                }
+                return message;
+            }
+            catch (SQLException e)
+            {
+                throw new StoreException("Error encountered when visiting messages", e);
             }
             finally
             {
-                stmt.close();
+                JdbcUtils.closeConnection(conn, getLogger());
             }
         }
-        catch (SQLException e)
-        {
-            throw new StoreException("Error encountered when visiting messages", e);
-        }
-        finally
+
+        @Override
+        public void close()
         {
-            JdbcUtils.closeConnection(conn, getLogger());
+
         }
-    }
 
-    @Override
-    public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException
-    {
-        checkMessageStoreOpen();
 
-        Connection conn = null;
-        try
+        @Override
+        public void visitMessages(MessageHandler handler) throws StoreException
         {
-            conn = newAutoCommitConnection();
-            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE);
+            checkMessageStoreOpen();
+
+            Connection conn = null;
             try
             {
-                stmt.setString(1, queue.getId().toString());
-                ResultSet rs = stmt.executeQuery();
+                conn = newAutoCommitConnection();
+                Statement stmt = conn.createStatement();
                 try
                 {
-                    while(rs.next())
+                    ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+                    try
                     {
-                        String id = rs.getString(1);
-                        long messageId = rs.getLong(2);
-                        if (!handler.handle(UUID.fromString(id), messageId))
+                        while (rs.next())
                         {
-                            break;
+                            long messageId = rs.getLong(1);
+                            byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+                            ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+                            buf.position(1);
+                            buf = buf.slice();
+                            MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+                            StorableMessageMetaData metaData = type.createMetaData(buf);
+                            StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+                            if (!handler.handle(message))
+                            {
+                                break;
+                            }
                         }
                     }
+                    finally
+                    {
+                        rs.close();
+                    }
                 }
                 finally
                 {
-                    rs.close();
+                    stmt.close();
                 }
             }
+            catch (SQLException e)
+            {
+                throw new StoreException("Error encountered when visiting messages", e);
+            }
             finally
             {
-                stmt.close();
+                JdbcUtils.closeConnection(conn, getLogger());
             }
         }
-        catch(SQLException e)
-        {
-            throw new StoreException("Error encountered when visiting message instances", e);
-        }
-        finally
-        {
-            JdbcUtils.closeConnection(conn, getLogger());
-        }
 
-    }
-
-    @Override
-    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
-    {
-        checkMessageStoreOpen();
-
-        Connection conn = null;
-        try
+        @Override
+        public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler)
+                throws StoreException
         {
-            conn = newAutoCommitConnection();
-            Statement stmt = conn.createStatement();
+            checkMessageStoreOpen();
+
+            Connection conn = null;
             try
             {
-                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+                conn = newAutoCommitConnection();
+                PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE);
                 try
                 {
-                    while(rs.next())
+                    stmt.setString(1, queue.getId().toString());
+                    ResultSet rs = stmt.executeQuery();
+                    try
                     {
-                        String id = rs.getString(1);
-                        long messageId = rs.getLong(2);
-                        if (!handler.handle(UUID.fromString(id), messageId))
+                        while (rs.next())
                         {
-                            break;
+                            String id = rs.getString(1);
+                            long messageId = rs.getLong(2);
+                            if (!handler.handle(new JDBCEnqueueRecord(UUID.fromString(id), messageId)))
+                            {
+                                break;
+                            }
                         }
                     }
+                    finally
+                    {
+                        rs.close();
+                    }
                 }
                 finally
                 {
-                    rs.close();
+                    stmt.close();
                 }
             }
+            catch (SQLException e)
+            {
+                throw new StoreException("Error encountered when visiting message instances", e);
+            }
             finally
             {
-                stmt.close();
+                JdbcUtils.closeConnection(conn, getLogger());
             }
-        }
-        catch(SQLException e)
-        {
-            throw new StoreException("Error encountered when visiting message instances", e);
-        }
-        finally
-        {
-            JdbcUtils.closeConnection(conn, getLogger());
-        }
-    }
 
-    @Override
-    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
-    {
-        checkMessageStoreOpen();
+        }
 
-        Connection conn = null;
-        try
+        @Override
+        public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
         {
-            conn = newAutoCommitConnection();
-            List<Xid> xids = new ArrayList<Xid>();
+            checkMessageStoreOpen();
 
-            Statement stmt = conn.createStatement();
+            Connection conn = null;
             try
             {
-                ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+                conn = newAutoCommitConnection();
+                Statement stmt = conn.createStatement();
                 try
                 {
-                    while(rs.next())
+                    ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+                    try
                     {
-
-                        long format = rs.getLong(1);
-                        byte[] globalId = rs.getBytes(2);
-                        byte[] branchId = rs.getBytes(3);
-                        xids.add(new Xid(format, globalId, branchId));
+                        while (rs.next())
+                        {
+                            String id = rs.getString(1);
+                            long messageId = rs.getLong(2);
+                            if (!handler.handle(new JDBCEnqueueRecord(UUID.fromString(id), messageId)))
+                            {
+                                break;
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        rs.close();
                     }
                 }
                 finally
                 {
-                    rs.close();
+                    stmt.close();
                 }
             }
+            catch (SQLException e)
+            {
+                throw new StoreException("Error encountered when visiting message instances", e);
+            }
             finally
             {
-                stmt.close();
+                JdbcUtils.closeConnection(conn, getLogger());
             }
+        }
 
+        @Override
+        public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+        {
+            checkMessageStoreOpen();
 
-
-            for(Xid xid : xids)
+            Connection conn = null;
+            try
             {
-                List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
-                List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
-                PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+                conn = newAutoCommitConnection();
+                List<Xid> xids = new ArrayList<Xid>();
 
+                Statement stmt = conn.createStatement();
                 try
                 {
-                    pstmt.setLong(1, xid.getFormat());
-                    pstmt.setBytes(2, xid.getGlobalId());
-                    pstmt.setBytes(3, xid.getBranchId());
-
-                    ResultSet rs = pstmt.executeQuery();
+                    ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
                     try
                     {
-                        while(rs.next())
+                        while (rs.next())
                         {
 
-                            String actionType = rs.getString(1);
-                            UUID queueId = UUID.fromString(rs.getString(2));
-                            long messageId = rs.getLong(3);
-
-                            RecordImpl record = new RecordImpl(queueId, messageId);
-                            List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
-                            records.add(record);
+                            long format = rs.getLong(1);
+                            byte[] globalId = rs.getBytes(2);
+                            byte[] branchId = rs.getBytes(3);
+                            xids.add(new Xid(format, globalId, branchId));
                         }
                     }
                     finally
@@ -2069,26 +2128,66 @@ public abstract class AbstractJDBCMessag
                 }
                 finally
                 {
-                    pstmt.close();
+                    stmt.close();
                 }
 
-                if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
-                                    enqueues.toArray(new RecordImpl[enqueues.size()]),
-                                    dequeues.toArray(new RecordImpl[dequeues.size()])))
+
+                for (Xid xid : xids)
                 {
-                    break;
+                    List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+                    List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+                    PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+                    try
+                    {
+                        pstmt.setLong(1, xid.getFormat());
+                        pstmt.setBytes(2, xid.getGlobalId());
+                        pstmt.setBytes(3, xid.getBranchId());
+
+                        ResultSet rs = pstmt.executeQuery();
+                        try
+                        {
+                            while (rs.next())
+                            {
+
+                                String actionType = rs.getString(1);
+                                UUID queueId = UUID.fromString(rs.getString(2));
+                                long messageId = rs.getLong(3);
+
+                                RecordImpl record = new RecordImpl(queueId, messageId);
+                                List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+                                records.add(record);
+                            }
+                        }
+                        finally
+                        {
+                            rs.close();
+                        }
+                    }
+                    finally
+                    {
+                        pstmt.close();
+                    }
+
+                    if (!handler.handle(new JDBCStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()),
+                                        enqueues.toArray(new RecordImpl[enqueues.size()]),
+                                        dequeues.toArray(new RecordImpl[dequeues.size()])))
+                    {
+                        break;
+                    }
                 }
-            }
 
-        }
-        catch (SQLException e)
-        {
-            throw new StoreException("Error encountered when visiting distributed transactions", e);
+            }
+            catch (SQLException e)
+            {
+                throw new StoreException("Error encountered when visiting distributed transactions", e);
 
-        }
-        finally
-        {
-            JdbcUtils.closeConnection(conn, getLogger());
+            }
+            finally
+            {
+                JdbcUtils.closeConnection(conn, getLogger());
+            }
         }
     }
 
@@ -2135,4 +2234,26 @@ public abstract class AbstractJDBCMessag
     }
 
 
+    private static class JDBCEnqueueRecord implements MessageEnqueueRecord // immutable (queue id, message number) pair
+    {
+        private final UUID _queueId;
+        private final long _messageNumber;
+
+        public JDBCEnqueueRecord(final UUID queueId,
+                                 final long messageNumber)
+        {
+            _queueId = queueId;
+            _messageNumber = messageNumber;
+        }
+
+        public UUID getQueueId() // MessageEnqueueRecord accessor: the queue id passed to the constructor
+        {
+            return _queueId;
+        }
+
+        public long getMessageNumber() // MessageEnqueueRecord accessor: the message number passed to the constructor
+        {
+            return _messageNumber;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Apr  3 22:21:05 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -65,7 +66,7 @@ public class MemoryMessageStore implemen
         }
 
         @Override
-        public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+        public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
 
             if(message.getStoredMessage() instanceof StoredMemoryMessage)
@@ -80,18 +81,24 @@ public class MemoryMessageStore implemen
                 _localEnqueueMap.put(queue.getId(), messageIds);
             }
             messageIds.add(message.getMessageNumber());
+            return new MemoryEnqueueRecord(queue.getId(), message.getMessageNumber());
         }
 
         @Override
-        public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+        public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) // the dequeue is now identified by an enqueue record, not by queue + message
         {
-            Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+            dequeueMessage(enqueueRecord.getQueueId(), enqueueRecord.getMessageNumber()); // unpack the record and delegate to the private overload
+        }
+
+        private void dequeueMessage(final UUID queueId, final long messageNumber) // adds messageNumber to the set kept for queueId in _localDequeueMap
+        {
+            Set<Long> messageIds = _localDequeueMap.get(queueId);
             if (messageIds == null)
             {
                 messageIds = new HashSet<Long>();
-                _localDequeueMap.put(queue.getId(), messageIds);
+                _localDequeueMap.put(queueId, messageIds); // no set existed for this queue id yet: register the new one
             }
-            messageIds.add(message.getMessageNumber());
+            messageIds.add(messageNumber);
         }
 
         @Override
@@ -110,36 +117,106 @@ public class MemoryMessageStore implemen
         }
 
         @Override
-        public void removeXid(long format, byte[] globalId, byte[] branchId)
+        public void removeXid(final StoredXidRecord record) // takes the record type returned by recordXid instead of three separate values
         {
-            _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
+            _localDistributedTransactionsRemoves.add(new Xid(record.getFormat(), // rebuild the Xid from the record's three fields
+                                                             record.getGlobalId(),
+                                                             record.getBranchId()));
         }
 
         @Override
-        public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+        public StoredXidRecord recordXid(final long format, // now returns a StoredXidRecord (previously void)
+                                         final byte[] globalId,
+                                         final byte[] branchId,
+                                         EnqueueRecord[] enqueues,
+                                         DequeueRecord[] dequeues)
         {
             _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
+            return new MemoryStoredXidRecord(format, globalId, branchId); // carries the same three values; removeXid accepts this type
         }
+
+
     }
 
+    private static class MemoryStoredXidRecord implements Transaction.StoredXidRecord // value holder for an xid: format, global id, branch id
+    {
+        private final long _format;
+        private final byte[] _globalId;
+        private final byte[] _branchId;
+
+        public MemoryStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
+        {
+            _format = format;
+            _globalId = globalId; // array reference is stored as given, not copied
+            _branchId = branchId; // array reference is stored as given, not copied
+        }
+
+        @Override
+        public long getFormat()
+        {
+            return _format;
+        }
+
+        @Override
+        public byte[] getGlobalId()
+        {
+            return _globalId; // returns the internal array itself
+        }
+
+        @Override
+        public byte[] getBranchId()
+        {
+            return _branchId; // returns the internal array itself
+        }
+
+
+        @Override
+        public boolean equals(final Object o) // equal when same class, same format and element-wise equal id arrays
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            final MemoryStoredXidRecord that = (MemoryStoredXidRecord) o;
+
+            return _format == that._format
+                   && Arrays.equals(_globalId, that._globalId) // compare array contents, not references
+                   && Arrays.equals(_branchId, that._branchId);
+
+        }
+
+        @Override
+        public int hashCode() // built from the same three fields as equals, using array contents
+        {
+            int result = (int) (_format ^ (_format >>> 32)); // fold the long into an int (same formula as Long.hashCode)
+            result = 31 * result + Arrays.hashCode(_globalId);
+            result = 31 * result + Arrays.hashCode(_branchId);
+            return result;
+        }
+    }
     private static final class DistributedTransactionRecords
     {
-        private Transaction.Record[] _enqueues;
-        private Transaction.Record[] _dequeues;
+        private Transaction.EnqueueRecord[] _enqueues;
+        private Transaction.DequeueRecord[] _dequeues;
 
-        public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+        public DistributedTransactionRecords(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues)
         {
             super();
             _enqueues = enqueues;
             _dequeues = dequeues;
         }
 
-        public Transaction.Record[] getEnqueues()
+        public Transaction.EnqueueRecord[] getEnqueues()
         {
             return _enqueues;
         }
 
-        public Transaction.Record[] getDequeues()
+        public Transaction.DequeueRecord[] getDequeues()
         {
             return _dequeues;
         }
@@ -201,7 +278,7 @@ public class MemoryMessageStore implemen
     }
 
     @Override
-    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+    public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(final T metaData)
     {
         long id = getNextMessageId();
 
@@ -273,81 +350,121 @@ public class MemoryMessageStore implemen
     }
 
     @Override
-    public void visitMessages(final MessageHandler handler) throws StoreException
+    public MessageStoreReader newMessageStoreReader() // the visit/get operations are now reached through the returned reader
     {
-        for (StoredMemoryMessage message : _messages.values())
-        {
-            if(!handler.handle(message))
-            {
-                break;
-            }
-        }
+        return new MemoryMessageStoreReader(); // a new reader object on every call
     }
 
-    @Override
-    public StoredMessage<?> getMessage(final long messageId)
+
+    private static class MemoryEnqueueRecord implements MessageEnqueueRecord // immutable (queue id, message number) pair
     {
-        return _messages.get(messageId);
+        private final UUID _queueId;
+        private final long _messageNumber;
+
+        public MemoryEnqueueRecord(final UUID queueId,
+                                   final long messageNumber)
+        {
+            _queueId = queueId;
+            _messageNumber = messageNumber;
+        }
+
+        public UUID getQueueId() // MessageEnqueueRecord accessor: the queue id passed to the constructor
+        {
+            return _queueId;
+        }
+
+        public long getMessageNumber() // MessageEnqueueRecord accessor: the message number passed to the constructor
+        {
+            return _messageNumber;
+        }
     }
 
-    @Override
-    public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+    private class MemoryMessageStoreReader implements MessageStoreReader // non-static: reads the enclosing store's _messages, _messageInstances and _distributedTransactions
     {
-        synchronized (_transactionLock)
+        @Override
+        public StoredMessage<?> getMessage(final long messageId) // plain lookup in _messages by message id
+        {
+            return _messages.get(messageId);
+        }
+
+        @Override
+        public void close() // empty body: this reader performs no action on close
         {
-            for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+
+        }
+
+        @Override
+        public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException // visits every (queue id, message id) pair in _messageInstances
+        {
+            synchronized (_transactionLock) // the whole iteration runs while holding _transactionLock
             {
-                UUID resourceId = enqueuedEntry.getKey();
-                for (Long messageId : enqueuedEntry.getValue())
+                for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
                 {
-                    if (!handler.handle(resourceId, messageId))
+                    UUID resourceId = enqueuedEntry.getKey();
+                    for (Long messageId : enqueuedEntry.getValue())
                     {
-                        return;
+                        if (!handler.handle(new MemoryEnqueueRecord(resourceId, messageId))) // handler now receives a record object rather than (id, id)
+                        {
+                            return; // handler returned false: stop the whole visit
+                        }
                     }
                 }
             }
         }
-    }
 
-    @Override
-    public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
-    {
-        synchronized (_transactionLock)
+        @Override
+        public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException // same visit, limited to the ids stored under queue.getId()
         {
-            Set<Long> ids = _messageInstances.get(queue.getId());
-            if(ids != null)
+            synchronized (_transactionLock)
             {
-                for (long id : ids)
+                Set<Long> ids = _messageInstances.get(queue.getId());
+                if(ids != null) // no entry for this queue id: nothing is visited
                 {
-                    if (!handler.handle(queue.getId(), id))
+                    for (long id : ids)
                     {
-                        return;
-                    }
+                        if (!handler.handle(new MemoryEnqueueRecord(queue.getId(), id)))
+                        {
+                            return; // handler returned false: stop the visit
+                        }
 
+                    }
                 }
             }
         }
-    }
 
 
-    @Override
-    public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
-    {
-        synchronized (_transactionLock)
+        @Override
+        public void visitMessages(final MessageHandler handler) throws StoreException // iterates _messages.values(); note: no synchronized block here, unlike the other visit methods
         {
-            for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+            for (StoredMemoryMessage message : _messages.values())
             {
-                Xid xid = entry.getKey();
-                DistributedTransactionRecords records = entry.getValue();
-                if (!handler.handle(xid.getFormat(),
-                                    xid.getGlobalId(),
-                                    xid.getBranchId(),
-                                    records.getEnqueues(),
-                                    records.getDequeues()))
+                if(!handler.handle(message))
                 {
                     break;
                 }
             }
         }
+
+        @Override
+        public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException // visits each entry of _distributedTransactions
+        {
+            synchronized (_transactionLock)
+            {
+                for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+                {
+                    Xid xid = entry.getKey();
+                    DistributedTransactionRecords records = entry.getValue();
+                    if (!handler.handle(new MemoryStoredXidRecord(xid.getFormat(), // wrap the xid's three values in the record type the handler expects
+                                                                  xid.getGlobalId(),
+                                                                  xid.getBranchId()),
+                                        records.getEnqueues(),
+                                        records.getDequeues()))
+                    {
+                        break; // handler returned false: stop visiting
+                    }
+                }
+            }
+        }
+
     }
 }

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java?rev=1671184&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java Fri Apr  3 22:21:05 2015
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.UUID;
+
+public interface MessageEnqueueRecord // identifies one enqueue by queue id and message number; returned by Transaction.enqueueMessage, consumed by dequeueMessage
+{
+    UUID getQueueId(); // id of the queue the message was enqueued on
+    long getMessageNumber(); // message number of the enqueued message
+}

Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java?rev=1671184&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java Fri Apr  3 22:21:05 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+
+public interface MessageHandle<M extends StorableMessageMetaData> // return type of MessageStore.addMessage as of this change
+{
+
+    void addContent(ByteBuffer src); // supplies message content; takes no offset, unlike the addContent(int, ByteBuffer) removed from StoredMessage
+
+    StoredMessage<M> allContentAdded(); // yields the StoredMessage; presumably called once after the last addContent - confirm against callers
+
+}

Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Apr  3 22:21:05 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
 
 
 import java.io.File;
+import java.util.UUID;
 
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
@@ -59,15 +60,7 @@ public interface MessageStore
      */
     void upgradeStoreStructure() throws StoreException;
 
-    void visitMessages(MessageHandler handler) throws StoreException;
-
-    void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
-    void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException;
-
-    void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
-
-    <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
-    StoredMessage<?> getMessage(long messageId);
+    <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData);
 
     /**
      * Is this store capable of persisting the data
@@ -84,4 +77,20 @@ public interface MessageStore
     void closeMessageStore();
 
     void onDelete(ConfiguredObject<?> parent);
+
+    MessageStoreReader newMessageStoreReader();
+
+    interface MessageStoreReader // read-side operations; the visit*/getMessage methods were moved here from MessageStore itself
+    {
+        void visitMessages(MessageHandler handler) throws StoreException;
+
+        void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+        void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException; // overload that also takes the queue to visit
+
+        void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
+
+        StoredMessage<?> getMessage(long messageId);
+        void close(); // new with the reader; the Memory and Null implementations in this change have empty bodies
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Apr  3 22:21:05 2015
@@ -28,7 +28,7 @@ import org.apache.qpid.server.store.hand
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 
-public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider
+public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider, MessageStore.MessageStoreReader
 {
     private ConfiguredObjectRecord[] _initialRecords;
 
@@ -89,7 +89,7 @@ public abstract class NullMessageStore i
     }
 
     @Override
-    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+    public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData)
     {
         return null;
     }
@@ -145,6 +145,7 @@ public abstract class NullMessageStore i
         handler.end();
     }
 
+
     @Override
     public void visitMessages(MessageHandler handler) throws StoreException
     {
@@ -176,4 +177,16 @@ public abstract class NullMessageStore i
     {
         return null;
     }
+
+    @Override
+    public MessageStoreReader newMessageStoreReader()
+    {
+        return this; // this class implements MessageStore.MessageStoreReader itself, so no separate reader object is created
+    }
+
+    @Override
+    public void close() // MessageStoreReader.close(): empty body, no action is taken
+    {
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Fri Apr  3 22:21:05 2015
@@ -23,7 +23,9 @@ package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
 
-public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+import org.apache.qpid.transport.util.Functions;
+
+public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
 {
     private final long _messageNumber;
     private ByteBuffer _content;
@@ -40,48 +42,45 @@ public class StoredMemoryMessage<T exten
         return _messageNumber;
     }
 
-    public void addContent(int offsetInMessage, ByteBuffer src)
+    public void addContent(ByteBuffer src) // appends src after the data held so far; _content's position is the write point until allContentAdded() flips it
     {
         if(_content == null)
         {
-            if(offsetInMessage == 0)
-            {
-                _content = src.slice();
-            }
-            else
-            {
-                final int contentSize = _metaData.getContentSize();
-                int size = (contentSize < offsetInMessage + src.remaining())
-                        ? offsetInMessage + src.remaining()
-                        : contentSize;
-                _content = ByteBuffer.allocate(size);
-                addContent(offsetInMessage, src);
-            }
+            _content = src.slice(); // first chunk: a view of src's remaining bytes (shares src's storage, nothing is copied)
+            _content.position(_content.limit()); // position == limit, so the position marks the end of the data held so far
         }
         else
         {
-            if(_content.limit() >= offsetInMessage + src.remaining())
+            if(_content.remaining() >= src.remaining()) // room after the write point? (a sliced first chunk has none, so the next non-empty chunk reallocates)
             {
-                _content.position(offsetInMessage);
-                _content.put(src);
-                _content.position(0);
+                _content.put(src.duplicate()); // duplicate() so src's own position is left untouched by the copy
             }
             else
             {
                 final int contentSize = _metaData.getContentSize();
-                int size = (contentSize < offsetInMessage + src.remaining())
-                        ? offsetInMessage + src.remaining()
+                int size = (contentSize < _content.position() + src.remaining()) // new capacity = max(declared content size, bytes held + incoming bytes)
+                        ? _content.position() + src.remaining()
                         : contentSize;
                 ByteBuffer oldContent = _content;
+                oldContent.flip(); // make [0, position) readable so the put below copies exactly the bytes held so far
                 _content = ByteBuffer.allocate(size);
                 _content.put(oldContent);
-                _content.position(0);
-                addContent(offsetInMessage, src);
+                _content.put(src.duplicate()); // append the new chunk directly instead of recursing
             }
 
         }
     }
 
+    @Override
+    public StoredMessage<T> allContentAdded() // switches _content from write mode to read mode and returns this message
+    {
+        if(_content != null) // null when addContent was never called
+        {
+            _content.flip(); // limit = end of the added data, position = 0
+        }
+        return this; // this class is both the MessageHandle and the resulting StoredMessage
+    }
+
     public int getContent(int offset, ByteBuffer dst)
     {
         if(_content == null)
@@ -142,4 +141,5 @@ public class StoredMemoryMessage<T exten
     {
         return false;
     }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Fri Apr  3 22:21:05 2015
@@ -28,8 +28,6 @@ public interface StoredMessage<M extends
 
     public long getMessageNumber();
 
-    void addContent(int offsetInMessage, ByteBuffer src);
-
     int getContent(int offsetInMessage, ByteBuffer dst);
 
     ByteBuffer getContent(int offsetInMessage, int size);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java Fri Apr  3 22:21:05 2015
@@ -25,24 +25,17 @@ import org.apache.qpid.server.util.Futur
 
 public interface Transaction
 {
+
     /**
      * Places a message onto a specified queue, in a given transactional context.
      *
      *
-     *
-     * @param queue     The queue to place the message on.
+     *  @param queue     The queue to place the message on.
      * @param message
      */
-    void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message);
-
-    /**
-     * Extracts a message from a specified queue, in a given transactional context.
-     *
-     * @param queue     The queue to place the message on.
-     * @param message The message to dequeue.
-     */
-    void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message);
+    MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message);
 
+    void dequeueMessage(MessageEnqueueRecord enqueueRecord);
 
     /**
      * Commits all operations performed within a given transactional context.
@@ -63,14 +56,28 @@ public interface Transaction
     void abortTran();
 
 
-    public static interface Record
+    interface EnqueueRecord // renamed from Record; now used only for the enqueues passed to recordXid
     {
         TransactionLogResource getResource();
         EnqueueableMessage getMessage();
     }
 
-    void removeXid(long format, byte[] globalId, byte[] branchId);
+    interface DequeueRecord // element type of the dequeues array passed to recordXid
+    {
+        MessageEnqueueRecord getEnqueueRecord(); // the enqueue record (queue id + message number) this dequeue refers to
+    }
+
+    void removeXid(StoredXidRecord record);
+
+
+    StoredXidRecord recordXid(long format, byte[] globalId, byte[] branchId, EnqueueRecord[] enqueues,
+                   DequeueRecord[] dequeues);
 
-    void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues,
-                   Transaction.Record[] dequeues);
+    interface StoredXidRecord // returned by recordXid and accepted by removeXid
+    {
+        long getFormat(); // same value as the 'format' argument of recordXid
+        byte[] getGlobalId(); // same value as the 'globalId' argument of recordXid
+        byte[] getBranchId(); // same value as the 'branchId' argument of recordXid
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java Fri Apr  3 22:21:05 2015
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.store.handler;
 
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.Transaction.DequeueRecord;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
 
 public interface DistributedTransactionHandler
 {
 
-    boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues);
+    boolean handle(Transaction.StoredXidRecord storedXid, EnqueueRecord[] enqueues, DequeueRecord[] dequeues); // xid now arrives as one record; the in-memory reader stops visiting when this returns false
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java Fri Apr  3 22:21:05 2015
@@ -20,10 +20,11 @@
  */
 package org.apache.qpid.server.store.handler;
 
-import java.util.UUID;
+
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 
 public interface MessageInstanceHandler
 {
-    boolean handle(UUID queueId, long messageId);
+    boolean handle(MessageEnqueueRecord record); // queue id and message id now arrive as one record; the in-memory reader stops visiting when this returns false
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Apr  3 22:21:05 2015
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.Transaction;
@@ -88,21 +88,21 @@ public class AsyncAutoCommitTransaction
 
     }
 
-    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
     {
         Transaction txn = null;
         try
         {
             FutureResult future;
-            if(queue.getMessageDurability().persist(message.isPersistent()))
+            if(record != null)
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+                    _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
                 }
 
                 txn = _messageStore.newTransaction();
-                txn.dequeueMessage(queue, message);
+                txn.dequeueMessage(record);
                 future = txn.commitTranAsync();
 
                 txn = null;
@@ -121,6 +121,7 @@ public class AsyncAutoCommitTransaction
 
     }
 
+
     private void addFuture(final FutureResult future, final Action action)
     {
         if(action != null)
@@ -160,14 +161,13 @@ public class AsyncAutoCommitTransaction
         {
             for(MessageInstance entry : queueEntries)
             {
-                ServerMessage message = entry.getMessage();
-                TransactionLogResource queue = entry.getOwningResource();
+                MessageEnqueueRecord record = entry.getEnqueueRecord();
 
-                if(queue.getMessageDurability().persist(message.isPersistent()))
+                if(record != null)
                 {
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+                        _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
                     }
 
                     if(txn == null)
@@ -175,7 +175,7 @@ public class AsyncAutoCommitTransaction
                         txn = _messageStore.newTransaction();
                     }
 
-                    txn.dequeueMessage(queue, message);
+                    txn.dequeueMessage(record);
                 }
 
             }
@@ -200,12 +200,13 @@ public class AsyncAutoCommitTransaction
     }
 
 
-    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         Transaction txn = null;
         try
         {
             FutureResult future;
+            final MessageEnqueueRecord enqueueRecord;
             if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
@@ -214,30 +215,65 @@ public class AsyncAutoCommitTransaction
                 }
 
                 txn = _messageStore.newTransaction();
-                txn.enqueueMessage(queue, message);
+                enqueueRecord = txn.enqueueMessage(queue, message);
                 future = txn.commitTranAsync();
                 txn = null;
             }
             else
             {
                 future = FutureResult.IMMEDIATE_FUTURE;
+                enqueueRecord = null;
             }
-            addEnqueueFuture(future, postTransactionAction, message.isPersistent());
+            final EnqueueAction underlying = postTransactionAction;
+            addEnqueueFuture(future, new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+                    underlying.postCommit(enqueueRecord);
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    underlying.onRollback(); // a rolled-back enqueue must be reported as a rollback, never as a commit
+                }
+            }, message.isPersistent());
             postTransactionAction = null;
-        }finally
+        }
+        finally
         {
-            rollbackIfNecessary(postTransactionAction, txn);
+            final EnqueueAction underlying = postTransactionAction;
+
+            rollbackIfNecessary(new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    if(underlying != null)
+                    {
+                        underlying.onRollback();
+                    }
+                }
+            }, txn);
         }
 
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         Transaction txn = null;
         try
         {
-
+            final MessageEnqueueRecord[] records = new MessageEnqueueRecord[queues.size()];
+            int i = 0;
             for(BaseQueue queue : queues)
             {
                 if (queue.getMessageDurability().persist(message.isPersistent()))
@@ -250,10 +286,11 @@ public class AsyncAutoCommitTransaction
                     {
                         txn = _messageStore.newTransaction();
                     }
-                    txn.enqueueMessage(queue, message);
+                    records[i] = txn.enqueueMessage(queue, message);
 
 
                 }
+                i++;
             }
 
             FutureResult future;
@@ -266,13 +303,49 @@ public class AsyncAutoCommitTransaction
             {
                 future = FutureResult.IMMEDIATE_FUTURE;
             }
-            addEnqueueFuture(future, postTransactionAction, message.isPersistent());
+            final EnqueueAction underlying = postTransactionAction;
+            addEnqueueFuture(future, new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+                    if(underlying != null)
+                    {
+                        underlying.postCommit(records);
+                    }
+                }
+
+                @Override
+                public void onRollback()
+                {
+                     underlying.onRollback();
+                }
+            }, message.isPersistent());
             postTransactionAction = null;
 
 
-        }finally
+        }
+        finally
         {
-            rollbackIfNecessary(postTransactionAction, txn);
+            final EnqueueAction underlying = postTransactionAction;
+
+            rollbackIfNecessary(new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    if(underlying != null)
+                    {
+                        underlying.onRollback();
+                    }
+                }
+            }, txn);
         }
 
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Apr  3 22:21:05 2015
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -73,20 +73,20 @@ public class AutoCommitTransaction imple
         immediateAction.postCommit();
     }
 
-    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
     {
         Transaction txn = null;
         try
         {
-            if(queue.getMessageDurability().persist(message.isPersistent()))
+            if(record != null)
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+                    _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
                 }
 
                 txn = _messageStore.newTransaction();
-                txn.dequeueMessage(queue, message);
+                txn.dequeueMessage(record);
                 txn.commitTran();
                 txn = null;
             }
@@ -100,6 +100,7 @@ public class AutoCommitTransaction imple
 
     }
 
+
     public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
     {
         Transaction txn = null;
@@ -107,14 +108,12 @@ public class AutoCommitTransaction imple
         {
             for(MessageInstance entry : queueEntries)
             {
-                ServerMessage message = entry.getMessage();
-                TransactionLogResource queue = entry.getOwningResource();
-
-                if(queue.getMessageDurability().persist(message.isPersistent()))
+                MessageEnqueueRecord enqueueRecord = entry.getEnqueueRecord();
+                if(enqueueRecord != null)
                 {
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+                        _logger.debug("Dequeue of message number " + enqueueRecord.getMessageNumber() + " from transaction log. Queue : " + enqueueRecord.getQueueId());
                     }
 
                     if(txn == null)
@@ -122,7 +121,7 @@ public class AutoCommitTransaction imple
                         txn = _messageStore.newTransaction();
                     }
 
-                    txn.dequeueMessage(queue, message);
+                    txn.dequeueMessage(enqueueRecord);
                 }
 
             }
@@ -142,11 +141,12 @@ public class AutoCommitTransaction imple
     }
 
 
-    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         Transaction txn = null;
         try
         {
+            final MessageEnqueueRecord record;
             if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
@@ -155,27 +155,52 @@ public class AutoCommitTransaction imple
                 }
 
                 txn = _messageStore.newTransaction();
-                txn.enqueueMessage(queue, message);
+                record = txn.enqueueMessage(queue, message);
                 txn.commitTran();
                 txn = null;
             }
-            postTransactionAction.postCommit();
+            else
+            {
+                record = null;
+            }
+            if(postTransactionAction != null)
+            {
+                postTransactionAction.postCommit(record);
+            }
             postTransactionAction = null;
         }
         finally
         {
-            rollbackIfNecessary(postTransactionAction, txn);
+            final EnqueueAction underlying = postTransactionAction;
+            rollbackIfNecessary(new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    if(underlying != null)
+                    {
+                        underlying.onRollback();
+                    }
+                }
+            }, txn);
         }
 
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
     {
         Transaction txn = null;
         try
         {
-
+            MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[queues.size()];
+            int i = 0;
             for(BaseQueue queue : queues)
             {
                 if (queue.getMessageDurability().persist(message.isPersistent()))
@@ -188,11 +213,11 @@ public class AutoCommitTransaction imple
                     {
                         txn = _messageStore.newTransaction();
                     }
-                    txn.enqueueMessage(queue, message);
+                    enqueueRecords[i] = txn.enqueueMessage(queue, message);
 
 
                 }
-
+                i++;
             }
             if (txn != null)
             {
@@ -200,13 +225,34 @@ public class AutoCommitTransaction imple
                 txn = null;
             }
 
-            postTransactionAction.postCommit();
+            if(postTransactionAction != null)
+            {
+                postTransactionAction.postCommit(enqueueRecords);
+            }
             postTransactionAction = null;
 
 
-        }finally
+        }
+        finally
         {
-            rollbackIfNecessary(postTransactionAction, txn);
+            final EnqueueAction underlying = postTransactionAction;
+            rollbackIfNecessary(new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    if(underlying != null)
+                    {
+                        underlying.onRollback();
+                    }
+                }
+            }, txn);
         }
 
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1671184&r1=1671183&r2=1671184&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Fri Apr  3 22:21:05 2015
@@ -25,6 +25,7 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -74,26 +75,27 @@ public class DistributedTransaction impl
         }
     }
 
-    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
     {
         if(_branch != null)
         {
-            _branch.dequeue(queue, message);
+            _branch.dequeue(record);
             _branch.addPostTransactionAction(postTransactionAction);
         }
         else
         {
-            _autoCommitTransaction.dequeue(queue, message, postTransactionAction);
+            _autoCommitTransaction.dequeue(record, postTransactionAction);
         }
     }
 
+
     public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction)
     {
         if(_branch != null)
         {
             for(MessageInstance entry : messages)
             {
-                _branch.dequeue(entry.getOwningResource(), entry.getMessage());
+                _branch.dequeue(entry.getEnqueueRecord());
             }
             _branch.addPostTransactionAction(postTransactionAction);
         }
@@ -103,12 +105,33 @@ public class DistributedTransaction impl
         }
     }
 
-    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, final EnqueueAction postTransactionAction)
     {
         if(_branch != null)
         {
-            _branch.enqueue(queue, message);
-            _branch.addPostTransactionAction(postTransactionAction);
+            final MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[1];
+                _branch.enqueue(queue, message, new org.apache.qpid.server.util.Action<MessageEnqueueRecord>()
+                {
+                    @Override
+                    public void performAction(final MessageEnqueueRecord record)
+                    {
+                        enqueueRecords[0] = record;
+                    }
+                });
+            addPostTransactionAction(new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+                    postTransactionAction.postCommit(enqueueRecords);
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    postTransactionAction.onRollback();
+                }
+            });
         }
         else
         {
@@ -117,15 +140,39 @@ public class DistributedTransaction impl
     }
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message,
-                        Action postTransactionAction)
+                        final EnqueueAction postTransactionAction)
     {
         if(_branch != null)
         {
+            final MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[queues.size()];
+            int i = 0;
             for(BaseQueue queue : queues)
             {
-                _branch.enqueue(queue, message);
+                final int pos = i;
+                _branch.enqueue(queue, message, new org.apache.qpid.server.util.Action<MessageEnqueueRecord>()
+                {
+                    @Override
+                    public void performAction(final MessageEnqueueRecord record)
+                    {
+                        enqueueRecords[pos] = record;
+                    }
+                });
+                i++;
             }
-            _branch.addPostTransactionAction(postTransactionAction);
+            addPostTransactionAction(new Action()
+            {
+                @Override
+                public void postCommit()
+                {
+                    postTransactionAction.postCommit(enqueueRecords);
+                }
+
+                @Override
+                public void onRollback()
+                {
+                    postTransactionAction.onRollback();
+                }
+            });
         }
         else
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message