accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject git commit: ACCUMULO-1379 - Adding close() to Instance to assist in freeing up resources
Date Wed, 02 Oct 2013 18:57:42 GMT
Updated Branches:
  refs/heads/master addacd0d6 -> 7da1164d8


ACCUMULO-1379 - Adding close() to Instance to assist in freeing up resources


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7da1164d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7da1164d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7da1164d

Branch: refs/heads/master
Commit: 7da1164d87227960d3e0cfc841f753067e2c0304
Parents: addacd0
Author: John Vines <jvines@gmail.com>
Authored: Wed Oct 2 14:57:01 2013 -0400
Committer: John Vines <jvines@gmail.com>
Committed: Wed Oct 2 14:57:01 2013 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   |  6 ++++
 .../accumulo/core/client/ZooKeeperInstance.java | 36 ++++++++++++++++++++
 .../core/client/impl/ThriftTransportPool.java   | 16 +++++++--
 .../accumulo/core/client/mock/MockInstance.java |  5 +++
 .../apache/accumulo/core/util/ThriftUtil.java   |  4 +++
 .../core/client/impl/TabletLocatorImplTest.java |  5 +++
 .../accumulo/fate/zookeeper/ZooCache.java       |  7 ++++
 .../accumulo/fate/zookeeper/ZooReader.java      |  4 +++
 .../accumulo/server/client/HdfsZooInstance.java |  9 +++++
 .../accumulo/test/ConditionalWriterTest.java    | 28 +++++++++++----
 10 files changed, 111 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
index 0796059..95fc933 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@ -148,4 +148,10 @@ public interface Instance {
    */
   public abstract Connector getConnector(String principal, AuthenticationToken token) throws
AccumuloException, AccumuloSecurityException;
   
+  /**
+   * Closes up the instance to free up all associated resources. You should try to reuse
an Instance as much as you can because there is some location caching
+   * stored which will enhance performance.
+   * @throws AccumuloException 
+   */
+  public abstract void close() throws AccumuloException;
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index ba8c094..5b5d041 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ConnectorImpl;
@@ -33,6 +34,7 @@ import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
@@ -66,6 +68,8 @@ public class ZooKeeperInstance implements Instance {
   
   private final int zooKeepersSessionTimeOut;
   
+  private boolean closed = false;
+  
   /**
    * 
    * @param instanceName
@@ -95,6 +99,7 @@ public class ZooKeeperInstance implements Instance {
     this.zooKeepersSessionTimeOut = sessionTimeout;
     zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
     getInstanceID();
+    clientInstances.incrementAndGet();
   }
   
   /**
@@ -125,10 +130,13 @@ public class ZooKeeperInstance implements Instance {
     this.zooKeepers = zooKeepers;
     this.zooKeepersSessionTimeOut = sessionTimeout;
     zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+    clientInstances.incrementAndGet();
   }
   
   @Override
   public String getInstanceID() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     if (instanceId == null) {
       // want the instance id to be stable for the life of this instance object,
       // so only get it once
@@ -152,6 +160,8 @@ public class ZooKeeperInstance implements Instance {
   
   @Override
   public List<String> getMasterLocations() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
     
     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in
zoocache.");
@@ -167,6 +177,8 @@ public class ZooKeeperInstance implements Instance {
   
   @Override
   public String getRootTabletLocation() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     String zRootLocPath = ZooUtil.getRoot(this) + RootTable.ZROOT_TABLET_LOCATION;
     
     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location
in zookeeper.");
@@ -182,6 +194,8 @@ public class ZooKeeperInstance implements Instance {
   
   @Override
   public String getInstanceName() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     if (instanceName == null)
       instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
     
@@ -212,6 +226,8 @@ public class ZooKeeperInstance implements Instance {
   
   @Override
   public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException,
AccumuloSecurityException {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     return new ConnectorImpl(this, new Credentials(principal, token));
   }
   
@@ -249,5 +265,25 @@ public class ZooKeeperInstance implements Instance {
     }
     return null;
   }
+
+  static private final AtomicInteger clientInstances = new AtomicInteger(0);
+  
+  @Override
+  public void close() throws AccumuloException {
+    if (!closed && clientInstances.decrementAndGet() == 0)
+    try {
+      zooCache.close();
+      ThriftUtil.close();
+    } catch (InterruptedException e) {
+      throw new AccumuloException("Issues closing ZooKeeper.");
+    }
+    closed = true;
+  }
   
+  @Override
+  public void finalize() {
+    // This method intentionally left blank. Users need to explicitly close Instances if
they want things cleaned up nicely.
+    if (!closed)
+      log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember
to call close() before dereferencing to clean up threads.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index f12a9c3..e7dabb5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -81,13 +81,15 @@ public class ThriftTransportPool {
   
   private static class Closer implements Runnable {
     final ThriftTransportPool pool;
+    final AtomicBoolean stop;
     
-    public Closer(ThriftTransportPool pool) {
+    public Closer(ThriftTransportPool pool, AtomicBoolean stop) {
       this.pool = pool;
+      this.stop = stop;
     }
     
     public void run() {
-      while (true) {
+      while (!stop.get()) {
         
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
         
@@ -581,6 +583,7 @@ public class ThriftTransportPool {
   
   private static ThriftTransportPool instance = new ThriftTransportPool();
   private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
+  private static AtomicBoolean stopDaemon;
   
   public static ThriftTransportPool getInstance() {
     SecurityManager sm = System.getSecurityManager();
@@ -589,8 +592,15 @@ public class ThriftTransportPool {
     }
     
     if (daemonStarted.compareAndSet(false, true)) {
-      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+      stopDaemon = new AtomicBoolean(false);
+      new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start();
     }
     return instance;
   }
+  
+  public static void close() {
+    if (daemonStarted.compareAndSet(true, false)) {
+      stopDaemon.set(true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index 2ba8c67..d391464 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@ -158,4 +158,9 @@ public class MockInstance implements Instance {
       throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS);
     return conn;
   }
+
+  @Override
+  public void close() throws AccumuloException {
+    // NOOP
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index fc18440..e8dd6a2 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -207,4 +207,8 @@ public class ThriftUtil {
   public static TProtocolFactory protocolFactory() {
     return protocolFactory;
   }
+  
+  public static void close() {
+    ThriftTransportPool.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index f4fe0a6..8f3fa1d 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -459,6 +459,11 @@ public class TabletLocatorImplTest extends TestCase {
     public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException,
AccumuloSecurityException {
       throw new UnsupportedOperationException();
     }
+    
+    @Override
+    public void close() throws AccumuloException {
+      // NOOP
+    }
   }
   
   static class TServers {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index aa24552..420533a 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -307,4 +307,11 @@ public class ZooCache {
     
     return zc;
   }
+  
+  public void close() throws InterruptedException {
+    cache.clear();
+    statCache.clear();
+    childrenCache.clear();
+    zReader.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index ab73012..e11f570 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@ -102,4 +102,8 @@ public class ZooReader implements IZooReader {
     this.keepers = keepers;
     this.timeout = timeout;
   }
+
+  public void close() throws InterruptedException {
+    getZooKeeper().close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index c0a3e2b..9e6bbe7 100644
--- a/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -186,5 +186,14 @@ public class HdfsZooInstance implements Instance {
     System.out.println("ZooKeepers: " + instance.getZooKeepers());
     System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
   }
+
+  @Override
+  public void close() throws AccumuloException {
+    try {
+      zooCache.close();
+    } catch (InterruptedException e) {
+      throw new AccumuloException("Issues closing ZooKeeper, try again");
+    }
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7da1164d/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 936e0ac..633ea76 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -176,6 +176,7 @@ public class ConditionalWriterTest {
     Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
     
     Assert.assertEquals("doe", scanner.iterator().next().getValue().toString());
+    zki.close();
   }
   
   @Test
@@ -260,7 +261,8 @@ public class ConditionalWriterTest {
     Assert.assertEquals("2", entry.getValue().toString());
     
     // TODO test each field w/ absence
-    
+    zki.close();
+   
   }
   
   @Test
@@ -359,6 +361,8 @@ public class ConditionalWriterTest {
     } catch (AccumuloSecurityException ase) {}
     
     cw2.close();
+    
+    zki.close();
   }
   
   @Test
@@ -390,7 +394,8 @@ public class ConditionalWriterTest {
     Assert.assertTrue(scanner.iterator().hasNext());
     
     cw.close();
-    
+    zki.close();
+   
   }
   
   @Test
@@ -488,6 +493,7 @@ public class ConditionalWriterTest {
     // TODO test w/ table that has iterators configured
     
     cw.close();
+    zki.close();
   }
   
   @Test
@@ -594,6 +600,7 @@ public class ConditionalWriterTest {
     Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
     
     cw.close();
+    zki.close();
   }
   
   @Test
@@ -669,6 +676,7 @@ public class ConditionalWriterTest {
     Assert.assertEquals(num, count);
     
     cw.close();
+    zki.close();
   }
   
   @Test
@@ -751,6 +759,7 @@ public class ConditionalWriterTest {
     Assert.assertFalse(iter.hasNext());
     
     cw.close();
+    zki.close();
   }
   
   @Test
@@ -804,6 +813,7 @@ public class ConditionalWriterTest {
     Assert.assertEquals(3, total);
     
     cw.close();
+    zki.close();
   }
   
   private static class Stats {
@@ -1011,6 +1021,7 @@ public class ConditionalWriterTest {
       Iterator<Entry<Key,Value>> row = rowIter.next();
       new Stats(row);
     }
+    zki.close();
   }
   
   private SortedSet<Text> nss(String... splits) {
@@ -1064,7 +1075,8 @@ public class ConditionalWriterTest {
     } catch (AccumuloSecurityException ase) {
       
     }
-    
+    zki.close();
+   
   }
   
   @Test
@@ -1115,7 +1127,8 @@ public class ConditionalWriterTest {
     Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED);
     
     cw.close();
-    
+    zki.close();
+
   }
   
   @Test
@@ -1148,7 +1161,8 @@ public class ConditionalWriterTest {
     } catch (AccumuloException ae) {
       Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass());
     }
-    
+    zki.close();
+
   }
   
   @Test
@@ -1185,6 +1199,7 @@ public class ConditionalWriterTest {
       conn.createConditionalWriter(table, new ConditionalWriterConfig());
       Assert.assertFalse(true);
     } catch (TableOfflineException e) {}
+    zki.close();
   }
   
   void waitForSingleTabletTableToGoOffline(String table, ZooKeeperInstance zki) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
@@ -1222,7 +1237,8 @@ public class ConditionalWriterTest {
     }
     
     cw.close();
-    
+    zki.close();
+
   }
   
   @AfterClass


Mime
View raw message