qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject svn commit: r1604946 - in /qpid/trunk/qpid/java/bdbstore: jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ src/main/java/org/apache/qpid/server/store/berkeleydb/ src/main/java...
Date Mon, 23 Jun 2014 21:54:05 GMT
Author: orudyy
Date: Mon Jun 23 21:54:04 2014
New Revision: 1604946

URL: http://svn.apache.org/r1604946
Log:
QPID-5715: Set BDB message store durability only once on opening of virtual host. Restore
original code of coalescing committer.

Modified:
    qpid/trunk/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
    qpid/trunk/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java

Modified: qpid/trunk/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
Mon Jun 23 21:54:04 2014
@@ -43,6 +43,7 @@ import org.apache.qpid.server.jmx.Manage
 import org.apache.qpid.server.jmx.ManagedObjectRegistry;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
 import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 
@@ -123,14 +124,24 @@ public class BDBHAMessageStoreManagerMBe
     @Override
     public String getDurability() throws IOException, JMException
     {
-        return _virtualHostNode.getDurability();
+        BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)_virtualHostNode.getVirtualHost();
+        if (host != null)
+        {
+            return host.getDurability();
+        }
+        return null;
     }
 
 
     @Override
     public boolean getCoalescingSync() throws IOException, JMException
     {
-        return true;
+        BDBHAVirtualHost<?> host = (BDBHAVirtualHost<?>)_virtualHostNode.getVirtualHost();
+        if (host != null)
+        {
+            return host.isCoalescingSync();
+        }
+        return false;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
Mon Jun 23 21:54:04 2014
@@ -41,6 +41,7 @@ import junit.framework.TestCase;
 import org.apache.qpid.server.jmx.ManagedObjectRegistry;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
 import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 
@@ -114,11 +115,22 @@ public class BDBHAMessageStoreManagerMBe
 
     public void testDurability() throws Exception
     {
-        when(_virtualHostNode.getDurability()).thenReturn(TEST_DURABILITY);
+        BDBHAVirtualHost virtualHost = mock(BDBHAVirtualHost.class);
+        when(_virtualHostNode.getVirtualHost()).thenReturn(virtualHost);
+        when(virtualHost.getDurability()).thenReturn(TEST_DURABILITY);
 
         assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
     }
 
+    public void testIsCoalescingSync() throws Exception
+    {
+        BDBHAVirtualHost virtualHost = mock(BDBHAVirtualHost.class);
+        when(_virtualHostNode.getVirtualHost()).thenReturn(virtualHost);
+        when(virtualHost.isCoalescingSync()).thenReturn(true);
+
+        assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
+    }
+
     public void testNodeState() throws Exception
     {
         when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE);

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
Mon Jun 23 21:54:04 2014
@@ -22,9 +22,6 @@ package org.apache.qpid.server.store.ber
 
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
@@ -36,90 +33,55 @@ import com.sleepycat.je.Transaction;
 
 public class CoalescingCommiter implements Committer
 {
-    private final CommitTask _commitTask;
-    private final ExecutorService _taskExecutor;
+    private final CommitThread _commitThread;
 
-    public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade)
+    public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
     {
-        _commitTask = new CommitTask(environmentFacade);
-        _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
-        {
-            @Override
-            public Thread newThread(Runnable r)
-            {
-                Thread t = new Thread(r, "Commit-Thread-" + name);
-                t.setDaemon(true);
-                return t;
-            }
-        });
+        _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
     }
 
     @Override
     public void start()
     {
-        if (_commitTask.start())
-        {
-            _taskExecutor.submit(_commitTask);
-        }
+        _commitThread.start();
     }
 
     @Override
     public void stop()
     {
-        _commitTask.stop();
-    }
-
-    @Override
-    public void close()
-    {
+        _commitThread.close();
         try
         {
-            _commitTask.close();
+            _commitThread.join();
         }
-        finally
+        catch (InterruptedException ie)
         {
-            _taskExecutor.shutdown();
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Commit thread has not shutdown", ie);
         }
     }
 
     @Override
     public StoreFuture commit(Transaction tx, boolean syncCommit)
     {
-        if (isStarted())
-        {
-            BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit);
-            try
-            {
-                commitFuture.commit();
-                return commitFuture;
-            }
-            catch(IllegalStateException e)
-            {
-                // IllegalStateException is thrown when commit thread is stopped whilst commit
is called
-            }
-        }
-
-        return StoreFuture.IMMEDIATE_FUTURE;
-    }
-
-    public boolean isStarted()
-    {
-        return !_commitTask.isStopped();
+        BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+        commitFuture.commit();
+        return commitFuture;
     }
 
     private static final class BDBCommitFuture implements StoreFuture
     {
         private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
 
-        private final CommitTask _commitTask;
+        private final CommitThread _commitThread;
         private final Transaction _tx;
         private final boolean _syncCommit;
         private RuntimeException _databaseException;
         private boolean _complete;
 
-        public BDBCommitFuture(CommitTask commitTask, Transaction tx, boolean syncCommit)
+        public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
         {
-            _commitTask = commitTask;
+            _commitThread = commitThread;
             _tx = tx;
             _syncCommit = syncCommit;
         }
@@ -145,7 +107,7 @@ public class CoalescingCommiter implemen
 
         public void commit() throws DatabaseException
         {
-            _commitTask.addJob(this, _syncCommit);
+            _commitThread.addJob(this, _syncCommit);
 
             if(!_syncCommit)
             {
@@ -180,7 +142,7 @@ public class CoalescingCommiter implemen
 
             while (!isComplete())
             {
-                _commitTask.explicitNotify();
+                _commitThread.explicitNotify();
                 try
                 {
                     wait(250);
@@ -189,24 +151,6 @@ public class CoalescingCommiter implemen
                 {
                     throw new RuntimeException(e);
                 }
-
-                if (!_commitTask.isClosed() && _commitTask.isStopped() &&
!isComplete())
-                {
-                    // coalesing sync is not required anymore
-                    // flush log and mark transaction as completed
-                    try
-                    {
-                        _commitTask.flushLog();
-                    }
-                    catch(DatabaseException e)
-                    {
-                        _databaseException = e;
-                    }
-                    finally
-                    {
-                        complete();
-                    }
-                }
             }
 
             if(LOGGER.isDebugEnabled())
@@ -218,38 +162,28 @@ public class CoalescingCommiter implemen
     }
 
     /**
-     * Implements a {@link Runnable} which batches and commits a queue of {@link BDBCommitFuture}
operations. The commit operations
+     * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations.
The commit operations
      * themselves are responsible for adding themselves to the queue and waiting for the
commit to happen before
      * continuing, but it is the responsibility of this thread to tell the commit operations
when they have been
      * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort}
methods.
      *
      * <p/><table id="crc"><caption>CRC Card</caption> <tr><th>
Responsibilities <th> Collaborations </table>
      */
-    private static class CommitTask implements Runnable
+    private static class CommitThread extends Thread
     {
-        private static final Logger LOGGER = Logger.getLogger(CommitTask.class);
+        private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
 
-        private final AtomicBoolean _stopped = new AtomicBoolean(true);
-        private final AtomicBoolean _closed = new AtomicBoolean(false);
+        private final AtomicBoolean _stopped = new AtomicBoolean(false);
         private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
         private final Object _lock = new Object();
         private final EnvironmentFacade _environmentFacade;
 
-        public CommitTask(EnvironmentFacade environmentFacade)
+        public CommitThread(String name, EnvironmentFacade environmentFacade)
         {
+            super(name);
             _environmentFacade = environmentFacade;
         }
 
-        public boolean isClosed()
-        {
-            return _closed.get();
-        }
-
-        public boolean isStopped()
-        {
-            return _stopped.get();
-        }
-
         public void explicitNotify()
         {
             synchronized (_lock)
@@ -258,7 +192,6 @@ public class CoalescingCommiter implemen
             }
         }
 
-        @Override
         public void run()
         {
             while (!_stopped.get())
@@ -280,12 +213,6 @@ public class CoalescingCommiter implemen
                 }
                 processJobs();
             }
-
-            // process remaining jobs if such were added whilst stopped
-            if (hasJobs())
-            {
-                processJobs();
-            }
         }
 
         private void processJobs()
@@ -300,7 +227,11 @@ public class CoalescingCommiter implemen
                     startTime = System.currentTimeMillis();
                 }
 
-                flushLog();
+                Environment environment = _environmentFacade.getEnvironment();
+                if (environment != null && environment.isValid())
+                {
+                    environment.flushLog(true);
+                }
 
                 if(LOGGER.isDebugEnabled())
                 {
@@ -351,15 +282,6 @@ public class CoalescingCommiter implemen
             }
         }
 
-        private void flushLog()
-        {
-            Environment environment = _environmentFacade.getEnvironment();
-            if (environment != null && environment.isValid())
-            {
-                environment.flushLog(true);
-            }
-        }
-
         private boolean hasJobs()
         {
             return !_jobQueue.isEmpty();
@@ -381,44 +303,24 @@ public class CoalescingCommiter implemen
             }
         }
 
-        public boolean start()
-        {
-            return _stopped.compareAndSet(true, false);
-        }
-
-        public void stop()
+        public void close()
         {
-            if (_stopped.compareAndSet(false, true))
+            RuntimeException e = new RuntimeException("Commit thread has been closed, transaction
aborted");
+            synchronized (_lock)
             {
-                synchronized (_lock)
+                _stopped.set(true);
+                BDBCommitFuture commit = null;
+                int abortedCommits = 0;
+                while ((commit = _jobQueue.poll()) != null)
                 {
-                    _lock.notifyAll();
+                    abortedCommits++;
+                    commit.abort(e);
                 }
-                _jobQueue.clear();
-            }
-        }
-
-        public void close()
-        {
-            if (_closed.compareAndSet(false, true))
-            {
-                RuntimeException e = new RuntimeException("Commit thread has been closed");
-                synchronized (_lock)
+                if (LOGGER.isDebugEnabled() && abortedCommits > 0)
                 {
-                    _stopped.set(true);
-                    BDBCommitFuture commit = null;
-                    int abortedCommits = 0;
-                    while ((commit = _jobQueue.poll()) != null)
-                    {
-                        abortedCommits++;
-                        commit.abort(e);
-                    }
-                    if (LOGGER.isDebugEnabled() && abortedCommits > 0)
-                    {
-                        LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
-                    }
-                    _lock.notifyAll();
+                    LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
                 }
+                _lock.notifyAll();
             }
         }
     }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
Mon Jun 23 21:54:04 2014
@@ -31,8 +31,4 @@ public interface Committer
     StoreFuture commit(Transaction tx, boolean syncCommit);
 
     void stop();
-
-    void close();
-
-    boolean isStarted();
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
Mon Jun 23 21:54:04 2014
@@ -30,7 +30,6 @@ import com.sleepycat.je.Sequence;
 import com.sleepycat.je.SequenceConfig;
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.store.StoreFuture;
 
 import com.sleepycat.je.Database;
@@ -129,10 +128,7 @@ public class StandardEnvironmentFacade i
     {
         try
         {
-            if (_committer != null)
-            {
-                _committer.close();
-            }
+            _committer.stop();
 
             closeSequences();
             closeDatabases();

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Mon Jun 23 21:54:04 2014
@@ -104,7 +104,7 @@ public class ReplicatedEnvironmentFacade
 
     static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC;
     static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
-    static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY;
+    public static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY;
 
     @SuppressWarnings("serial")
     private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new
HashMap<String, String>()
@@ -155,13 +155,13 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<ReplicationGroupListener> _replicationGroupListener
= new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
     private final Durability _defaultDurability;
-    private final CoalescingCommiter _coalescingCommiter;
+    private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>();
 
+    private volatile Durability _realMessageStoreDurability = null;
+    private volatile CoalescingCommiter _coalescingCommiter = null;
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
-    private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
-    private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
 
     private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new
ConcurrentHashMap<>();
@@ -190,7 +190,6 @@ public class ReplicatedEnvironmentFacade
         _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-"
+ _prettyGroupNodeName));
         _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()
+ 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
 
-        _coalescingCommiter  = new CoalescingCommiter(_configuration.getGroupName(), this);
 
         // create environment in a separate thread to avoid renaming of the current thread
by JE
         _environment = createEnvironment(true);
@@ -201,10 +200,15 @@ public class ReplicatedEnvironmentFacade
     @Override
     public Transaction beginTransaction()
     {
+        if (_messageStoreDurability.get() == null)
+        {
+            throw new IllegalStateException("Message store durability is not set");
+        }
+
         try
         {
             TransactionConfig transactionConfig = new TransactionConfig();
-            transactionConfig.setDurability(getMessageStoreTransactionDurability());
+            transactionConfig.setDurability(getRealMessageStoreDurability());
             return _environment.beginTransaction(null, transactionConfig);
         }
         catch(DatabaseException e)
@@ -220,13 +224,19 @@ public class ReplicatedEnvironmentFacade
         {
             // Using commit() instead of commitNoSync() for the HA store to allow
             // the HA durability configuration to influence resulting behaviour.
-            tx.commit();
+            tx.commit(_realMessageStoreDurability);
         }
         catch (DatabaseException de)
         {
             throw handleDatabaseException("Got DatabaseException on commit, closing environment",
de);
         }
-        return _coalescingCommiter.commit(tx, syncCommit);
+
+        if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync()
== SyncPolicy.NO_SYNC
+                && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC)
+        {
+            return _coalescingCommiter.commit(tx, syncCommit);
+        }
+        return StoreFuture.IMMEDIATE_FUTURE;
     }
 
     @Override
@@ -248,7 +258,10 @@ public class ReplicatedEnvironmentFacade
 
                 try
                 {
-                    _coalescingCommiter.close();
+                    if (_coalescingCommiter != null)
+                    {
+                        _coalescingCommiter.stop();
+                    }
                     closeSequences();
                     closeDatabases();
                 }
@@ -506,26 +519,19 @@ public class ReplicatedEnvironmentFacade
         return (String)_configuration.getHelperHostPort();
     }
 
-    Durability getMessageStoreTransactionDurability()
+    Durability getRealMessageStoreDurability()
     {
-        SyncPolicy localSync = getMessageStoreLocalTransactionSyncronizationPolicy();
-        if ( localSync == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY && _coalescingCommiter.isStarted())
-        {
-            localSync = SyncPolicy.NO_SYNC;
-        }
-        SyncPolicy replicaSync = getMessageStoreRemoteTransactionSyncronizationPolicy();
-        return new Durability(localSync, replicaSync, getReplicaAcknowledgmentPolicy());
+        return _realMessageStoreDurability;
     }
 
-    public Durability getDurability()
+    public Durability getMessageStoreDurability()
     {
-        return new Durability(getMessageStoreLocalTransactionSyncronizationPolicy(),
-                getMessageStoreRemoteTransactionSyncronizationPolicy(), getReplicaAcknowledgmentPolicy());
+        return _messageStoreDurability.get();
     }
 
     public boolean isCoalescingSync()
     {
-        return _coalescingCommiter.isStarted();
+        return _coalescingCommiter != null;
     }
 
     public String getNodeState()
@@ -1087,39 +1093,24 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    public SyncPolicy getMessageStoreLocalTransactionSyncronizationPolicy()
-    {
-        return _messageStoreLocalTransactionSyncronizationPolicy;
-    }
-
-    public SyncPolicy getMessageStoreRemoteTransactionSyncronizationPolicy()
-    {
-        return _messageStoreRemoteTransactionSyncronizationPolicy;
-    }
-
-    public ReplicaAckPolicy getReplicaAcknowledgmentPolicy()
-    {
-        return REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY;
-    }
-
-    public void setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy)
+    public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy,
SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy)
     {
-        _messageStoreLocalTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy;
-        if (localTransactionSyncronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
+        if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy,
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
         {
-            _coalescingCommiter.start();
+            if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
+            {
+                localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
+                _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(),
this);
+                _coalescingCommiter.start();
+            }
+            _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy,
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
         }
         else
         {
-            _coalescingCommiter.stop();
+            throw new IllegalStateException("Message store durability is already set to "
+ _messageStoreDurability.get());
         }
     }
 
-    public void setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy)
-    {
-        _messageStoreRemoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy;
-    }
-
     private void populateExistingRemoteReplicationNodes()
     {
         ReplicationGroup group = _environment.getGroup();

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
Mon Jun 23 21:54:04 2014
@@ -31,7 +31,7 @@ public interface BDBHAVirtualHost<X exte
     String REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy";
     String LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = "localTransactionSynchronizationPolicy";
     String COALESCING_SYNC = "coalescingSync";
-    String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy";
+    String DURABILITY = "durability";
 
     @ManagedAttribute( defaultValue = "SYNC")
     String getLocalTransactionSynchronizationPolicy();
@@ -40,8 +40,8 @@ public interface BDBHAVirtualHost<X exte
     String getRemoteTransactionSynchronizationPolicy();
 
     @DerivedAttribute
-    String getReplicaAcknowledgmentPolicy();
+    boolean isCoalescingSync();
 
     @DerivedAttribute
-    boolean isCoalescingSync();
+    String getDurability();
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
Mon Jun 23 21:54:04 2014
@@ -42,10 +42,10 @@ public class BDBHAVirtualHostImpl extend
 
     private final BDBConfigurationStore _configurationStore;
 
-    @ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment")
+    @ManagedAttributeField
     private String _localTransactionSynchronizationPolicy;
 
-    @ManagedAttributeField(afterSet="setRemoteTransactionSynchronizationPolicyOnEnvironment")
+    @ManagedAttributeField
     private String _remoteTransactionSynchronizationPolicy;
 
     @ManagedObjectFactoryConstructor
@@ -74,14 +74,13 @@ public class BDBHAVirtualHostImpl extend
         return _remoteTransactionSynchronizationPolicy;
     }
 
-
     @Override
-    public String getReplicaAcknowledgmentPolicy()
+    public String getDurability()
     {
         ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
         if (facade != null)
         {
-            return facade.getReplicaAcknowledgmentPolicy().name();
+            return String.valueOf(facade.getMessageStoreDurability());
         }
         return null;
     }
@@ -89,20 +88,21 @@ public class BDBHAVirtualHostImpl extend
     @Override
     public boolean isCoalescingSync()
     {
-        ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
-        if (facade != null)
-        {
-            return facade.isCoalescingSync();
-        }
-        return false;
+        return _localTransactionSynchronizationPolicy.equals(SyncPolicy.SYNC.name());
     }
 
     @Override
     public void onOpen()
     {
+        ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+        if (facade != null)
+        {
+            facade.setMessageStoreDurability(
+                    SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()),
+                    SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()),
+                    ReplicatedEnvironmentFacade.REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
+        }
         super.onOpen();
-        setRemoteTransactionSynchronizationPolicyOnEnvironment();
-        setLocalTransactionSynchronizationPolicyOnEnvironment();
     }
 
     @Override
@@ -135,24 +135,6 @@ public class BDBHAVirtualHostImpl extend
         }
     }
 
-    protected void setLocalTransactionSynchronizationPolicyOnEnvironment()
-    {
-        ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
-        if (facade != null)
-        {
-            facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()));
-        }
-    }
-
-    protected void setRemoteTransactionSynchronizationPolicyOnEnvironment()
-    {
-        ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
-        if (facade != null)
-        {
-            facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()));
-        }
-    }
-
     private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
     {
         return (ReplicatedEnvironmentFacade) _configurationStore.getEnvironmentFacade();

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
Mon Jun 23 21:54:04 2014
@@ -48,9 +48,6 @@ public interface BDBHAVirtualHostNode<X 
     @ManagedAttribute(mandatory=true)
     String getHelperAddress();
 
-    @DerivedAttribute
-    String getDurability();
-
     @ManagedAttribute(defaultValue = "false")
     boolean isDesignatedPrimary();
 

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
Mon Jun 23 21:54:04 2014
@@ -161,17 +161,6 @@ public class BDBHAVirtualHostNodeImpl ex
     }
 
     @Override
-    public String getDurability()
-    {
-        ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
-        if (environmentFacade != null)
-        {
-            return environmentFacade.getDurability().toString();
-        }
-        return null;
-    }
-
-    @Override
     public boolean isDesignatedPrimary()
     {
         return _designatedPrimary;

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
Mon Jun 23 21:54:04 2014
@@ -477,7 +477,9 @@ public class BDBHAVirtualHostNodeTest ex
         virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY,
"SYNC");
         virtualHost.setAttributes(virtualHostAttributes);
 
-        awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false);
+        virtualHost.stop();
+        virtualHost.start();
+
         assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC",
virtualHost.getLocalTransactionSynchronizationPolicy());
         assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
         assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1604946&r1=1604945&r2=1604946&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
(original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Mon Jun 23 21:54:04 2014
@@ -40,7 +40,6 @@ import org.apache.qpid.util.FileUtils;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.SyncPolicy;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.Transaction;
 import com.sleepycat.je.rep.NodeState;
@@ -163,18 +162,24 @@ public class ReplicatedEnvironmentFacade
         assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
     }
 
-    public void testGetDurability() throws Exception
+    public void testSetMessageStoreDurability() throws Exception
     {
         ReplicatedEnvironmentFacade master = createMaster();
-        assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
-        assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability());
-        assertFalse("Coalescing syn before policy set to SYNC", master.isCoalescingSync());
-        master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.SYNC);
-        assertTrue("Coalescing syn after policy set to SYNC", master.isCoalescingSync());
-        assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY",
master.getMessageStoreTransactionDurability().toString());
-        master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
-        assertEquals("Unexpected message store durability after committer stop", "WRITE_NO_SYNC,NO_SYNC,SIMPLE_MAJORITY",
master.getMessageStoreTransactionDurability().toString());
-        assertFalse("Coalescing syn after policy set to WRITE_NO_SYNC", master.isCoalescingSync());
+        assertEquals("Unexpected message store durability",
+                new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC,
Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
+                master.getRealMessageStoreDurability());
+        assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
+        assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+
+        try
+        {
+            master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(),
TEST_DURABILITY.getReplicaAck());
+            fail("Cannot set message store durability twice");
+        }
+        catch(IllegalStateException e)
+        {
+            // pass
+        }
     }
 
     public void testGetNodeState() throws Exception
@@ -619,26 +624,6 @@ public class ReplicatedEnvironmentFacade
         assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
     }
 
-    public void testSetLocalTransactionSyncronizationPolicy() throws Exception
-    {
-        ReplicatedEnvironmentFacade facade = createMaster();
-        assertEquals("Unexpected local transaction synchronization policy before change",
-                ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
-        facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
-        assertEquals("Unexpected local transaction synchronization policy after change",
-                SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
-    }
-
-    public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
-    {
-        ReplicatedEnvironmentFacade facade = createMaster();
-        assertEquals("Unexpected remote transaction synchronization policy before change",
-                ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
-        facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC);
-        assertEquals("Unexpected remote transaction synchronization policy after change",
-                SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
-    }
-
     public void testBeginTransaction() throws Exception
     {
         ReplicatedEnvironmentFacade facade = createMaster();
@@ -696,6 +681,7 @@ public class ReplicatedEnvironmentFacade
         ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
         ref.setStateChangeListener(stateChangeListener);
         ref.setReplicationGroupListener(replicationGroupListener);
+        ref.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(),
TEST_DURABILITY.getReplicaAck());
         _nodes.put(nodeName, ref);
         return ref;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message