curator-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CURATOR-335) InterProcessSemaphoreV2 can deadlock under network stress
Date Sun, 05 Jun 2016 23:49:59 GMT

    [ https://issues.apache.org/jira/browse/CURATOR-335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316082#comment-15316082
] 

ASF GitHub Bot commented on CURATOR-335:
----------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/curator/pull/155


> InterProcessSemaphoreV2 can deadlock under network stress
> ---------------------------------------------------------
>
>                 Key: CURATOR-335
>                 URL: https://issues.apache.org/jira/browse/CURATOR-335
>             Project: Apache Curator
>          Issue Type: Bug
>          Components: Recipes
>    Affects Versions: 3.1.0, 2.10.0
>            Reporter: Jordan Zimmerman
>            Assignee: Jordan Zimmerman
>            Priority: Critical
>             Fix For: 2.11.0, 3.2.0
>
>
> Under network stress, InterProcessSemaphoreV2 can stop acquiring new leases. This test
(by [~cammckenzie]) shows the issues :
> {code}
> package org.apache.curator.framework.recipes.locks;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.framework.state.ConnectionStateListener;
> import org.apache.curator.retry.RetryOneTime;
> import org.apache.curator.test.BaseClassForTests;
> import org.apache.curator.utils.CloseableUtils;
> import org.apache.zookeeper.KeeperException;
> import org.testng.Assert;
> import org.testng.ITestContext;
> import org.testng.annotations.BeforeSuite;
> import org.testng.annotations.Test;
> public class TestInterProcessMutexNotReconnecting extends BaseClassForTests
> {
>     @Test
>     public void test() throws Exception
>     {
>         final String SEMAPHORE_PATH = "/test";
>         final int MAX_SEMAPHORES = 1;
>         final int NUM_CLIENTS = 10;
>         
>         server.start();
>         
>         CuratorFramework client = null;
>         ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);
>         
>         final AtomicInteger counter = new AtomicInteger(0);
>         final AtomicBoolean run = new AtomicBoolean(true);
>         
>         try {
>             client = CuratorFrameworkFactory.newClient(server.getConnectString(), 5000,
5000, new RetryOneTime(1));
>             client.start();
>             
>             final CuratorFramework lClient = client;
>             
>             for(int i = 0; i < NUM_CLIENTS; ++i)
>             {
>                 executor.execute(new Runnable()
>                     {
>                     
>                     @Override
>                     public void run()
>                     {
>                         while(run.get())
>                         {
>                             InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(lClient,
SEMAPHORE_PATH, MAX_SEMAPHORES);
>                             System.err.println(Thread.currentThread() + "Acquiring");
>                             Lease lease = null;
>                             try
>                             {
>                                 lease = semaphore.acquire();
>                                 System.err.println(Thread.currentThread() + "Acquired");
>                                 counter.incrementAndGet();
>                                 Thread.sleep(2000);
>                             }
>                             catch(InterruptedException e)
>                             {
>                                 System.err.println("Interrupted");
>                                 Thread.currentThread().interrupt();
>                                 break;
>                             }
>                             catch(KeeperException e)
>                             {
>                                 try
>                                 {
>                                     Thread.sleep(2000);
>                                 }
>                                 catch(InterruptedException e2)
>                                 {
>                                     System.err.println("Interrupted");
>                                     Thread.currentThread().interrupt();
>                                     break;
>                                 }
>                             }
>                             catch(Exception e)
>                             {
>                                 e.printStackTrace();
>                             }
>                             finally
>                             {
>                                 if(lease != null) {
>                                     semaphore.returnLease(lease);
>                                 }
>                             }
>                         }
>                     }
>                     });
>             }
>             
>             final AtomicBoolean lost = new AtomicBoolean(false);
>             client.getConnectionStateListenable().addListener(new ConnectionStateListener()
{
>                 
>                 @Override
>                 public void stateChanged(CuratorFramework client, ConnectionState newState)
{
>                    System.err.println("New state : " + newState);
>                    
>                    if(newState == ConnectionState.LOST) {
>                        lost.set(true);
>                    }
>                 }
>             });
>             
>             Thread.sleep(2000);
>             
>             System.err.println("Stopping server");
>             server.stop();
>             System.err.println("Stopped server");
>             
>             while(!lost.get())
>             {
>                 Thread.sleep(1000);
>             }
>             
>             int preRestartCount = counter.get();
>             
>             System.err.println("Restarting server");
>             server.restart();
>             
>             long startCheckTime = System.currentTimeMillis();
>             while(true)
>             {
>                 if(counter.get() > preRestartCount)
>                 {
>                     break;
>                 }
>                 else if((System.currentTimeMillis() - startCheckTime) > 30000)
>                 {
>                     Assert.fail("Semaphores not reacquired after restart");
>                 }
>             }
>         }
>         finally
>         {
>             run.set(false);
>             executor.shutdownNow();
>             CloseableUtils.closeQuietly(client);
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message