accumulo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Keith Turner (JIRA)" <j...@apache.org>
Subject [jira] [Created] (ACCUMULO-2027) ZooKeeperInstance.close() not freeing resources in multithreaded env
Date Mon, 16 Dec 2013 19:01:09 GMT
Keith Turner created ACCUMULO-2027:
--------------------------------------

             Summary: ZooKeeperInstance.close() not freeing resources in multithreaded env
                 Key: ACCUMULO-2027
                 URL: https://issues.apache.org/jira/browse/ACCUMULO-2027
             Project: Accumulo
          Issue Type: Bug
            Reporter: Keith Turner
            Priority: Critical
             Fix For: 1.4.5, 1.5.1, 1.6.0


While looking at the changes related to ZooKeeperInstance.close() in the 1.4.5-SNAPSHOT branch, I noticed race conditions where resources were not properly released. One type of race condition occurs when ZooKeeperInstance.close() is called while a thread is between a closed check in ZooKeeperInstance and a call to a ZooCache method. The following is an example situation:

 # Thread 1 uses ZooKeeperInstance1 to get a zoocache.
 # Thread 2 calls close() on ZooKeeperInstance1, which calls close() on the zoocache.
 # Thread 1 uses the zoocache it still holds a reference to, causing a new zookeeper connection to be created.

Below is an example program that triggers this behavior. For me, this small program reliably shows a connected zookeeper session after all of the threads have died. If I use 0 threads, it shows a closed zookeeper connection at the end.

{code:java}

 static class WriteTask implements Runnable {

    /** Batch writer on table "foo5"; flushed after every mutation in run(). */
    private BatchWriter batchWriter;
    /** Source of the random row, column and value strings. */
    private Random random;

    /**
     * Opens a batch writer on table "foo5" through the given connector.
     *
     * @param conn connector used to create the batch writer
     * @throws TableNotFoundException propagated from createBatchWriter
     */
    WriteTask(Connector conn) throws TableNotFoundException {
      this.random = new Random();
      this.batchWriter = conn.createBatchWriter("foo5", 10000000, 30000, 1);
    }

    /** Returns a random integer in [0, bound) formatted with the "%06d" pattern. */
    private String randomPadded(int bound) {
      return String.format("%06d", random.nextInt(bound));
    }

    /**
     * Writes and flushes one random mutation per iteration, forever. The loop only ends when an
     * exception is thrown; its message is printed and the thread exits.
     */
    @Override
    public void run() {
      try {
        for (;;) {
          // Draw the random strings in the same order as row, family, qualifier, value.
          String row = randomPadded(1000000);
          String family = randomPadded(100);
          String qualifier = randomPadded(100);
          String value = randomPadded(1000000);

          Mutation mutation = new Mutation(row);
          mutation.put(family, qualifier, value);
          batchWriter.addMutation(mutation);
          batchWriter.flush();
        }
      } catch (Exception e) {
        System.out.println(e.getMessage());
      }
    }

  }

  static class ReadTask implements Runnable {

    /** Scanner over table "foo5" with empty authorizations; re-iterated forever by run(). */
    private Scanner tableScanner;

    /**
     * Creates a scanner on table "foo5" through the given connector.
     *
     * @param conn connector used to create the scanner
     * @throws TableNotFoundException propagated from createScanner
     */
    ReadTask(Connector conn) throws TableNotFoundException {
      tableScanner = conn.createScanner("foo5", new Authorizations());
    }

    /**
     * Repeatedly scans the whole table. The loop only ends when an exception is thrown; its
     * message is printed and the thread exits.
     */
    @Override
    public void run() {
      try {
        for (;;) {
          drain();
        }
      } catch (Exception e) {
        System.out.println(e.getMessage());
      }
    }

    /** Iterates over every entry the scanner returns and discards it. */
    private void drain() {
      for (Entry<Key,Value> ignored : tableScanner) {
        // entries are intentionally discarded; only the act of scanning matters
      }
    }

  }

  @Test(timeout = 30000)
  public void test2() throws Exception {
    ZooKeeperInstance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
    Connector connector = instance.getConnector("root", "superSecret");
    connector.tableOperations().create("foo5");

    final int perKind = 10;
    ArrayList<Thread> workers = new ArrayList<Thread>();

    // Start all writers first, then all readers; every task shares the same connector.
    for (int n = 0; n < perKind; n++) {
      workers.add(launch(new WriteTask(connector)));
    }
    for (int n = 0; n < perKind; n++) {
      workers.add(launch(new ReadTask(connector)));
    }

    // Give the workers time to get going before closing the instance underneath them.
    Thread.sleep(1000);

    // Print each ZooKeeper session key and state before the close...
    ZooSession.printSessions();

    instance.close();

    // ...wait for every worker to exit...
    for (Thread worker : workers) {
      worker.join();
    }

    // ...then print the sessions again to see whether any are still open.
    ZooSession.printSessions();
  }

  /** Creates a thread for the given task, starts it, and returns it. */
  private static Thread launch(Runnable task) {
    Thread thread = new Thread(task);
    thread.start();
    return thread;
  }

{code}

Below are some changes I made to ZooSession for debugging purposes.

{noformat}
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index b3db26f..475a21d 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -20,6 +20,8 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.log4j.Logger;
@@ -29,7 +31,7 @@
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 
-class ZooSession {
+public class ZooSession {
   
   private static final Logger log = Logger.getLogger(ZooSession.class);
   
@@ -121,6 +123,8 @@
     
     ZooSessionInfo zsi = sessions.get(sessionKey);
     if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+      System.out.println("Removing closed session ");
+      new Exception().printStackTrace();
       if (auth != null && sessions.get(readOnlySessionKey) == zsi)
         sessions.remove(readOnlySessionKey);
       zsi = null;
@@ -137,4 +141,13 @@
     }
     return zsi.zooKeeper;
   }
+
+  public static synchronized void printSessions() {
+    Set<Entry<String,ZooSessionInfo>> es = sessions.entrySet();
+
+    for (Entry<String,ZooSessionInfo> entry : es) {
+      System.out.println(entry.getKey() + " " + entry.getValue().zooKeeper.getState());
+    }
+  }
+
 }
{noformat}

With the above changes, I see a stack trace like the following whenever one of the race conditions occurs.

{noformat}
Removing closed session 
java.lang.Exception
	at org.apache.accumulo.core.zookeeper.ZooSession.getSession(ZooSession.java:127)
	at org.apache.accumulo.core.zookeeper.ZooReader.getSession(ZooReader.java:37)
	at org.apache.accumulo.core.zookeeper.ZooReader.getZooKeeper(ZooReader.java:41)
	at org.apache.accumulo.core.zookeeper.ZooCache.getZooKeeper(ZooCache.java:56)
	at org.apache.accumulo.core.zookeeper.ZooCache.retry(ZooCache.java:127)
	at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:233)
	at org.apache.accumulo.core.zookeeper.ZooCache.get(ZooCache.java:188)
	at org.apache.accumulo.core.client.ZooKeeperInstance.getInstanceID(ZooKeeperInstance.java:156)
	at org.apache.accumulo.core.client.impl.TabletLocator.getInstance(TabletLocator.java:96)
	at org.apache.accumulo.core.client.impl.ThriftScanner.scan(ThriftScanner.java:245)
	at org.apache.accumulo.core.client.impl.ScannerIterator$Reader.run(ScannerIterator.java:94)
	at org.apache.accumulo.core.client.impl.ScannerIterator.hasNext(ScannerIterator.java:176)
	at org.apache.accumulo.minicluster.MiniAccumuloClusterTest$ReadTask.run(MiniAccumuloClusterTest.java:109)
	at java.lang.Thread.run(Thread.java:662)
{noformat}





--
This message was sent by Atlassian JIRA
(v6.1.4#6159)

Mime
View raw message