curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cammcken...@apache.org
Subject [3/3] curator git commit: #noissue - Test case for interprocess mutex not reconnecting.
Date Thu, 02 Jun 2016 02:06:38 GMT
 #noissue - Test case for interprocess mutex not reconnecting.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/92dbba79
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/92dbba79
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/92dbba79

Branch: refs/heads/interprocess_mutex_issue
Commit: 92dbba79904dc362c124caba25dc87eebcc92040
Parents: e7fa01c
Author: Cam McKenzie <cammckenzie@apache.org>
Authored: Thu Jun 2 11:57:47 2016 +1000
Committer: Cam McKenzie <cammckenzie@apache.org>
Committed: Thu Jun 2 11:57:47 2016 +1000

----------------------------------------------------------------------
 .../TestInterProcessMutexNotReconnecting.java   | 147 +++++++++++++++++++
 1 file changed, 147 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/92dbba79/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java
new file mode 100755
index 0000000..87468bd
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexNotReconnecting.java
@@ -0,0 +1,147 @@
+package org.apache.curator.framework.recipes.locks;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+
+public class TestInterProcessMutexNotReconnecting extends BaseClassForTests
+{
+    @org.testng.annotations.Test
+    public void test() throws Exception
+    {
+        final String SEMAPHORE_PATH = "/test";
+        final int MAX_SEMAPHORES = 1;
+        final int NUM_CLIENTS = 10;
+        
+        server.start();
+        
+        CuratorFramework client = null;
+
+        ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);
+        
+        final AtomicInteger counter = new AtomicInteger(0);
+        final AtomicBoolean run = new AtomicBoolean(true);
+        
+        try {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), 5000, 5000,
new RetryOneTime(1));
+            client.start();
+            
+            final CuratorFramework lClient = client;
+            
+            for(int i = 0; i < NUM_CLIENTS; ++i)
+            {
+                executor.execute(new Runnable()
+                    {
+                    
+                    @Override
+                    public void run()
+                    {
+                        while(run.get())
+                        {
+                            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(lClient,
SEMAPHORE_PATH, MAX_SEMAPHORES);
+                            System.err.println(Thread.currentThread() + "Acquiring");
+                            Lease lease = null;
+                            try
+                            {
+                                lease = semaphore.acquire();
+                                System.err.println(Thread.currentThread() + "Acquired");
+                                counter.incrementAndGet();
+                                Thread.sleep(2000);
+                            }
+                            catch(InterruptedException e)
+                            {
+                                System.err.println("Interrupted");
+                                Thread.currentThread().interrupt();
+                                break;
+                            }
+                            catch(KeeperException e)
+                            {
+                                try
+                                {
+                                    Thread.sleep(2000);
+                                }
+                                catch(InterruptedException e2)
+                                {
+                                    System.err.println("Interrupted");
+                                    Thread.currentThread().interrupt();
+                                    break;
+                                }
+                            }
+                            catch(Exception e)
+                            {
+                                e.printStackTrace();
+                            }
+                            finally
+                            {
+                                if(lease != null) {
+                                    semaphore.returnLease(lease);
+                                }
+                            }
+                        }
+                    }
+                    });
+            }
+            
+
+            final AtomicBoolean lost = new AtomicBoolean(false);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
{
+                
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
{
+                   System.err.println("New state : " + newState);
+                   
+                   if(newState == ConnectionState.LOST) {
+                       lost.set(true);
+                   }
+                }
+            });
+            
+            Thread.sleep(2000);
+            
+            System.err.println("Stopping server");
+            server.stop();
+            System.err.println("Stopped server");
+            
+            while(!lost.get())
+            {
+                Thread.sleep(1000);
+            }
+            
+            int preRestartCount = counter.get();
+            
+            System.err.println("Restarting server");
+            server.restart();
+            
+            long startCheckTime = System.currentTimeMillis();
+            while(true)
+            {
+                if(counter.get() > preRestartCount)
+                {
+                    break;
+                }
+                else if((System.currentTimeMillis() - startCheckTime) > 30000)
+                {
+                    Assert.fail("Semaphores not reacquired after restart");
+                }
+            }
+
+        }
+        finally
+        {
+            run.set(false);
+            executor.shutdownNow();
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+}


Mime
View raw message