curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: InterProcessSemaphoreMutex changed to use InterProcessSemaphoreV2. In the process, a bug in InterProcessSemaphoreV2 was noticed that was been long fixed in the older InterProcessSemaphore. If the session drops in the process of creating the l
Date Fri, 06 Sep 2013 06:06:41 GMT
Updated Branches:
  refs/heads/CURATOR-48 [created] 86e442a1d


InterProcessSemaphoreMutex changed to use InterProcessSemaphoreV2. In the process, a bug in
InterProcessSemaphoreV2 was
noticed that was been long fixed in the older InterProcessSemaphore. If the session drops
in the process of creating the lock
node, the acquire fails. The old InterProcessSemaphore worked around this by using a retry
loop to try to recreate the lock
node. InterProcessSemaphoreV2 now does this as well.

Also, as part of this checkin, testWaitingProcessKilledServer has been tightened and made
to be more reliable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86e442a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86e442a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86e442a1

Branch: refs/heads/CURATOR-48
Commit: 86e442a1d9a0907c12f21787ba26bc6be9b80e7b
Parents: aa17424
Author: randgalt <randgalt@apache.org>
Authored: Thu Sep 5 23:05:11 2013 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Thu Sep 5 23:05:11 2013 -0700

----------------------------------------------------------------------
 .../locks/InterProcessSemaphoreMutex.java       |   4 +-
 .../recipes/locks/InterProcessSemaphoreV2.java  | 268 ++++++++++--------
 .../locks/TestInterProcessMutexBase.java        | 272 ++++++++++---------
 3 files changed, 296 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86e442a1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 29416d5..88b5f5d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class InterProcessSemaphoreMutex implements InterProcessLock
 {
-    private final InterProcessSemaphore semaphore;
+    private final InterProcessSemaphoreV2 semaphore;
     private volatile Lease lease;
 
     /**
@@ -37,7 +37,7 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
      */
     public InterProcessSemaphoreMutex(CuratorFramework client, String path)
     {
-        this.semaphore = new InterProcessSemaphore(client, path, 1);
+        this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86e442a1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 5e5acc4..e24b019 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -16,13 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Closeables;
+import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.shared.SharedCountListener;
 import org.apache.curator.framework.recipes.shared.SharedCountReader;
 import org.apache.curator.framework.state.ConnectionState;
@@ -41,43 +44,43 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
- *     A counting semaphore that works across JVMs. All processes
- *     in all JVMs that use the same lock path will achieve an inter-process limited set
of leases.
- *     Further, this semaphore is mostly "fair" - each user will get a lease in the order
requested
- *     (from ZK's point of view).
+ * A counting semaphore that works across JVMs. All processes
+ * in all JVMs that use the same lock path will achieve an inter-process limited set of leases.
+ * Further, this semaphore is mostly "fair" - each user will get a lease in the order requested
+ * (from ZK's point of view).
  * </p>
- *
+ * <p/>
  * <p>
- *     There are two modes for determining the max leases for the semaphore. In the first
mode the
- *     max leases is a convention maintained by the users of a given path. In the second
mode a
- *     {@link SharedCountReader} is used as the method for semaphores of a given path to
determine
- *     the max leases.
+ * There are two modes for determining the max leases for the semaphore. In the first mode
the
+ * max leases is a convention maintained by the users of a given path. In the second mode
a
+ * {@link SharedCountReader} is used as the method for semaphores of a given path to determine
+ * the max leases.
  * </p>
- *
+ * <p/>
  * <p>
- *     If a {@link SharedCountReader} is <b>not</b> used, no internal checks
are done to prevent
- *     Process A acting as if there are 10 leases and Process B acting as if there are 20.
Therefore,
- *     make sure that all instances in all processes use the same numberOfLeases value.
+ * If a {@link SharedCountReader} is <b>not</b> used, no internal checks are
done to prevent
+ * Process A acting as if there are 10 leases and Process B acting as if there are 20. Therefore,
+ * make sure that all instances in all processes use the same numberOfLeases value.
  * </p>
- *
+ * <p/>
  * <p>
- *     The various acquire methods return {@link Lease} objects that represent acquired leases.
Clients
- *     must take care to close lease objects  (ideally in a <code>finally</code>
- *     block) else the lease will be lost. However, if the client session drops (crash, etc.),
- *     any leases held by the client are automatically closed and made available to other
clients.
+ * The various acquire methods return {@link Lease} objects that represent acquired leases.
Clients
+ * must take care to close lease objects  (ideally in a <code>finally</code>
+ * block) else the lease will be lost. However, if the client session drops (crash, etc.),
+ * any leases held by the client are automatically closed and made available to other clients.
  * </p>
- *
+ * <p/>
  * <p>
- *     Thanks to Ben Bangert (ben@groovie.org) for the algorithm used.
+ * Thanks to Ben Bangert (ben@groovie.org) for the algorithm used.
  * </p>
  */
 public class InterProcessSemaphoreV2
 {
-    private final Logger                log = LoggerFactory.getLogger(getClass());
-    private final InterProcessMutex     lock;
-    private final CuratorFramework      client;
-    private final String                leasesPath;
-    private final Watcher               watcher = new Watcher()
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final InterProcessMutex lock;
+    private final CuratorFramework client;
+    private final String leasesPath;
+    private final Watcher watcher = new Watcher()
     {
         @Override
         public void process(WatchedEvent event)
@@ -86,16 +89,16 @@ public class InterProcessSemaphoreV2
         }
     };
 
-    private volatile byte[]             nodeData;
-    private volatile int                maxLeases;
+    private volatile byte[] nodeData;
+    private volatile int maxLeases;
 
-    private static final String     LOCK_PARENT = "locks";
-    private static final String     LEASE_PARENT = "leases";
-    private static final String     LEASE_BASE_NAME = "lease-";
+    private static final String LOCK_PARENT = "locks";
+    private static final String LEASE_PARENT = "leases";
+    private static final String LEASE_BASE_NAME = "lease-";
 
     /**
-     * @param client the client
-     * @param path path for the semaphore
+     * @param client    the client
+     * @param path      path for the semaphore
      * @param maxLeases the max number of leases to allow for this instance
      */
     public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases)
@@ -105,8 +108,8 @@ public class InterProcessSemaphoreV2
 
     /**
      * @param client the client
-     * @param path path for the semaphore
-     * @param count the shared count to use for the max leases
+     * @param path   path for the semaphore
+     * @param count  the shared count to use for the max leases
      */
     public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader
count)
     {
@@ -123,22 +126,22 @@ public class InterProcessSemaphoreV2
         if ( count != null )
         {
             count.addListener
-            (
-                new SharedCountListener()
-                {
-                    @Override
-                    public void countHasChanged(SharedCountReader sharedCount, int newCount)
throws Exception
+                (
+                    new SharedCountListener()
                     {
-                        InterProcessSemaphoreV2.this.maxLeases = newCount;
-                    }
+                        @Override
+                        public void countHasChanged(SharedCountReader sharedCount, int newCount)
throws Exception
+                        {
+                            InterProcessSemaphoreV2.this.maxLeases = newCount;
+                        }
 
-                    @Override
-                    public void stateChanged(CuratorFramework client, ConnectionState newState)
-                    {
-                        // no need to handle this here - clients should set their own connection
state listener
+                        @Override
+                        public void stateChanged(CuratorFramework client, ConnectionState
newState)
+                        {
+                            // no need to handle this here - clients should set their own
connection state listener
+                        }
                     }
-                }
-            );
+                );
         }
     }
 
@@ -148,7 +151,7 @@ public class InterProcessSemaphoreV2
      *
      * @param nodeData node data
      */
-    public void     setNodeData(byte[] nodeData)
+    public void setNodeData(byte[] nodeData)
     {
         this.nodeData = (nodeData != null) ? Arrays.copyOf(nodeData, nodeData.length) : null;
     }
@@ -159,7 +162,7 @@ public class InterProcessSemaphoreV2
      * @return list of nodes
      * @throws Exception ZK errors, interruptions, etc.
      */
-    public Collection<String>   getParticipantNodes() throws Exception
+    public Collection<String> getParticipantNodes() throws Exception
     {
         return client.getChildren().forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
     }
@@ -169,7 +172,7 @@ public class InterProcessSemaphoreV2
      *
      * @param leases leases to close
      */
-    public void     returnAll(Collection<Lease> leases)
+    public void returnAll(Collection<Lease> leases)
     {
         for ( Lease l : leases )
         {
@@ -182,7 +185,7 @@ public class InterProcessSemaphoreV2
      *
      * @param lease lease to close
      */
-    public void     returnLease(Lease lease)
+    public void returnLease(Lease lease)
     {
         Closeables.closeQuietly(lease);
     }
@@ -190,7 +193,7 @@ public class InterProcessSemaphoreV2
     /**
      * <p>Acquire a lease. If no leases are available, this method blocks until either
the maximum
      * number of leases is increased or another client/process closes a lease.</p>
-     *
+     * <p/>
      * <p>The client must close the lease when it is done with it. You should do this
in a
      * <code>finally</code> block.</p>
      *
@@ -207,7 +210,7 @@ public class InterProcessSemaphoreV2
      * <p>Acquire <code>qty</code> leases. If there are not enough leases
available, this method
      * blocks until either the maximum number of leases is increased enough or other clients/processes
      * close enough leases.</p>
-     *
+     * <p/>
      * <p>The client must close the leases when it is done with them. You should do
this in a
      * <code>finally</code> block. NOTE: You can use {@link #returnAll(Collection)}
for this.</p>
      *
@@ -224,7 +227,7 @@ public class InterProcessSemaphoreV2
      * <p>Acquire a lease. If no leases are available, this method blocks until either
the maximum
      * number of leases is increased or another client/process closes a lease. However, this
method
      * will only block to a maximum of the time parameters given.</p>
-     *
+     * <p/>
      * <p>The client must close the lease when it is done with it. You should do this
in a
      * <code>finally</code> block.</p>
      *
@@ -245,11 +248,11 @@ public class InterProcessSemaphoreV2
      * close enough leases. However, this method will only block to a maximum of the time
      * parameters given. If time expires before all leases are acquired, the subset of acquired
      * leases are automatically closed.</p>
-     *
+     * <p/>
      * <p>The client must close the leases when it is done with them. You should do
this in a
      * <code>finally</code> block. NOTE: You can use {@link #returnAll(Collection)}
for this.</p>
      *
-     * @param qty number of leases to acquire
+     * @param qty  number of leases to acquire
      * @param time time to wait
      * @param unit time unit
      * @return the new leases or null if time ran out
@@ -257,77 +260,49 @@ public class InterProcessSemaphoreV2
      */
     public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
     {
-        long                startMs = System.currentTimeMillis();
-        boolean             hasWait = (unit != null);
-        long                waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit)
: 0;
+        long startMs = System.currentTimeMillis();
+        boolean hasWait = (unit != null);
+        long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
 
         Preconditions.checkArgument(qty > 0, "qty cannot be 0");
 
-        ImmutableList.Builder<Lease>    builder = ImmutableList.builder();
-        boolean                         success = false;
+        ImmutableList.Builder<Lease> builder = ImmutableList.builder();
+        boolean success = false;
         try
         {
             while ( qty-- > 0 )
             {
-                if ( !client.isStarted() )
+                int retryCount = 0;
+                long startMillis = System.currentTimeMillis();
+                boolean isDone = false;
+                while ( !isDone )
                 {
-                    return null;
-                }
-
-                if ( hasWait )
-                {
-                    long    thisWaitMs = getThisWaitMs(startMs, waitMs);
-                    if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
+                    switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
                     {
-                        return null;
-                    }
-                }
-                else
-                {
-                    lock.acquire();
-                }
-                try
-                {
-                    PathAndBytesable<String>    createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
-                    String                      path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath,
LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
-                    String                      nodeName = ZKPaths.getNodeFromPath(path);
-                    builder.add(makeLease(path));
+                        case CONTINUE:
+                        {
+                            isDone = true;
+                            break;
+                        }
 
-                    synchronized(this)
-                    {
-                        for(;;)
+                        case RETURN_NULL:
                         {
-                            List<String>    children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
-                            if ( !children.contains(nodeName) )
-                            {
-                                log.error("Sequential path not found: " + path);
-                                throw new KeeperException.NoNodeException("Sequential path
not found: " + path);
-                            }
+                            return null;
+                        }
 
-                            if ( children.size() <= maxLeases )
-                            {
-                                break;
-                            }
-                            if ( hasWait )
-                            {
-                                long    thisWaitMs = getThisWaitMs(startMs, waitMs);
-                                if ( thisWaitMs <= 0 )
-                                {
-                                    return null;
-                                }
-                                wait(thisWaitMs);
-                            }
-                            else
+                        case RETRY_DUE_TO_MISSING_NODE:
+                        {
+                            // gets thrown by internalAcquire1Lease when it can't find the
lock node
+                            // this can happen when the session expires, etc. So, if the
retry allows, just try it all again
+                            if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                             {
-                                wait();
+                                throw new KeeperException.NoNodeException("Sequential path
not found - possible session loss");
                             }
+                            // try again
+                            break;
                         }
                     }
                 }
-                finally
-                {
-                    lock.release();
-                }
             }
             success = true;
         }
@@ -342,9 +317,80 @@ public class InterProcessSemaphoreV2
         return builder.build();
     }
 
+    private enum InternalAcquireResult
+    {
+        CONTINUE,
+        RETURN_NULL,
+        RETRY_DUE_TO_MISSING_NODE
+    }
+
+    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease>
builder, long startMs, boolean hasWait, long waitMs) throws Exception
+    {
+        if ( client.getState() != CuratorFrameworkState.STARTED )
+        {
+            return InternalAcquireResult.RETURN_NULL;
+        }
+
+        if ( hasWait )
+        {
+            long thisWaitMs = getThisWaitMs(startMs, waitMs);
+            if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
+            {
+                return InternalAcquireResult.RETURN_NULL;
+            }
+        }
+        else
+        {
+            lock.acquire();
+        }
+        try
+        {
+            PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
+            String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath,
LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
+            String nodeName = ZKPaths.getNodeFromPath(path);
+            builder.add(makeLease(path));
+
+            synchronized(this)
+            {
+                for(;;)
+                {
+                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+                    if ( !children.contains(nodeName) )
+                    {
+                        log.error("Sequential path not found: " + path);
+                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
+                    }
+
+                    if ( children.size() <= maxLeases )
+                    {
+                        break;
+                    }
+                    if ( hasWait )
+                    {
+                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
+                        if ( thisWaitMs <= 0 )
+                        {
+                            return InternalAcquireResult.RETURN_NULL;
+                        }
+                        wait(thisWaitMs);
+                    }
+                    else
+                    {
+                        wait();
+                    }
+                }
+            }
+        }
+        finally
+        {
+            lock.release();
+        }
+        return InternalAcquireResult.CONTINUE;
+    }
+
     private long getThisWaitMs(long startMs, long waitMs)
     {
-        long        elapsedMs = System.currentTimeMillis() - startMs;
+        long elapsedMs = System.currentTimeMillis() - startMs;
         return waitMs - elapsedMs;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86e442a1/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index c321607..73b530c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
@@ -25,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.TestingServer;
@@ -45,22 +47,22 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public abstract class TestInterProcessMutexBase extends BaseClassForTests
 {
-    private volatile CountDownLatch         waitLatchForBar = null;
-    private volatile CountDownLatch         countLatchForBar = null;
+    private volatile CountDownLatch waitLatchForBar = null;
+    private volatile CountDownLatch countLatchForBar = null;
 
-    protected abstract InterProcessLock      makeLock(CuratorFramework client);
+    protected abstract InterProcessLock makeLock(CuratorFramework client);
 
     @Test
-    public void     testWaitingProcessKilledServer() throws Exception
+    public void testWaitingProcessKilledServer() throws Exception
     {
-        final Timing            timing = new Timing();
-        final CuratorFramework  client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        final Timing timing = new Timing();
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
         try
         {
             client.start();
 
-            final CountDownLatch            latch = new CountDownLatch(1);
-            ConnectionStateListener         listener = new ConnectionStateListener()
+            final CountDownLatch latch = new CountDownLatch(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
             {
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -73,8 +75,8 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
             };
             client.getConnectionStateListenable().addListener(listener);
 
-            final AtomicBoolean                 isFirst = new AtomicBoolean(true);
-            ExecutorCompletionService<Object>   service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+            final AtomicBoolean isFirst = new AtomicBoolean(true);
+            ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
             for ( int i = 0; i < 2; ++i )
             {
                 service.submit
@@ -126,9 +128,9 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
     }
 
     @Test
-    public void     testKilledSession() throws Exception
+    public void testKilledSession() throws Exception
     {
-        final Timing        timing = new Timing();
+        final Timing timing = new Timing();
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
@@ -140,34 +142,34 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
             final Semaphore semaphore = new Semaphore(0);
             ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
             service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+                (
+                    new Callable<Object>()
                     {
-                        mutex1.acquire();
-                        semaphore.release();
-                        Thread.sleep(1000000);
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            mutex1.acquire();
+                            semaphore.release();
+                            Thread.sleep(1000000);
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
             service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+                (
+                    new Callable<Object>()
                     {
-                        mutex2.acquire();
-                        semaphore.release();
-                        Thread.sleep(1000000);
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            mutex2.acquire();
+                            semaphore.release();
+                            Thread.sleep(1000000);
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
             KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
@@ -180,7 +182,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
     }
 
     @Test
-    public void     testWithNamespace() throws Exception
+    public void testWithNamespace() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.builder().
             connectString(server.getConnectString()).
@@ -202,57 +204,57 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
     }
 
     @Test
-    public void     testReentrantSingleLock() throws Exception
+    public void testReentrantSingleLock() throws Exception
     {
-        final int           THREAD_QTY = 10;
-        
+        final int THREAD_QTY = 10;
+
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
         try
         {
-            final AtomicBoolean     hasLock = new AtomicBoolean(false);
-            final AtomicBoolean     isFirst = new AtomicBoolean(true);
-            final Semaphore         semaphore = new Semaphore(1);
-            final InterProcessLock  mutex = makeLock(client);
+            final AtomicBoolean hasLock = new AtomicBoolean(false);
+            final AtomicBoolean isFirst = new AtomicBoolean(true);
+            final Semaphore semaphore = new Semaphore(1);
+            final InterProcessLock mutex = makeLock(client);
 
-            List<Future<Object>>    threads = Lists.newArrayList();
-            ExecutorService         service = Executors.newCachedThreadPool();          
 
+            List<Future<Object>> threads = Lists.newArrayList();
+            ExecutorService service = Executors.newCachedThreadPool();
             for ( int i = 0; i < THREAD_QTY; ++i )
             {
-                Future<Object>          t = service.submit
-                (
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
+                Future<Object> t = service.submit
+                    (
+                        new Callable<Object>()
                         {
-                            semaphore.acquire();
-                            mutex.acquire();
-                            Assert.assertTrue(hasLock.compareAndSet(false, true));
-                            try
+                            @Override
+                            public Object call() throws Exception
                             {
-                                if ( isFirst.compareAndSet(true, false) )
+                                semaphore.acquire();
+                                mutex.acquire();
+                                Assert.assertTrue(hasLock.compareAndSet(false, true));
+                                try
                                 {
-                                    semaphore.release(THREAD_QTY - 1);
-                                    while ( semaphore.availablePermits() > 0 )
+                                    if ( isFirst.compareAndSet(true, false) )
+                                    {
+                                        semaphore.release(THREAD_QTY - 1);
+                                        while ( semaphore.availablePermits() > 0 )
+                                        {
+                                            Thread.sleep(100);
+                                        }
+                                    }
+                                    else
                                     {
                                         Thread.sleep(100);
                                     }
                                 }
-                                else
+                                finally
                                 {
-                                    Thread.sleep(100);
+                                    mutex.release();
+                                    hasLock.set(false);
                                 }
+                                return null;
                             }
-                            finally
-                            {
-                                mutex.release();
-                                hasLock.set(false);
-                            }
-                            return null;
                         }
-                    }
-                );
+                    );
                 threads.add(t);
             }
 
@@ -268,7 +270,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
     }
 
     @Test
-    public void     testReentrant2Threads() throws Exception
+    public void testReentrant2Threads() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
@@ -279,30 +281,30 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
 
             final InterProcessLock mutex = makeLock(client);
             Executors.newSingleThreadExecutor().submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+                (
+                    new Callable<Object>()
                     {
-                        Assert.assertTrue(countLatchForBar.await(10, TimeUnit.SECONDS));
-                        try
-                        {
-                            mutex.acquire(10, TimeUnit.SECONDS);
-                            Assert.fail();
-                        }
-                        catch ( Exception e )
-                        {
-                            // correct
-                        }
-                        finally
+                        @Override
+                        public Object call() throws Exception
                         {
-                            waitLatchForBar.countDown();
+                            Assert.assertTrue(countLatchForBar.await(10, TimeUnit.SECONDS));
+                            try
+                            {
+                                mutex.acquire(10, TimeUnit.SECONDS);
+                                Assert.fail();
+                            }
+                            catch ( Exception e )
+                            {
+                                // correct
+                            }
+                            finally
+                            {
+                                waitLatchForBar.countDown();
+                            }
+                            return null;
                         }
-                        return null;
                     }
-                }
-            );
+                );
 
             foo(mutex);
             Assert.assertFalse(mutex.isAcquiredInThisProcess());
@@ -314,7 +316,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
     }
 
     @Test
-    public void     testReentrant() throws Exception
+    public void testReentrant() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
@@ -330,7 +332,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         }
     }
 
-    private void        foo(InterProcessLock mutex) throws Exception
+    private void foo(InterProcessLock mutex) throws Exception
     {
         mutex.acquire(10, TimeUnit.SECONDS);
         Assert.assertTrue(mutex.isAcquiredInThisProcess());
@@ -339,7 +341,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         mutex.release();
     }
 
-    private void        bar(InterProcessLock mutex) throws Exception
+    private void bar(InterProcessLock mutex) throws Exception
     {
         mutex.acquire(10, TimeUnit.SECONDS);
         Assert.assertTrue(mutex.isAcquiredInThisProcess());
@@ -353,7 +355,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
         mutex.release();
     }
 
-    private void        snafu(InterProcessLock mutex) throws Exception
+    private void snafu(InterProcessLock mutex) throws Exception
     {
         mutex.acquire(10, TimeUnit.SECONDS);
         Assert.assertTrue(mutex.isAcquiredInThisProcess());
@@ -362,7 +364,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
     }
 
     @Test
-    public void     test2Clients() throws Exception
+    public void test2Clients() throws Exception
     {
         CuratorFramework client1 = null;
         CuratorFramework client2 = null;
@@ -376,58 +378,58 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
             final InterProcessLock mutexForClient1 = makeLock(client1);
             final InterProcessLock mutexForClient2 = makeLock(client2);
 
-            final CountDownLatch              latchForClient1 = new CountDownLatch(1);
-            final CountDownLatch              latchForClient2 = new CountDownLatch(1);
-            final CountDownLatch              acquiredLatchForClient1 = new CountDownLatch(1);
-            final CountDownLatch              acquiredLatchForClient2 = new CountDownLatch(1);
+            final CountDownLatch latchForClient1 = new CountDownLatch(1);
+            final CountDownLatch latchForClient2 = new CountDownLatch(1);
+            final CountDownLatch acquiredLatchForClient1 = new CountDownLatch(1);
+            final CountDownLatch acquiredLatchForClient2 = new CountDownLatch(1);
 
-            final AtomicReference<Exception>  exceptionRef = new AtomicReference<Exception>();
+            final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
 
-            ExecutorService                   service = Executors.newCachedThreadPool();
-            Future<Object>                    future1 = service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            ExecutorService service = Executors.newCachedThreadPool();
+            Future<Object> future1 = service.submit
+                (
+                    new Callable<Object>()
                     {
-                        try
-                        {
-                            mutexForClient1.acquire(10, TimeUnit.SECONDS);
-                            acquiredLatchForClient1.countDown();
-                            latchForClient1.await(10, TimeUnit.SECONDS);
-                            mutexForClient1.release();
-                        }
-                        catch ( Exception e )
+                        @Override
+                        public Object call() throws Exception
                         {
-                            exceptionRef.set(e);
+                            try
+                            {
+                                mutexForClient1.acquire(10, TimeUnit.SECONDS);
+                                acquiredLatchForClient1.countDown();
+                                latchForClient1.await(10, TimeUnit.SECONDS);
+                                mutexForClient1.release();
+                            }
+                            catch ( Exception e )
+                            {
+                                exceptionRef.set(e);
+                            }
+                            return null;
                         }
-                        return null;
                     }
-                }
-            );
-            Future<Object>                    future2 = service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+                );
+            Future<Object> future2 = service.submit
+                (
+                    new Callable<Object>()
                     {
-                        try
-                        {
-                            mutexForClient2.acquire(10, TimeUnit.SECONDS);
-                            acquiredLatchForClient2.countDown();
-                            latchForClient2.await(10, TimeUnit.SECONDS);
-                            mutexForClient2.release();
-                        }
-                        catch ( Exception e )
+                        @Override
+                        public Object call() throws Exception
                         {
-                            exceptionRef.set(e);
+                            try
+                            {
+                                mutexForClient2.acquire(10, TimeUnit.SECONDS);
+                                acquiredLatchForClient2.countDown();
+                                latchForClient2.await(10, TimeUnit.SECONDS);
+                                mutexForClient2.release();
+                            }
+                            catch ( Exception e )
+                            {
+                                exceptionRef.set(e);
+                            }
+                            return null;
                         }
-                        return null;
                     }
-                }
-            );
+                );
 
             while ( !mutexForClient1.isAcquiredInThisProcess() && !mutexForClient2.isAcquiredInThisProcess()
)
             {


Mime
View raw message