qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1234111 - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker/src/main/java/org/apache/qpid/server/transport/ broker/src/main/java/org/apache/qpid/server/txn/ common/src/main/java/org/apache/qpi...
Date Fri, 20 Jan 2012 20:28:00 GMT
Author: rgodfrey
Date: Fri Jan 20 20:27:59 2012
New Revision: 1234111

URL: http://svn.apache.org/viewvc?rev=1234111&view=rev
Log:
QPID-3774 : allow out of order completion of persistent enqueues / dequeues

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
      - copied, changed from r1231491, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Fri Jan 20 20:27:59 2012
@@ -30,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -94,6 +95,8 @@ public class BDBMessageStore implements 
 {
     private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
 
+    private static final int LOCK_RETRY_ATTEMPTS = 5;
+
     static final int DATABASE_FORMAT_VERSION = 5;
     private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
     public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -893,91 +896,161 @@ public class BDBMessageStore implements 
     {
 
         // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
-
+        boolean complete = false;
         com.sleepycat.je.Transaction tx = null;
 
         Cursor cursor = null;
+        Random rand = null;
+        int attempts = 0;
         try
         {
-            tx = _environment.beginTransaction(null, null);
+            do
+            {
+                tx = null;
+                cursor = null;
+                try
+                {
+                    tx = _environment.beginTransaction(null, null);
 
-            //remove the message meta data from the store
-            DatabaseEntry key = new DatabaseEntry();
-            LongBinding.longToEntry(messageId, key);
+                    //remove the message meta data from the store
+                    DatabaseEntry key = new DatabaseEntry();
+                    LongBinding.longToEntry(messageId, key);
 
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Removing message id " + messageId);
-            }
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug("Removing message id " + messageId);
+                    }
 
 
-            OperationStatus status = _messageMetaDataDb.delete(tx, key);
-            if (status == OperationStatus.NOTFOUND)
-            {
-                _log.info("Message not found (attempt to remove failed - probably application
initiated rollback) " +
-                messageId);
-            }
+                    OperationStatus status = _messageMetaDataDb.delete(tx, key);
+                    if (status == OperationStatus.NOTFOUND)
+                    {
+                        _log.info("Message not found (attempt to remove failed - probably
application initiated rollback) " +
+                        messageId);
+                    }
 
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Deleted metadata for message " + messageId);
-            }
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug("Deleted metadata for message " + messageId);
+                    }
 
-            //now remove the content data from the store if there is any.
+                    //now remove the content data from the store if there is any.
 
-            DatabaseEntry contentKeyEntry = new DatabaseEntry();
-            MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
+                    DatabaseEntry contentKeyEntry = new DatabaseEntry();
+                    MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
 
-            TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
-            contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+                    TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+                    contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
 
-            //Use a partial record for the value to prevent retrieving the
-            //data itself as we only need the key to identify what to remove.
-            DatabaseEntry value = new DatabaseEntry();
-            value.setPartial(0, 0, true);
+                    //Use a partial record for the value to prevent retrieving the
+                    //data itself as we only need the key to identify what to remove.
+                    DatabaseEntry value = new DatabaseEntry();
+                    value.setPartial(0, 0, true);
 
-            cursor = _messageContentDb.openCursor(tx, null);
+                    cursor = _messageContentDb.openCursor(tx, null);
 
-            status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
-            while (status == OperationStatus.SUCCESS)
-            {
-                mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+                    status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+                    while (status == OperationStatus.SUCCESS)
+                    {
+                        mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
 
-                if(mck.getMessageId() != messageId)
-                {
-                    //we have exhausted all chunks for this message id, break
-                    break;
+                        if(mck.getMessageId() != messageId)
+                        {
+                            //we have exhausted all chunks for this message id, break
+                            break;
+                        }
+                        else
+                        {
+                            status = cursor.delete();
+
+                            if(status == OperationStatus.NOTFOUND)
+                            {
+                                cursor.close();
+                                cursor = null;
+
+                                tx.abort();
+                                throw new AMQStoreException("Content chunk offset" + mck.getOffset()
+ " not found for message " + messageId);
+                            }
+
+                            if (_log.isDebugEnabled())
+                            {
+                                _log.debug("Deleted content chunk offset " + mck.getOffset()
+ " for message " + messageId);
+                            }
+                        }
+
+                        status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+                    }
+
+                    cursor.close();
+
+                    cursor = null;
+
+                    commit(tx, sync);
+                    complete = true;
                 }
-                else
+                catch (LockConflictException e)
                 {
-                    status = cursor.delete();
+                    try
+                    {
+                        if(cursor != null)
+                        {
+                            cursor.close();
+                        }
+                    }
+                    catch(DatabaseException e1)
+                    {
+                        _log.warn("Unable to close cursor after LockConflictException", e1);
+                        // rethrow the original log conflict exception, the secondary exception
should already have
+                        // been logged.
+                        throw e;
+                    }
+                    try
+                    {
+                        if(tx != null)
+                        {
+                            tx.abort();
+                        }
+                    }
+                    catch(DatabaseException e2)
+                    {
+                        _log.warn("Unable to abort transaction after LockConflictExcption",
e2);
+                        // rethrow the original log conflict exception, the secondary exception
should already have
+                        // been logged.
+                        throw e;
+                    }
+
 
-                    if(status == OperationStatus.NOTFOUND)
+                    _log.warn("Lock timeout exception. Retrying (attempt "
+                              + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+                    if(++attempts < LOCK_RETRY_ATTEMPTS)
                     {
-                        cursor.close();
-                        cursor = null;
+                        if(rand == null)
+                        {
+                            rand = new Random();
+                        }
 
-                        tx.abort();
-                        throw new AMQStoreException("Content chunk offset" + mck.getOffset()
+ " not found for message " + messageId);
-                    }
+                        try
+                        {
+                            Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+                        }
+                        catch (InterruptedException e1)
+                        {
 
-                    if (_log.isDebugEnabled())
+                        }
+                    }
+                    else
                     {
-                        _log.debug("Deleted content chunk offset " + mck.getOffset() + "
for message " + messageId);
+                        // rethrow the lock conflict exception since we could not solve by
retrying
+                        throw e;
                     }
                 }
-
-                status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
             }
-
-            cursor.close();
-            cursor = null;
-
-            commit(tx, sync);
+            while(!complete);
         }
         catch (DatabaseException e)
         {
-            e.printStackTrace();
+            _log.error("Unexpected BDB exception", e);
 
             if (tx != null)
             {
@@ -1009,7 +1082,7 @@ public class BDBMessageStore implements 
                 }
                 catch (DatabaseException e)
                 {
-                    throw new AMQStoreException("Error closing database connection: " + e.getMessage(),
e);
+                    throw new AMQStoreException("Error closing cursor: " + e.getMessage(),
e);
                 }
             }
         }
@@ -2073,7 +2146,7 @@ public class BDBMessageStore implements 
                         {
                             // RHM-7 Periodically wake up and check, just in case we
                             // missed a notification. Don't want to lock the broker hard.
-                            _lock.wait(250);
+                            _lock.wait(1000);
                         }
                         catch (InterruptedException e)
                         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Fri Jan 20 20:27:59 2012
@@ -27,6 +27,7 @@ import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -84,18 +85,20 @@ import org.apache.qpid.transport.Session
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig,
AMQSessionModel, LogSubject
+public class ServerSession extends Session 
+        implements AuthorizationHolder, SessionConfig, 
+                   AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
     
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
+    private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
 
     private final UUID _id;
     private ConnectionConfig _connectionConfig;
     private long _createTime = System.currentTimeMillis();
     private LogActor _actor = GenericActor.getInstance(this);
-    private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
 
     private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue,
Boolean>();
 
@@ -147,7 +150,7 @@ public class ServerSession extends Sessi
     {
         super(connection, delegate, name, expiry);
         _connectionConfig = connConfig;        
-        _transaction = new AutoCommitTransaction(this.getMessageStore());
+        _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
         _logSubject = new ChannelLogSubject(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
@@ -184,16 +187,7 @@ public class ServerSession extends Sessi
             invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
-        PostEnqueueAction postTransactionAction;
-        if(isTransactional())
-        {
-           postTransactionAction = new PostEnqueueAction(queues, message) ;
-        }
-        else
-        {
-            postTransactionAction = _postEnqueueAction;
-            postTransactionAction.setState(queues, message);
-        }
+        PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message,
isTransactional()) ;
         _transaction.enqueue(queues,message, postTransactionAction, 0L);
         incrementOutstandingTxnsIfNecessary();
         updateTransactionalActivity();
@@ -221,12 +215,12 @@ public class ServerSession extends Sessi
     public void accept(RangeSet ranges)
     {
         dispositionChange(ranges, new MessageDispositionAction()
-                                      {
-                                          public void performAction(MessageDispositionChangeListener
listener)
-                                          {
-                                              listener.onAccept();
-                                          }
-                                      });
+        {
+            public void performAction(MessageDispositionChangeListener listener)
+            {
+                listener.onAccept();
+            }
+        });
     }
 
 
@@ -444,10 +438,7 @@ public class ServerSession extends Sessi
 
     public boolean isTransactional()
     {
-        // this does not look great but there should only be one "non-transactional"
-        // transactional context, while there could be several transactional ones in
-        // theory
-        return !(_transaction instanceof AutoCommitTransaction);
+        return _transaction.isTransactional();
     }
     
     public boolean inTransaction()
@@ -765,6 +756,7 @@ public class ServerSession extends Sessi
         {
             subscription_0_10.flushCreditState(false);
         }
+        awaitCommandCompletion();
     }
 
     private class PostEnqueueAction implements ServerTransaction.Action
@@ -774,17 +766,12 @@ public class ServerSession extends Sessi
         private ServerMessage _message;
         private final boolean _transactional;
 
-        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message,
final boolean transactional)
         {
-            _transactional = true;
+            _transactional = transactional;
             setState(queues, message);
         }
 
-        public PostEnqueueAction()
-        {
-            _transactional = false;
-        }
-
         public void setState(List<? extends BaseQueue> queues, ServerMessage message)
         {
             _message = message;
@@ -830,4 +817,76 @@ public class ServerSession extends Sessi
     {
         return _blocking.get();
     }
+
+    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+    public void completeAsyncCommands()
+    {
+        AsyncCommand cmd;
+        while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+        {
+            cmd.complete();
+            _unfinishedCommandsQueue.poll();
+        }
+        while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+        {
+            cmd = _unfinishedCommandsQueue.poll();
+            cmd.awaitReadyForCompletion();
+            cmd.complete();
+        }
+    }
+
+
+    public void awaitCommandCompletion()
+    {
+        AsyncCommand cmd;
+        while((cmd = _unfinishedCommandsQueue.poll()) != null)
+        {
+            cmd.awaitReadyForCompletion();
+            cmd.complete();
+        }
+    }
+
+
+    public Object getAsyncCommandMark()
+    {
+        return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
+    }
+
+    public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action
action)
+    {
+        _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+    }
+
+    private static class AsyncCommand
+    {
+        private final MessageStore.StoreFuture _future;
+        private ServerTransaction.Action _action;
+
+        public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action
action)
+        {
+            _future = future;
+            _action = action;
+        }
+
+        void awaitReadyForCompletion()
+        {
+            _future.waitForCompletion();
+        }
+
+        void complete()
+        {
+            if(!_future.isComplete())
+            {
+                _future.waitForCompletion();
+            }
+            _action.postCommit();
+            _action = null;
+        }
+
+        boolean isReadyForCompletion()
+        {
+            return _future.isComplete();
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Fri Jan 20 20:27:59 2012
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.transport;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 
@@ -81,9 +81,22 @@ public class ServerSessionDelegate exten
 
             if(!session.isClosing())
             {
-                super.command(session, method);
+                Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+                super.command(session, method, false);
+                Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+                if(newOutstanding == null || newOutstanding == asyncCommandMark)
+                {
+                    session.processed(method);    
+                }
+                
+                if(newOutstanding != null)
+                {
+                    ((ServerSession)session).completeAsyncCommands();
+                }
+
                 if (method.isSync())
                 {
+                    ((ServerSession)session).awaitCommandCompletion();
                     session.flushProcessed();
                 }
             }
@@ -98,7 +111,13 @@ public class ServerSessionDelegate exten
     @Override
     public void messageAccept(Session session, MessageAccept method)
     {
-        ((ServerSession)session).accept(method.getTransfers());
+        final ServerSession serverSession = (ServerSession) session;
+        serverSession.accept(method.getTransfers());
+        if(!serverSession.isTransactional())
+        {
+            serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+                                       new CommandProcessedAction(serverSession, method));
+        }
     }
 
     @Override
@@ -252,7 +271,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageTransfer(Session ssn, MessageTransfer xfr)
+    public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
         final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
@@ -294,12 +313,13 @@ public class ServerSessionDelegate exten
             exchangeInUse = exchange;
         }
 
+        final ServerSession serverSession = (ServerSession) ssn;
         if(!queues.isEmpty())
         {
             final MessageStore store = getVirtualHost(ssn).getMessageStore();
             final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr,
messageMetaData, store);
-            MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
-            ((ServerSession) ssn).enqueue(message, queues);
+            MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+            serverSession.enqueue(message, queues);
             storeMessage.flushToStore();
         }
         else
@@ -313,13 +333,19 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(),
messageMetaData.getRoutingKey()));
+                serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(),
messageMetaData.getRoutingKey()));
             }
         }
 
 
-
-        ssn.processed(xfr);
+        if(serverSession.isTransactional())
+        {
+            serverSession.processed(xfr);
+        }
+        else
+        {
+            serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession,
xfr));
+        }
     }
 
     private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer
xfr,
@@ -404,6 +430,13 @@ public class ServerSessionDelegate exten
 
 
     @Override
+    public void executionSync(final Session ssn, final ExecutionSync sync)
+    {
+        ((ServerSession)ssn).awaitCommandCompletion();
+        super.executionSync(ssn, sync);
+    }
+
+    @Override
     public void exchangeDeclare(Session session, ExchangeDeclare method)
     {
         String exchangeName = method.getExchange();
@@ -1269,4 +1302,25 @@ public class ServerSessionDelegate exten
         final ServerConnection scon = (ServerConnection) session.getConnection();
         SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
     }
+
+    private static class CommandProcessedAction implements ServerTransaction.Action
+    {
+        private final ServerSession _serverSession;
+        private final Method _method;
+
+        public CommandProcessedAction(final ServerSession serverSession, final Method xfr)
+        {
+            _serverSession = serverSession;
+            _method = xfr;
+        }
+
+        public void postCommit()
+        {
+            _serverSession.processed(_method);
+        }
+
+        public void onRollback()
+        {
+        }
+    }
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
(from r1231491, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java&r1=1231491&r2=1234111&rev=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
Fri Jan 20 20:27:59 2012
@@ -20,10 +20,6 @@
  */
 package org.apache.qpid.server.txn;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
@@ -32,6 +28,10 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+
+import java.util.Collection;
+import java.util.List;
 
 /**
  * An implementation of ServerTransaction where each enqueue/dequeue
@@ -40,15 +40,23 @@ import org.apache.qpid.server.store.Mess
  * Since there is no long-lived transaction, the commit and rollback methods of
  * this implementation are empty.
  */
-public class AutoCommitTransaction implements ServerTransaction
+public class AsyncAutoCommitTransaction implements ServerTransaction
 {
-    protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class);
+    protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
 
     private final MessageStore _messageStore;
+    private final FutureRecorder _futureRecorder;
 
-    public AutoCommitTransaction(MessageStore transactionLog)
+    public static interface FutureRecorder
+    {
+        public void recordFuture(StoreFuture future, Action action);
+
+    }
+
+    public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder)
     {
         _messageStore = transactionLog;
+        _futureRecorder = recorder;
     }
 
     public long getTransactionStartTime()
@@ -62,7 +70,8 @@ public class AutoCommitTransaction imple
      */
     public void addPostTransactionAction(final Action immediateAction)
     {
-        immediateAction.postCommit();
+        addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+
     }
 
     public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
@@ -70,6 +79,7 @@ public class AutoCommitTransaction imple
         MessageStore.Transaction txn = null;
         try
         {
+            MessageStore.StoreFuture future;
             if(message.isPersistent() && queue.isDurable())
             {
                 if (_logger.isDebugEnabled())
@@ -79,10 +89,15 @@ public class AutoCommitTransaction imple
 
                 txn = _messageStore.newTransaction();
                 txn.dequeueMessage(queue, message);
-                txn.commitTran();
+                future = txn.commitTranAsync();
+                
                 txn = null;
             }
-            postTransactionAction.postCommit();
+            else
+            {
+                future = MessageStore.IMMEDIATE_FUTURE;
+            }
+            addFuture(future, postTransactionAction);
             postTransactionAction = null;
         }
         catch (AMQException e)
@@ -97,6 +112,11 @@ public class AutoCommitTransaction imple
 
     }
 
+    private void addFuture(final MessageStore.StoreFuture future, final Action action)
+    {
+        _futureRecorder.recordFuture(future, action);
+    }
+
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
         MessageStore.Transaction txn = null;
@@ -123,12 +143,17 @@ public class AutoCommitTransaction imple
                 }
 
             }
+            MessageStore.StoreFuture future;
             if(txn != null)
             {
-                txn.commitTran();
+                future = txn.commitTranAsync();
                 txn = null;
             }
-            postTransactionAction.postCommit();
+            else
+            {
+                future = MessageStore.IMMEDIATE_FUTURE;    
+            }
+            addFuture(future, postTransactionAction);
             postTransactionAction = null;
         }
         catch (AMQException e)
@@ -149,6 +174,7 @@ public class AutoCommitTransaction imple
         MessageStore.Transaction txn = null;
         try
         {
+            MessageStore.StoreFuture future;
             if(message.isPersistent() && queue.isDurable())
             {
                 if (_logger.isDebugEnabled())
@@ -158,10 +184,14 @@ public class AutoCommitTransaction imple
 
                 txn = _messageStore.newTransaction();
                 txn.enqueueMessage(queue, message);
-                txn.commitTran();
+                future = txn.commitTranAsync();
                 txn = null;
             }
-            postTransactionAction.postCommit();
+            else
+            {
+                future = MessageStore.IMMEDIATE_FUTURE;
+            }
+            addFuture(future, postTransactionAction);
             postTransactionAction = null;
         }
         catch (AMQException e)
@@ -205,13 +235,17 @@ public class AutoCommitTransaction imple
                 }
                 
             }
+            MessageStore.StoreFuture future;
             if (txn != null)
             {
-                txn.commitTran();
+                future = txn.commitTranAsync();
                 txn = null;
             }
-
-            postTransactionAction.postCommit();
+            else
+            {
+                future = MessageStore.IMMEDIATE_FUTURE;
+            }
+            addFuture(future, postTransactionAction);
             postTransactionAction = null;
 
 
@@ -231,7 +265,17 @@ public class AutoCommitTransaction imple
 
     public void commit(final Runnable immediatePostTransactionAction)
     {
-        immediatePostTransactionAction.run();
+        addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+        {
+            public void postCommit()
+            {
+                immediatePostTransactionAction.run();
+            }
+
+            public void onRollback()
+            {
+            }
+        });
     }    
     
     public void commit()
@@ -242,6 +286,11 @@ public class AutoCommitTransaction imple
     {
     }
 
+    public boolean isTransactional()
+    {
+        return false;
+    }
+
     private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction
txn)
     {
         if (txn != null)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
Fri Jan 20 20:27:59 2012
@@ -242,6 +242,11 @@ public class AutoCommitTransaction imple
     {
     }
 
+    public boolean isTransactional()
+    {
+        return false;
+    }
+
     private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction
txn)
     {
         if (txn != null)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Fri Jan 20 20:27:59 2012
@@ -309,7 +309,12 @@ public class LocalTransaction implements
     private void resetDetails()
     {
         _transaction = null;
-	_postTransactionActions.clear();
+	    _postTransactionActions.clear();
         _txnStartTime = 0L;
     }
+
+    public boolean isTransactional()
+    {
+        return true;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
Fri Jan 20 20:27:59 2012
@@ -43,6 +43,8 @@ import java.util.List;
  */
 public interface ServerTransaction
 {
+
+
     /**
      * Represents an action to be performed on transaction commit or rollback
      */
@@ -110,4 +112,6 @@ public interface ServerTransaction
      * be executed immediately after the underlying transaction has rolled-back. 
      */
     void rollback();
+
+    boolean isTransactional();
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
Fri Jan 20 20:27:59 2012
@@ -45,10 +45,15 @@ public class SessionDelegate
         method.dispatch(ssn, this);
     }
 
-    public void command(Session ssn, Method method) {
+    public void command(Session ssn, Method method)
+    {
+        command(ssn, method, !method.hasPayload());
+    }
+    public void command(Session ssn, Method method, boolean processed) 
+    {
         ssn.identify(method);
         method.dispatch(ssn, this);
-        if (!method.hasPayload())
+        if (processed)
         {
             ssn.processed(method);
         }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
Fri Jan 20 20:27:59 2012
@@ -70,6 +70,11 @@ public class AcknowledgeTest extends Qpi
         // These should all end up being prefetched by session
         sendMessage(_consumerSession, _queue, 1);
 
+        if(!transacted)
+        {
+            ((AMQSession)_consumerSession).sync();
+        }
+
         assertEquals("Wrong number of messages on queue", 1,
                      ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
     }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Fri Jan 20 20:27:59 2012
@@ -721,7 +721,7 @@ public class DurableSubscriptionTest ext
         msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
         msg.setBooleanProperty("Match", false);
         producer.send(msg);
-
+        ((AMQSession)session).sync();
         // should be 1 or 2 messages on queue now
         // (1 for the java broker due to use of server side selectors, and 2 for the cpp
broker due to client side selectors only)
         AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message