accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1438225 - /accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
Date Thu, 24 Jan 2013 22:10:51 GMT
Author: ecn
Date: Thu Jan 24 22:10:51 2013
New Revision: 1438225

URL: http://svn.apache.org/viewvc?rev=1438225&view=rev
Log:
ACCUMULO-985 cleanup batch scanners and batch writers when they are removed from the LRU

Modified:
    accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java

Modified: accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java?rev=1438225&r1=1438224&r2=1438225&view=diff
==============================================================================
--- accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java (original)
+++ accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java Thu Jan 24 22:10:51 2013
@@ -83,20 +83,48 @@ import org.apache.accumulo.proxy.thrift.
 import org.apache.accumulo.proxy.thrift.UserPass;
 import org.apache.accumulo.proxy.thrift.WriterOptions;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 
 public class ProxyServer implements AccumuloProxy.Iface {
   
+  public static final Logger logger = Logger.getLogger(ProxyServer.class);
   protected Instance instance;
   
-  protected class ScannerPlusIterator {
+  static protected class ScannerPlusIterator {
     public ScannerBase scanner;
     public Iterator<Map.Entry<Key,Value>> iterator;
   }
   
+  static class CloseWriter implements RemovalListener<UUID,BatchWriter> {
+    @Override
+    public void onRemoval(RemovalNotification<UUID,BatchWriter> notification) {
+      try {
+        notification.getValue().close();
+      } catch (MutationsRejectedException e) {
+        logger.warn(e, e);
+      }
+    }
+    public CloseWriter() {}
+  }
+  
+  static class CloseScanner implements RemovalListener<UUID,ScannerPlusIterator> {
+    @Override
+    public void onRemoval(RemovalNotification<UUID,ScannerPlusIterator> notification) {
+      final ScannerBase base = notification.getValue().scanner;
+      if (base instanceof BatchScanner) {
+        final BatchScanner scanner = (BatchScanner)base;
+        scanner.close();
+      }
+    }
+    public CloseScanner() {}
+  }
+
   protected Cache<UUID,ScannerPlusIterator> scannerCache;
   protected Cache<UUID,BatchWriter> writerCache;
   
@@ -108,9 +136,9 @@ public class ProxyServer implements Accu
       instance = new ZooKeeperInstance(props.getProperty("org.apache.accumulo.proxy.ProxyServer.instancename"),
           props.getProperty("org.apache.accumulo.proxy.ProxyServer.zookeepers"));
     
-    scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).build();
+    scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseScanner()).build();
     
-    writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).build();
+    writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseWriter()).build();
   }
   
   protected Connector getConnector(UserPass userpass) throws Exception {



Mime
View raw message