qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1709878 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ broker-core/src/main/java/org/apache/qpid/server/store/ broker-core/...
Date Wed, 21 Oct 2015 16:29:37 GMT
Author: rgodfrey
Date: Wed Oct 21 16:29:37 2015
New Revision: 1709878

URL: http://svn.apache.org/viewvc?rev=1709878&view=rev
Log:
QPID-6750 : Simplify the implementation of the futures, avoid using futures when they are
not necessary

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
    qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -760,21 +760,36 @@ public abstract class AbstractBDBMessage
      *
      * @throws org.apache.qpid.server.store.StoreException If the operation fails for any
reason.
      */
-    private ListenableFuture<Void> commitTranImpl(final Transaction tx, boolean syncCommit)
throws StoreException
+    private void commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
     {
         if (tx == null)
         {
             throw new StoreException("Fatal internal error: transactional is null at commitTran");
         }
 
-        ListenableFuture<Void> result = getEnvironmentFacade().commit(tx, syncCommit);
+        getEnvironmentFacade().commit(tx, syncCommit);
 
         getLogger().debug("commitTranImpl completed {} transaction {}",
                           syncCommit ? "synchronous" : "asynchronous", tx);
 
+
+    }
+
+    private <X> ListenableFuture<X> commitTranAsyncImpl(final Transaction tx,
X val) throws StoreException
+    {
+        if (tx == null)
+        {
+            throw new StoreException("Fatal internal error: transactional is null at commitTran");
+        }
+
+        ListenableFuture<X> result = getEnvironmentFacade().commitAsync(tx, val);
+
+        getLogger().debug("commitTranAsynImpl completed transaction {}", tx);
+
         return result;
     }
 
+
     /**
      * Abandons all operations performed within a given transaction.
      *
@@ -1148,7 +1163,7 @@ public abstract class AbstractBDBMessage
             }
         }
 
-        synchronized ListenableFuture<Void> flushToStore()
+        synchronized void flushToStore()
         {
             if (_messageDataRef != null)
             {
@@ -1166,11 +1181,10 @@ public abstract class AbstractBDBMessage
                         throw getEnvironmentFacade().handleDatabaseException("failed to begin
transaction", e);
                     }
                     store(txn);
-                    getEnvironmentFacade().commit(txn, true);
+                    getEnvironmentFacade().commit(txn, false);
 
                 }
             }
-            return Futures.immediateFuture(null);
         }
 
         @Override
@@ -1312,12 +1326,12 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public ListenableFuture<Void> commitTranAsync() throws StoreException
+        public <X> ListenableFuture<X> commitTranAsync(final X val) throws StoreException
         {
             checkMessageStoreOpen();
             doPreCommitActions();
             AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
-            ListenableFuture<Void> futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn,
false);
+            ListenableFuture<X> futureResult = AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn,
val);
             doPostCommitActions();
             return futureResult;
         }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
Wed Oct 21 16:29:37 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -30,15 +32,10 @@ import java.util.concurrent.atomic.Atomi
 
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
 import com.sleepycat.je.Transaction;
-import org.apache.qpid.server.store.StoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.store.StoreException;
-
 public class CoalescingCommiter implements Committer
 {
     private final CommitThread _commitThread;
@@ -73,98 +70,55 @@ public class CoalescingCommiter implemen
     }
 
     @Override
-    public ListenableFuture<Void> commit(Transaction tx, boolean syncCommit)
+    public void commit(Transaction tx, boolean syncCommit)
     {
-        ThreadNotifyingSettableFuture future = new ThreadNotifyingSettableFuture();
-        BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx,
syncCommit, future);
-        commitFuture.commit();
+        if(syncCommit)
+        {
+            SynchronousCommitThreadJob job = new SynchronousCommitThreadJob();
+            _commitThread.addJob(job, true);
+            job.awaitCompletion();
+        }
+
+    }
+
+    @Override
+    public <X> ListenableFuture<X> commitAsync(Transaction tx, X val)
+    {
+        ThreadNotifyingSettableFuture<X> future = new ThreadNotifyingSettableFuture<X>();
+        BDBCommitFutureResult<X> commitFuture = new BDBCommitFutureResult<X>(val,
future);
+        _commitThread.addJob(commitFuture, false);
         return future;
     }
 
-    private static final class BDBCommitFutureResult
+
+    private static final class BDBCommitFutureResult<X> implements CommitThreadJob
     {
-        private static final Logger LOGGER = LoggerFactory.getLogger(BDBCommitFutureResult.class);
+        private final X _value;
+        private final ThreadNotifyingSettableFuture<X> _future;
 
-        private final CommitThread _commitThread;
-        private final Transaction _tx;
-        private final boolean _syncCommit;
-        private final ThreadNotifyingSettableFuture _future;
-
-        public BDBCommitFutureResult(CommitThread commitThread,
-                                     Transaction tx,
-                                     boolean syncCommit,
-                                     final ThreadNotifyingSettableFuture future)
-        {
-            _commitThread = commitThread;
-            _tx = tx;
-            _syncCommit = syncCommit;
+        public BDBCommitFutureResult(X value,
+                                     final ThreadNotifyingSettableFuture<X> future)
+        {
+            _value = value;
             _future = future;
         }
 
         public void complete()
         {
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("complete() called for transaction " + _tx);
-            }
-            _future.set(null);
+            _future.set(_value);
         }
 
         public void abort(RuntimeException databaseException)
         {
             _future.setException(databaseException);
         }
+    }
 
-        public void commit() throws DatabaseException
-        {
-            _commitThread.addJob(this, _syncCommit);
-
-            if(!_syncCommit)
-            {
-                if(LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("CommitAsync was requested, returning immediately.");
-                }
-                return;
-            }
-
-            boolean interrupted = false;
-            try
-            {
-                while (true)
-                {
-                    try
-                    {
-                        _future.get();
-                        break;
-                    }
-                    catch (InterruptedException e)
-                    {
-                        interrupted = true;
-                    }
-                }
-                if (interrupted)
-                {
-                    Thread.currentThread().interrupt();
-                }
+    private interface CommitThreadJob
+    {
+        void complete();
 
-            }
-            catch (ExecutionException e)
-            {
-                if(e.getCause() instanceof RuntimeException)
-                {
-                    throw (RuntimeException)e.getCause();
-                }
-                else if(e.getCause() instanceof Error)
-                {
-                    throw (Error)e.getCause();
-                }
-                else
-                {
-                    throw new StoreException(e.getCause());
-                }
-            }
-        }
+        void abort(RuntimeException e);
     }
 
     /**
@@ -178,12 +132,15 @@ public class CoalescingCommiter implemen
     private static class CommitThread extends Thread
     {
         private static final Logger LOGGER = LoggerFactory.getLogger(CommitThread.class);
+        private static final int JOB_QUEUE_NOTIFY_THRESHOLD = 8;
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
-        private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
+        private final Queue<CommitThreadJob> _jobQueue = new ConcurrentLinkedQueue<>();
         private final Object _lock = new Object();
         private final EnvironmentFacade _environmentFacade;
 
+        private final List<CommitThreadJob> _inProcessJobs = new ArrayList<>(256);
+
         public CommitThread(String name, EnvironmentFacade environmentFacade)
         {
             super(name);
@@ -210,7 +167,7 @@ public class CoalescingCommiter implemen
                         {
                             // Periodically wake up and check, just in case we
                             // missed a notification. Don't want to lock the broker hard.
-                            _lock.wait(1000);
+                            _lock.wait(500);
                         }
                         catch (InterruptedException e)
                         {
@@ -223,8 +180,13 @@ public class CoalescingCommiter implemen
 
         private void processJobs()
         {
-            int size = _jobQueue.size();
+            CommitThreadJob job;
+            while((job = _jobQueue.poll()) != null)
+            {
+                _inProcessJobs.add(job);
+            }
 
+            int completedJobsIndex = 0;
             try
             {
                 long startTime = 0;
@@ -241,14 +203,10 @@ public class CoalescingCommiter implemen
                     LOGGER.debug("flushLog completed in " + duration  + " ms");
                 }
 
-                for(int i = 0; i < size; i++)
+                while(completedJobsIndex < _inProcessJobs.size())
                 {
-                    BDBCommitFutureResult commit = _jobQueue.poll();
-                    if (commit == null)
-                    {
-                        break;
-                    }
-                    commit.complete();
+                    _inProcessJobs.get(completedJobsIndex).complete();
+                    completedJobsIndex++;
                 }
 
             }
@@ -258,13 +216,9 @@ public class CoalescingCommiter implemen
                 {
                     LOGGER.error("Exception during environment log flush", e);
 
-                    for(int i = 0; i < size; i++)
+                    for(; completedJobsIndex < _inProcessJobs.size(); completedJobsIndex++)
                     {
-                        BDBCommitFutureResult commit = _jobQueue.poll();
-                        if (commit == null)
-                        {
-                            break;
-                        }
+                        CommitThreadJob commit = _inProcessJobs.get(completedJobsIndex);
                         commit.abort(e);
                     }
                 }
@@ -282,6 +236,10 @@ public class CoalescingCommiter implemen
                     }
                 }
             }
+            finally
+            {
+                _inProcessJobs.clear();
+            }
         }
 
         private boolean hasJobs()
@@ -289,14 +247,14 @@ public class CoalescingCommiter implemen
             return !_jobQueue.isEmpty();
         }
 
-        public void addJob(BDBCommitFutureResult commit, final boolean sync)
+        public void addJob(CommitThreadJob commit, final boolean sync)
         {
             if (_stopped.get())
             {
                 throw new IllegalStateException("Commit thread is stopped");
             }
             _jobQueue.add(commit);
-            if(sync)
+            if(sync || _jobQueue.size() >= JOB_QUEUE_NOTIFY_THRESHOLD)
             {
                 synchronized (_lock)
                 {
@@ -310,7 +268,7 @@ public class CoalescingCommiter implemen
             synchronized (_lock)
             {
                 _stopped.set(true);
-                BDBCommitFutureResult commit;
+                CommitThreadJob commit;
 
                 try
                 {
@@ -340,25 +298,31 @@ public class CoalescingCommiter implemen
         }
     }
 
-    private final class ThreadNotifyingSettableFuture extends AbstractFuture<Void>
+    private final class ThreadNotifyingSettableFuture<X> extends AbstractFuture<X>
     {
         @Override
-        public Void get(final long timeout, final TimeUnit unit)
+        public X get(final long timeout, final TimeUnit unit)
                 throws InterruptedException, TimeoutException, ExecutionException
         {
-            _commitThread.explicitNotify();
+            if(!isDone())
+            {
+                _commitThread.explicitNotify();
+            }
             return super.get(timeout, unit);
         }
 
         @Override
-        public Void get() throws InterruptedException, ExecutionException
+        public X get() throws InterruptedException, ExecutionException
         {
-            _commitThread.explicitNotify();
+            if(!isDone())
+            {
+                _commitThread.explicitNotify();
+            }
             return super.get();
         }
 
         @Override
-        protected boolean set(final Void value)
+        protected boolean set(final X value)
         {
             return super.set(value);
         }
@@ -376,4 +340,51 @@ public class CoalescingCommiter implemen
             _commitThread.explicitNotify();
         }
     }
+
+    private class SynchronousCommitThreadJob implements CommitThreadJob
+    {
+        private boolean _done;
+        private RuntimeException _exception;
+
+        @Override
+        public synchronized void complete()
+        {
+            _done = true;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void abort(final RuntimeException e)
+        {
+            _done = true;
+            _exception = e;
+            notifyAll();
+        }
+
+
+        public synchronized void awaitCompletion()
+        {
+            boolean interrupted = false;
+            while(!_done)
+            {
+                try
+                {
+                    wait();
+                }
+                catch (InterruptedException e)
+                {
+                    interrupted = true;
+                }
+            }
+            if(interrupted)
+            {
+                Thread.currentThread().interrupt();
+            }
+            if(_exception != null)
+            {
+                throw _exception;
+            }
+        }
+
+    }
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
Wed Oct 21 16:29:37 2015
@@ -27,7 +27,8 @@ public interface Committer
 {
     void start();
 
-    ListenableFuture<Void> commit(Transaction tx, boolean syncCommit);
+    void commit(Transaction tx, boolean syncCommit);
+    <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
 
     void stop();
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
Wed Oct 21 16:29:37 2015
@@ -55,7 +55,8 @@ public interface EnvironmentFacade
 
     Transaction beginTransaction(TransactionConfig transactionConfig);
 
-    ListenableFuture<Void> commit(com.sleepycat.je.Transaction tx, boolean sync);
+    void commit(Transaction tx, boolean sync);
+    <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
 
     RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
 
@@ -69,4 +70,5 @@ public interface EnvironmentFacade
     void flushLog();
 
     void setCacheSize(long cacheSize);
+
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
Wed Oct 21 16:29:37 2015
@@ -134,7 +134,7 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
-    public ListenableFuture<Void> commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+    public void commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
     {
         try
         {
@@ -148,7 +148,25 @@ public class StandardEnvironmentFacade i
 
             throw handleDatabaseException("Got DatabaseException on commit", de);
         }
-        return _committer.commit(tx, syncCommit);
+        _committer.commit(tx, syncCommit);
+    }
+
+    @Override
+    public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X
val)
+    {
+        try
+        {
+            tx.commitNoSync();
+        }
+        catch (DatabaseException de)
+        {
+            LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+            closeEnvironmentSafely();
+
+            throw handleDatabaseException("Got DatabaseException on commit", de);
+        }
+        return _committer.commitAsync(tx, val);
     }
 
     @Override

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Wed Oct 21 16:29:37 2015
@@ -271,7 +271,7 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public ListenableFuture<Void> commit(final Transaction tx, boolean syncCommit)
+    public void commit(final Transaction tx, boolean syncCommit)
     {
         try
         {
@@ -287,9 +287,31 @@ public class ReplicatedEnvironmentFacade
         if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync()
== SyncPolicy.NO_SYNC
                 && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
         {
-            return _coalescingCommiter.commit(tx, syncCommit);
+            _coalescingCommiter.commit(tx, syncCommit);
         }
-        return Futures.immediateFuture(null);
+
+    }
+
+    @Override
+    public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X
val)
+    {
+        try
+        {
+            // Using commit() instead of commitNoSync() for the HA store to allow
+            // the HA durability configuration to influence resulting behaviour.
+            tx.commit(_realMessageStoreDurability);
+        }
+        catch (DatabaseException de)
+        {
+            throw handleDatabaseException("Got DatabaseException on commit, closing environment",
de);
+        }
+
+        if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync()
== SyncPolicy.NO_SYNC
+            && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+        {
+            return _coalescingCommiter.commitAsync(tx, val);
+        }
+        return Futures.immediateFuture(val);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -797,9 +797,9 @@ public abstract class AbstractJDBCMessag
         }
     }
 
-    private ListenableFuture<Void> commitTranAsync(final ConnectionWrapper connWrapper)
throws StoreException
+    private <X> ListenableFuture<X> commitTranAsync(final ConnectionWrapper connWrapper,
final X val) throws StoreException
     {
-        final SettableFuture<Void> future = SettableFuture.create();
+        final SettableFuture<X> future = SettableFuture.create();
         _executor.submit(new Runnable()
                         {
                             @Override
@@ -808,7 +808,7 @@ public abstract class AbstractJDBCMessag
                                 try
                                 {
                                     commitTran(connWrapper);
-                                    future.set(null);
+                                    future.set(val);
                                 }
                                 catch (RuntimeException e)
                                 {
@@ -1172,11 +1172,11 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public ListenableFuture<Void> commitTranAsync()
+        public <X> ListenableFuture<X> commitTranAsync(final X val)
         {
             checkMessageStoreOpen();
             doPreCommitActions();
-            ListenableFuture<Void> futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+            ListenableFuture<X> futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper,
val);
             storedSizeChange(_storeSizeIncrease);
             doPostCommitActions();
             return futureResult;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -62,9 +62,9 @@ public class MemoryMessageStore implemen
         private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
 
         @Override
-        public ListenableFuture<Void> commitTranAsync()
+        public <X> ListenableFuture<X> commitTranAsync(final X val)
         {
-            return Futures.immediateFuture(null);
+            return Futures.immediateFuture(val);
         }
 
         @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
Wed Oct 21 16:29:37 2015
@@ -47,8 +47,9 @@ public interface Transaction
     /**
      * Commits all operations performed within a given transactional context.
      *
+     * @param val
      */
-    ListenableFuture<Void> commitTranAsync();
+    <X> ListenableFuture<X> commitTranAsync(final X val);
 
     /**
      * Abandons all operations performed within a given transactional context.

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
Wed Oct 21 16:29:37 2015
@@ -101,7 +101,7 @@ public class AsyncAutoCommitTransaction
 
                 txn = _messageStore.newTransaction();
                 txn.dequeueMessage(record);
-                future = txn.commitTranAsync();
+                future = txn.commitTranAsync((Void) null);
 
                 txn = null;
             }
@@ -177,7 +177,7 @@ public class AsyncAutoCommitTransaction
             ListenableFuture<Void> future;
             if(txn != null)
             {
-                future = txn.commitTranAsync();
+                future = txn.commitTranAsync((Void) null);
                 txn = null;
             }
             else
@@ -208,7 +208,7 @@ public class AsyncAutoCommitTransaction
 
                 txn = _messageStore.newTransaction();
                 enqueueRecord = txn.enqueueMessage(queue, message);
-                future = txn.commitTranAsync();
+                future = txn.commitTranAsync((Void) null);
                 txn = null;
             }
             else
@@ -286,7 +286,7 @@ public class AsyncAutoCommitTransaction
             ListenableFuture<Void> future;
             if (txn != null)
             {
-                future = txn.commitTranAsync();
+                future = txn.commitTranAsync((Void) null);
                 txn = null;
             }
             else

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Wed Oct 21 16:29:37 2015
@@ -26,15 +26,9 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ForwardingListenableFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +58,7 @@ public class LocalTransaction implements
     private final MessageStore _transactionLog;
     private volatile long _txnStartTime = 0L;
     private volatile long _txnUpdateTime = 0l;
-    private ListenableFuture<Void> _asyncTran;
+    private ListenableFuture<Runnable> _asyncTran;
 
     public LocalTransaction(MessageStore transactionLog)
     {
@@ -384,50 +378,25 @@ public class LocalTransaction implements
         sync();
         if(_transaction != null)
         {
-            final ListenableFuture<Void> underlying = _transaction.commitTranAsync();
 
-            /*
-              Note that this future is not a general purpose future and makes assumptions
about the fact that get() is
-              only called once (which is enforced by how sync() works.  The post transaction
actions must be performed
-              in the connection thread (i.e. the thread that the sync() is called from -
not the commit thread which is
-              where the actions would occur if we added a listener to the underlying future
-             */
-            _asyncTran = new ForwardingListenableFuture<Void>()
-            {
-
-                @Override
-                protected ListenableFuture<Void> delegate()
-                {
-                    return underlying;
-                }
-
-                @Override
-                public Void get(final long timeout, final TimeUnit unit)
-                        throws InterruptedException, TimeoutException, ExecutionException
-                {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public Void get() throws InterruptedException, ExecutionException
-                {
-                    final Void rval;
-                    try
-                    {
-                        rval = super.get();
-                        doPostTransactionActions();
-                        deferred.run();
-                    }
-                    finally
-                    {
-                        resetDetails();
-                    }
-                    return rval;
-                }
-
-
-
-            };
+            Runnable action = new Runnable()
+                                {
+                                    @Override
+                                    public void run()
+                                    {
+                                        try
+                                        {
+                                            doPostTransactionActions();
+                                            deferred.run();
+                                        }
+                                        finally
+                                        {
+                                            resetDetails();
+                                        }
+
+                                    }
+                                };
+            _asyncTran = _transaction.commitTranAsync(action);
 
         }
         else
@@ -491,7 +460,7 @@ public class LocalTransaction implements
                 {
                     try
                     {
-                        _asyncTran.get();
+                        _asyncTran.get().run();
                         break;
                     }
                     catch (InterruptedException e)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Wed Oct 21 16:29:37 2015
@@ -466,7 +466,7 @@ public class AsynchronousMessageStoreRec
                                      + " is unknown, entry will be discarded");
                         Transaction txn = _store.newTransaction();
                         txn.dequeueMessage(record);
-                        txn.commitTranAsync();
+                        txn.commitTranAsync((Void) null);
                     }
                     return _continueRecovery.get();
                 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
Wed Oct 21 16:29:37 2015
@@ -207,7 +207,7 @@ public class SynchronousMessageStoreReco
                     _logger.warn("Message id " + messageId + " referenced in log as enqueued
in queue " + queueName + " is unknown, entry will be discarded");
                     Transaction txn = _store.newTransaction();
                     txn.dequeueMessage(record);
-                    txn.commitTranAsync();
+                    txn.commitTranAsync((Void) null);
                 }
             }
             else
@@ -215,7 +215,7 @@ public class SynchronousMessageStoreReco
                 _logger.warn("Message id " + messageId + " in log references queue with id
" + queueId + " which is not in the configuration, entry will be discarded");
                 Transaction txn = _store.newTransaction();
                 txn.dequeueMessage(record);
-                txn.commitTranAsync();
+                txn.commitTranAsync((Void) null);
             }
             return true;
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
(original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
Wed Oct 21 16:29:37 2015
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.*;
 
 import java.util.Collections;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
@@ -54,7 +53,7 @@ public class AsyncAutoCommitTransactionT
         super.setUp();
 
         when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
-        when(_storeTransaction.commitTranAsync()).thenReturn(_future);
+        when(_storeTransaction.commitTranAsync((Void) null)).thenReturn(_future);
         when(_queue.isDurable()).thenReturn(true);
         when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
(original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
Wed Oct 21 16:29:37 2015
@@ -103,9 +103,9 @@ class MockStoreTransaction implements Tr
         _state = TransactionState.COMMITTED;
     }
 
-    public ListenableFuture<Void> commitTranAsync()
+    public <X> ListenableFuture<X> commitTranAsync(final X val)
     {
-        return Futures.immediateFuture(null);
+        return Futures.immediateFuture(val);
     }
 
     public void abortTran()

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
(original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
Wed Oct 21 16:29:37 2015
@@ -144,7 +144,7 @@ public class SynchronousMessageStoreReco
 
         verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class), any(MessageEnqueueRecord.class));
         verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queue.getId(),
messageId)));
-        verify(transaction, times(1)).commitTranAsync();
+        verify(transaction, times(1)).commitTranAsync((Void) null);
     }
 
     public void testRecoveryOfMessageInstanceForNonExistingQueue()
@@ -182,7 +182,7 @@ public class SynchronousMessageStoreReco
         recoverer.recover(_virtualHost);
 
         verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queueId,messageId)));
-        verify(transaction, times(1)).commitTranAsync();
+        verify(transaction, times(1)).commitTranAsync((Void) null);
     }
 
     public void testRecoveryDeletesOrphanMessages()

Modified: qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
(original)
+++ qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -133,11 +133,11 @@ public abstract class GenericAbstractJDB
         }
 
         @Override
-        public ListenableFuture<Void> commitTranAsync()
+        public <X> ListenableFuture<X> commitTranAsync(final X val)
         {
             try
             {
-                return super.commitTranAsync();
+                return super.commitTranAsync(val);
             }
             finally
             {




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message