curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: more read/write tests
Date Fri, 07 Mar 2014 13:25:48 GMT
Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 203b49d90 -> 05d5420f1


more read/write tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/05d5420f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/05d5420f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/05d5420f

Branch: refs/heads/CURATOR-88
Commit: 05d5420f10d72df52a355354d027639170044b99
Parents: 203b49d
Author: randgalt <randgalt@apache.org>
Authored: Fri Mar 7 08:25:35 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Fri Mar 7 08:25:35 2014 -0500

----------------------------------------------------------------------
 .../TestInterProcessReadWriteLockBase.java      | 115 +++++++++++++++++++
 1 file changed, 115 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/05d5420f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
index 0bd36d8..101d40f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
@@ -19,6 +19,8 @@
 
 package org.apache.curator.framework.recipes.locks;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -27,18 +29,131 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Collection;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTests
 {
     @Test
+    public void testOrdering() throws Exception
+    { // Starts eight lock clients (readers and writers) in a fixed sequence and checks, stage by stage, which of them have recorded themselves in "lockers".
+        Timing timing = new Timing();
+
+        ExecutorService service = Executors.newCachedThreadPool(); // NOTE(review): the Futures returned by submit() below are never inspected, so an assertion failure inside a worker would not fail this test by itself
+        try
+        {
+            Semaphore advanceSemaphore = new Semaphore(0); // workers wait on this while holding the lock; each permit lets one worker go on to release its lock
+            Semaphore acquiredSemaphore = new Semaphore(0); // each worker adds one permit once its acquire() has succeeded
+            Semaphore stagingSemaphore = new Semaphore(0); // workers 5-8 add one permit just before they call acquire()
+            Set<Integer> lockers = Sets.newSetFromMap(Maps.<Integer, Boolean>newConcurrentMap()); // indexes of workers that obtained the lock since the last clear(); written from the worker threads
+            for ( int i = 1; i <= 3; ++i )
+            {
+                service.submit(orderingProc(timing, null, advanceSemaphore, acquiredSemaphore,
lockers, i, false)); // workers 1-3 ask for the read lock
+            }
+
+            Assert.assertTrue(timing.acquireSemaphore(acquiredSemaphore, 3)); // presumably waits (with a timeout) for three permits, i.e. readers 1-3 all hold the lock - confirm against Timing
+            service.submit(orderingProc(timing, null, advanceSemaphore, acquiredSemaphore,
lockers, 4, true)); // worker 4 asks for the write lock
+            timing.sleepABit(); // presumably a short pause giving worker 4 time to attempt its acquire - confirm against Timing
+
+            // writer 4 should not succeed yet: only readers 1-3 may be recorded
+            Assert.assertEquals(lockers, Sets.newHashSet(1, 2, 3));
+            lockers.clear();
+            advanceSemaphore.release(3); // let the three readers release their locks
+
+            Assert.assertTrue(timing.acquireSemaphore(acquiredSemaphore, 1)); // writer 4 now reports that it has the lock
+            service.submit(orderingProc(timing, stagingSemaphore, advanceSemaphore, acquiredSemaphore,
lockers, 5, false)); // worker 5: reader
+            Assert.assertTrue(timing.acquireSemaphore(stagingSemaphore)); // worker 5 is about to call acquire()
+            timing.sleepABit(); // pause so that worker 5 has presumably queued before worker 6 is started
+            service.submit(orderingProc(timing, stagingSemaphore, advanceSemaphore, acquiredSemaphore,
lockers, 6, false)); // worker 6: reader
+            Assert.assertTrue(timing.acquireSemaphore(stagingSemaphore)); // worker 6 is about to call acquire()
+            timing.sleepABit(); // pause so that worker 6 has presumably queued before worker 7 is started
+            service.submit(orderingProc(timing, stagingSemaphore, advanceSemaphore, acquiredSemaphore,
lockers, 7, true)); // worker 7: writer
+            Assert.assertTrue(timing.acquireSemaphore(stagingSemaphore)); // worker 7 is about to call acquire()
+            timing.sleepABit(); // pause so that worker 7 has presumably queued before worker 8 is started
+            service.submit(orderingProc(timing, stagingSemaphore, advanceSemaphore, acquiredSemaphore,
lockers, 8, false)); // worker 8: reader
+            Assert.assertTrue(timing.acquireSemaphore(stagingSemaphore)); // worker 8 is about to call acquire()
+            timing.sleepABit(); // pause so that worker 8 has presumably queued before the next check
+
+            // new readers/writers (5-8) should not succeed yet: only writer 4 may be recorded
+            Assert.assertEquals(lockers, Sets.newHashSet(4));
+            lockers.clear();
+            advanceSemaphore.release(1); // let writer 4 release its lock
+
+            Assert.assertTrue(timing.acquireSemaphore(acquiredSemaphore, 2)); // two more workers report that they have the lock
+            timing.sleepABit(); // extra time in which a wrongly admitted worker could also record itself
+
+            // only 2 readers should succeed next (5 and 6, which were started ahead of writer 7)
+            Assert.assertEquals(lockers, Sets.newHashSet(5, 6));
+            lockers.clear();
+            advanceSemaphore.release(2); // let readers 5 and 6 release their locks
+            timing.sleepABit(); // NOTE(review): no wait on acquiredSemaphore here; this stage relies on the pause alone for worker 7 to record itself
+
+            // now 1 writer (7), ahead of reader 8 which was started after it
+            Assert.assertEquals(lockers, Sets.newHashSet(7));
+            lockers.clear();
+            advanceSemaphore.release(1); // let writer 7 release its lock
+            timing.sleepABit(); // NOTE(review): as above, only the pause separates the release from the next check
+
+            // last reader (8)
+            Assert.assertEquals(lockers, Sets.newHashSet(8));
+            advanceSemaphore.release(1); // let reader 8 release its lock and finish
+        }
+        finally
+        {
+            service.shutdownNow(); // attempts to stop any worker that is still running
+        }
+    }
+
+    /**
+     * Builds a task that opens its own client, takes either the read or the write side of
+     * the lock at "/lock", records that it holds the lock, and then keeps holding it until
+     * the test hands it a permit on {@code advanceSemaphore}.
+     *
+     * @param timing            supplies the acquire timeout and the timed semaphore wait
+     * @param stagingSemaphore  if non-null, released once just before the lock acquire is attempted
+     * @param advanceSemaphore  the task waits on this while holding the lock; one permit lets it release
+     * @param acquiredSemaphore released once after the lock has been obtained and recorded
+     * @param lockers           receives {@code index} once the lock has been obtained; it is
+     *                          written from the task's thread, so it must be safe for concurrent use
+     * @param index             identifier this task records in {@code lockers}
+     * @param isWriter          {@code true} to use the write lock, {@code false} for the read lock
+     * @return the task; it always yields {@code null} and fails with an assertion error if the
+     *         lock or the advance permit is not obtained
+     */
+    protected Callable<Void> orderingProc(final Timing timing, final Semaphore stagingSemaphore, final Semaphore advanceSemaphore, final Semaphore acquiredSemaphore, final Collection<Integer> lockers, final int index, final boolean isWriter)
+    {
+        return new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+                try
+                {
+                    client.start();
+                    InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                    InterProcessLock thisLock = isWriter ? lock.writeLock() : lock.readLock();
+
+                    // tell the test that this task is about to compete for the lock
+                    if ( stagingSemaphore != null )
+                    {
+                        stagingSemaphore.release();
+                    }
+
+                    Assert.assertTrue(thisLock.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+                    try
+                    {
+                        // record the index BEFORE signalling: a test thread woken by
+                        // acquiredSemaphore must already be able to see it in lockers
+                        lockers.add(index);
+                        acquiredSemaphore.release();
+                        // keep holding the lock until the test allows this task to move on
+                        Assert.assertTrue(timing.acquireSemaphore(advanceSemaphore));
+                    }
+                    finally
+                    {
+                        thisLock.release();
+                    }
+                }
+                finally
+                {
+                    CloseableUtils.closeQuietly(client);
+                }
+                return null;
+            }
+        };
+    }
+
+    @Test
     public void testWriterAgainstConstantReaders() throws Exception
     {
         final int CONCURRENCY = 8;


Mime
View raw message