curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: use Timing
Date Sun, 07 Jul 2013 18:05:33 GMT
Updated Branches:
  refs/heads/2.2.0-incubating 477cb13e5 -> 9b81582da


use Timing


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/9b81582d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/9b81582d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/9b81582d

Branch: refs/heads/2.2.0-incubating
Commit: 9b81582da0d1742336e5025917ce60401041b457
Parents: 477cb13
Author: randgalt <randgalt@apache.org>
Authored: Sun Jul 7 11:05:16 2013 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Sun Jul 7 11:05:16 2013 -0700

----------------------------------------------------------------------
 .../locks/TestInterProcessSemaphore.java        | 406 ++++++++++---------
 1 file changed, 206 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/9b81582d/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 6eeba94..b930cc0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
@@ -47,21 +48,21 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     @Test
     public void testThreadedLeaseIncrease() throws Exception
     {
-        final Timing        timing = new Timing();
-        CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        final Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            final SharedCount             count = new SharedCount(client, "/foo/count", 1);
+            final SharedCount count = new SharedCount(client, "/foo/count", 1);
             count.start();
 
-            final InterProcessSemaphoreV2   semaphore = new InterProcessSemaphoreV2(client,
"/test", count);
+            final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client,
"/test", count);
 
-            ExecutorService     service = Executors.newCachedThreadPool();
+            ExecutorService service = Executors.newCachedThreadPool();
 
-            final CountDownLatch    latch = new CountDownLatch(1);
-            Future<Object>          future1 = service.submit
+            final CountDownLatch latch = new CountDownLatch(1);
+            Future<Object> future1 = service.submit
                 (
                     new Callable<Object>()
                     {
@@ -71,26 +72,26 @@ public class TestInterProcessSemaphore extends BaseClassForTests
                             Lease lease = semaphore.acquire(timing.seconds(), TimeUnit.SECONDS);
                             Assert.assertNotNull(lease);
                             latch.countDown();
-                            lease = semaphore.acquire(timing.seconds(), TimeUnit.SECONDS);
+                            lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
                             Assert.assertNotNull(lease);
                             return null;
                         }
                     }
                 );
-            Future<Object>          future2 = service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            Future<Object> future2 = service.submit
+                (
+                    new Callable<Object>()
                     {
-                        Assert.assertTrue(latch.await(timing.seconds(), TimeUnit.SECONDS));
-                        timing.sleepABit(); // make sure second acquire is waiting
-                        Assert.assertTrue(count.trySetCount(2));
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+                            timing.sleepABit(); // make sure second acquire is waiting
+                            Assert.assertTrue(count.trySetCount(2));
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
             future1.get();
             future2.get();
@@ -104,6 +105,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     @Test
     public void testClientClose() throws Exception
     {
+        final Timing timing = new Timing();
         CuratorFramework client1 = null;
         CuratorFramework client2 = null;
         InterProcessSemaphoreV2 semaphore1;
@@ -119,7 +121,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             semaphore1 = new InterProcessSemaphoreV2(client1, "/test", 1);
             semaphore2 = new InterProcessSemaphoreV2(client2, "/test", 1);
 
-            Lease lease = semaphore2.acquire(10, TimeUnit.SECONDS);
+            Lease lease = semaphore2.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
             Assert.assertNotNull(lease);
             lease.close();
 
@@ -128,8 +130,8 @@ public class TestInterProcessSemaphore extends BaseClassForTests
 
             client1.close();    // should release any held leases
             client1 = null;
-            
-            Assert.assertNotNull(semaphore2.acquire(10, TimeUnit.SECONDS));
+
+            Assert.assertNotNull(semaphore2.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
         }
         finally
         {
@@ -139,92 +141,93 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     }
 
     @Test
-    public void     testMaxPerSession() throws Exception
+    public void testMaxPerSession() throws Exception
     {
-        final int             CLIENT_QTY = 10;
-        final int             LOOP_QTY = 100;
-        final Random          random = new Random();
-        final int             SESSION_MAX = random.nextInt(75) + 25;
-
-        List<Future<Object>>  futures = Lists.newArrayList();
-        ExecutorService       service = Executors.newCachedThreadPool();
-        final Counter         counter = new Counter();
-        final AtomicInteger   available = new AtomicInteger(SESSION_MAX);
+        final int CLIENT_QTY = 10;
+        final int LOOP_QTY = 100;
+        final Random random = new Random();
+        final int SESSION_MAX = random.nextInt(75) + 25;
+        final Timing timing = new Timing();
+
+        List<Future<Object>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newCachedThreadPool();
+        final Counter counter = new Counter();
+        final AtomicInteger available = new AtomicInteger(SESSION_MAX);
         for ( int i = 0; i < CLIENT_QTY; ++i )
         {
             futures.add
-            (
-                service.submit
                 (
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
-                        {
-                            CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
-                            client.start();
-                            try
+                    service.submit
+                        (
+                            new Callable<Object>()
                             {
-                                InterProcessSemaphoreV2   semaphore = new InterProcessSemaphoreV2(client,
"/test", SESSION_MAX);
-
-                                for ( int i = 0; i < LOOP_QTY; ++i )
+                                @Override
+                                public Object call() throws Exception
                                 {
-                                    long    start = System.currentTimeMillis();
-                                    int     thisQty;
-                                    synchronized(available)
-                                    {
-                                        if ( (System.currentTimeMillis() - start) > 10000 )
-                                        {
-                                            throw new TimeoutException();
-                                        }
-                                        while ( available.get() == 0 )
-                                        {
-                                            available.wait(10000);
-                                        }
-
-                                        thisQty = (available.get() > 1) ? (random.nextInt(available.get()) + 1) : 1;
-
-                                        available.addAndGet(-1 * thisQty);
-                                        Assert.assertTrue(available.get() >= 0);
-                                    }
-                                    Collection<Lease> leases = semaphore.acquire(thisQty, 10, TimeUnit.SECONDS);
-                                    Assert.assertNotNull(leases);
+                                    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+                                    client.start();
                                     try
                                     {
-                                        synchronized(counter)
+                                        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client,
"/test", SESSION_MAX);
+
+                                        for ( int i = 0; i < LOOP_QTY; ++i )
                                         {
-                                            counter.currentCount += thisQty;
-                                            if ( counter.currentCount > counter.maxCount )
+                                            long start = System.currentTimeMillis();
+                                            int thisQty;
+                                            synchronized(available)
+                                            {
+                                                if ( (System.currentTimeMillis() - start) > 10000 )
+                                                {
+                                                    throw new TimeoutException();
+                                                }
+                                                while ( available.get() == 0 )
+                                                {
+                                                    available.wait(timing.forWaiting().milliseconds());
+                                                }
+
+                                                thisQty = (available.get() > 1) ? (random.nextInt(available.get()) + 1) : 1;
+
+                                                available.addAndGet(-1 * thisQty);
+                                                Assert.assertTrue(available.get() >= 0);
+                                            }
+                                            Collection<Lease> leases = semaphore.acquire(thisQty, timing.forWaiting().seconds(), TimeUnit.SECONDS);
+                                            Assert.assertNotNull(leases);
+                                            try
                                             {
-                                                counter.maxCount = counter.currentCount;
+                                                synchronized(counter)
+                                                {
+                                                    counter.currentCount += thisQty;
+                                                    if ( counter.currentCount > counter.maxCount )
+                                                    {
+                                                        counter.maxCount = counter.currentCount;
+                                                    }
+                                                }
+                                                Thread.sleep(random.nextInt(25));
+                                            }
+                                            finally
+                                            {
+                                                synchronized(counter)
+                                                {
+                                                    counter.currentCount -= thisQty;
+                                                }
+                                                semaphore.returnAll(leases);
+                                                synchronized(available)
+                                                {
+                                                    available.addAndGet(thisQty);
+                                                    available.notifyAll();
+                                                }
                                             }
                                         }
-                                        Thread.sleep(random.nextInt(25));
                                     }
                                     finally
                                     {
-                                        synchronized(counter)
-                                        {
-                                            counter.currentCount -= thisQty;
-                                        }
-                                        semaphore.returnAll(leases);
-                                        synchronized(available)
-                                        {
-                                            available.addAndGet(thisQty);
-                                            available.notifyAll();
-                                        }
+                                        client.close();
                                     }
+                                    return null;
                                 }
                             }
-                            finally
-                            {
-                                client.close();
-                            }
-                            return null;
-                        }
-                    }
-                )
-            );
+                        )
+                );
         }
 
         for ( Future<Object> f : futures )
@@ -242,60 +245,61 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     }
 
     @Test
-    public void     testRelease1AtATime() throws Exception
+    public void testRelease1AtATime() throws Exception
     {
-        final int               CLIENT_QTY = 10;
-        final int               MAX = CLIENT_QTY / 2;
-        final AtomicInteger     maxLeases = new AtomicInteger(0);
-        final AtomicInteger     activeQty = new AtomicInteger(0);
-        final AtomicInteger     uses = new AtomicInteger(0);
-
-        List<Future<Object>>    futures = Lists.newArrayList();
-        ExecutorService         service = Executors.newFixedThreadPool(CLIENT_QTY);
+        final Timing timing = new Timing();
+        final int CLIENT_QTY = 10;
+        final int MAX = CLIENT_QTY / 2;
+        final AtomicInteger maxLeases = new AtomicInteger(0);
+        final AtomicInteger activeQty = new AtomicInteger(0);
+        final AtomicInteger uses = new AtomicInteger(0);
+
+        List<Future<Object>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newFixedThreadPool(CLIENT_QTY);
         for ( int i = 0; i < CLIENT_QTY; ++i )
         {
-            Future<Object>      f = service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            Future<Object> f = service.submit
+                (
+                    new Callable<Object>()
                     {
-                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
-                        client.start();
-                        try
+                        @Override
+                        public Object call() throws Exception
                         {
-                            InterProcessSemaphoreV2   semaphore = new InterProcessSemaphoreV2(client,
"/test", MAX);
-                            Lease lease = semaphore.acquire(10, TimeUnit.SECONDS);
-                            Assert.assertNotNull(lease);
-                            uses.incrementAndGet();
+                            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+                            client.start();
                             try
                             {
-                                synchronized(maxLeases)
+                                InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client,
"/test", MAX);
+                                Lease lease = semaphore.acquire(timing.forWaiting().seconds(),
TimeUnit.SECONDS);
+                                Assert.assertNotNull(lease);
+                                uses.incrementAndGet();
+                                try
                                 {
-                                    int         qty = activeQty.incrementAndGet();
-                                    if ( qty > maxLeases.get() )
+                                    synchronized(maxLeases)
                                     {
-                                        maxLeases.set(qty);
+                                        int qty = activeQty.incrementAndGet();
+                                        if ( qty > maxLeases.get() )
+                                        {
+                                            maxLeases.set(qty);
+                                        }
                                     }
-                                }
 
-                                Thread.sleep(500);
+                                    timing.sleepABit();
+                                }
+                                finally
+                                {
+                                    activeQty.decrementAndGet();
+                                    lease.close();
+                                }
                             }
                             finally
                             {
-                                activeQty.decrementAndGet();
-                                lease.close();
+                                client.close();
                             }
+                            return null;
                         }
-                        finally
-                        {
-                            client.close();
-                        }
-                        return null;
                     }
-                }
-            );
+                );
             futures.add(f);
         }
 
@@ -307,66 +311,67 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         Assert.assertEquals(uses.get(), CLIENT_QTY);
         Assert.assertEquals(maxLeases.get(), MAX);
     }
-    
+
     @Test
-    public void     testReleaseInChunks() throws Exception
+    public void testReleaseInChunks() throws Exception
     {
-        final int       MAX_LEASES = 11;
-        final int       THREADS = 100;
+        final Timing timing = new Timing();
+        final int MAX_LEASES = 11;
+        final int THREADS = 100;
 
-        final CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
         try
         {
-            final Stepper         latch = new Stepper();
-            final Random          random = new Random();
-            final Counter         counter = new Counter();
-            ExecutorService       service = Executors.newCachedThreadPool();
-            ExecutorCompletionService<Object>   completionService = new ExecutorCompletionService<Object>(service);
+            final Stepper latch = new Stepper();
+            final Random random = new Random();
+            final Counter counter = new Counter();
+            ExecutorService service = Executors.newCachedThreadPool();
+            ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
             for ( int i = 0; i < THREADS; ++i )
             {
                 completionService.submit
-                (
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
+                    (
+                        new Callable<Object>()
                         {
-                            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client,
"/test", MAX_LEASES);
-                            Lease lease = semaphore.acquire(10, TimeUnit.SECONDS);
-                            if ( lease == null )
+                            @Override
+                            public Object call() throws Exception
                             {
-                                throw new Exception("timed out");
-                            }
-                            try
-                            {
-                                synchronized(counter)
+                                InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client,
"/test", MAX_LEASES);
+                                Lease lease = semaphore.acquire(timing.forWaiting().seconds(),
TimeUnit.SECONDS);
+                                if ( lease == null )
+                                {
+                                    throw new Exception("timed out");
+                                }
+                                try
                                 {
-                                    ++counter.currentCount;
-                                    if ( counter.currentCount > counter.maxCount )
+                                    synchronized(counter)
                                     {
-                                        counter.maxCount = counter.currentCount;
+                                        ++counter.currentCount;
+                                        if ( counter.currentCount > counter.maxCount )
+                                        {
+                                            counter.maxCount = counter.currentCount;
+                                        }
+                                        counter.notifyAll();
                                     }
-                                    counter.notifyAll();
-                                }
 
-                                latch.await();
-                            }
-                            finally
-                            {
-                                synchronized(counter)
+                                    latch.await();
+                                }
+                                finally
                                 {
-                                    --counter.currentCount;
+                                    synchronized(counter)
+                                    {
+                                        --counter.currentCount;
+                                    }
+                                    semaphore.returnLease(lease);
                                 }
-                                semaphore.returnLease(lease);
+                                return null;
                             }
-                            return null;
                         }
-                    }
-                );
+                    );
             }
 
-            int     remaining = THREADS;
+            int remaining = THREADS;
             while ( remaining > 0 )
             {
                 int times = Math.min(random.nextInt(5) + 1, remaining);
@@ -374,7 +379,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
                 remaining -= times;
                 Thread.sleep(random.nextInt(100) + 1);
             }
-            
+
             for ( int i = 0; i < THREADS; ++i )
             {
                 completionService.take();
@@ -393,40 +398,40 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             client.close();
         }
     }
-    
+
     @Test
-    public void     testThreads() throws Exception
+    public void testThreads() throws Exception
     {
-        final int        THREAD_QTY = 10;
+        final int THREAD_QTY = 10;
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
         try
         {
-            final InterProcessSemaphoreV2   semaphore = new InterProcessSemaphoreV2(client,
"/test", 1);
-            ExecutorService               service = Executors.newFixedThreadPool(THREAD_QTY);
+            final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client,
"/test", 1);
+            ExecutorService service = Executors.newFixedThreadPool(THREAD_QTY);
             for ( int i = 0; i < THREAD_QTY; ++i )
             {
                 service.submit
-                (
-                    new Callable<Object>()
-                    {
-                        @Override
-                        public Object call() throws Exception
+                    (
+                        new Callable<Object>()
                         {
-                            Lease   lease = semaphore.acquire();
-                            try
-                            {
-                                Thread.sleep(1);
-                            }
-                            finally
+                            @Override
+                            public Object call() throws Exception
                             {
-                                lease.close();
+                                Lease lease = semaphore.acquire();
+                                try
+                                {
+                                    Thread.sleep(1);
+                                }
+                                finally
+                                {
+                                    lease.close();
+                                }
+                                return null;
                             }
-                            return null;
                         }
-                    }
-                );
+                    );
             }
             service.shutdown();
             Assert.assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
@@ -437,17 +442,17 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
     }
 
-
     @Test
-    public void     testSimple() throws Exception
+    public void testSimple() throws Exception
     {
+        Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
         try
         {
-            InterProcessSemaphoreV2   semaphore = new InterProcessSemaphoreV2(client, "/test",
1);
-            Assert.assertNotNull(semaphore.acquire(10, TimeUnit.SECONDS));
-            Assert.assertNull(semaphore.acquire(3, TimeUnit.SECONDS));
+            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test",
1);
+            Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+            Assert.assertNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
         }
         finally
         {
@@ -456,29 +461,30 @@ public class TestInterProcessSemaphore extends BaseClassForTests
     }
 
     @Test
-    public void     testSimple2() throws Exception
+    public void testSimple2() throws Exception
     {
-        final int       MAX_LEASES = 3;
+        final int MAX_LEASES = 3;
+        Timing timing = new Timing();
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1));
         client.start();
         try
         {
-            List<Lease>        leases = Lists.newArrayList();
+            List<Lease> leases = Lists.newArrayList();
             for ( int i = 0; i < MAX_LEASES; ++i )
             {
-                InterProcessSemaphoreV2      semaphore = new InterProcessSemaphoreV2(client,
"/test", MAX_LEASES);
-                Lease                      lease = semaphore.acquire(10, TimeUnit.SECONDS);
+                InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test",
MAX_LEASES);
+                Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
                 Assert.assertNotNull(lease);
                 leases.add(lease);
             }
 
-            InterProcessSemaphoreV2      semaphore = new InterProcessSemaphoreV2(client,
"/test", MAX_LEASES);
-            Lease lease = semaphore.acquire(3, TimeUnit.SECONDS);
+            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test",
MAX_LEASES);
+            Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
             Assert.assertNull(lease);
 
             leases.remove(0).close();
-            Assert.assertNotNull(semaphore.acquire(10, TimeUnit.SECONDS));
+            Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
         }
         finally
         {


Mime
View raw message