incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [09/50] [abbrv] git commit: BLUR-183, Auto disconnect idle clients in ClientPool
Date Sun, 03 Nov 2013 15:20:03 GMT
BLUR-183, Auto disconnect idle clients in ClientPool

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/0b02b8b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/0b02b8b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/0b02b8b3

Branch: refs/heads/0.3.0-lucene-upgrade
Commit: 0b02b8b3a8925eacbd4e371a5acc128045a222f3
Parents: 57810c8
Author: Vikrant Navalgund <vikrant.navalgund@gmail.com>
Authored: Wed Oct 23 10:36:11 2013 +1100
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Oct 23 08:25:44 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/thrift/BlurClientManager.java   |   2 +-
 .../java/org/apache/blur/thrift/ClientPool.java | 125 ++++++++++++++++---
 .../org/apache/blur/utils/BlurConstants.java    |   4 +-
 3 files changed, 114 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0b02b8b3/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index 25773a9..40604ff 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -296,7 +296,7 @@ public class BlurClientManager {
   }
 
   public static void close(Client client) {
-    _clientPool.close(client);
+    ClientPool.close(client);
   }
 
   public static Client newClient(Connection connection) throws TTransportException, IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0b02b8b3/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
index 92544a3..440eea1 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
@@ -22,11 +22,17 @@ import java.net.InetSocketAddress;
 import java.net.Proxy;
 import java.net.Proxy.Type;
 import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
@@ -34,44 +40,135 @@ import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TSocket;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY;
 import org.apache.blur.thrift.generated.Blur.Client;
 
 public class ClientPool {
 
   private static final Log LOG = LogFactory.getLog(ClientPool.class);
-  private final Map<Connection, BlockingQueue<Client>> _clientPool = new ConcurrentHashMap<Connection, BlockingQueue<Client>>();
+  private static final Map<Connection, BlockingQueue<Client>> _connMap = new ConcurrentHashMap<Connection, BlockingQueue<Client>>();
   private int _maxConnectionsPerHost = Integer.MAX_VALUE;
+  private static AtomicBoolean _running = new AtomicBoolean(true);
+  private static long _idleTimeBeforeClosingClient;
+  private static long _clientPoolCleanFrequency;
+  private static Thread _master;
 
-  // private long _idleTimeBeforeClosingClient = Long.MAX_VALUE;
+  static {
+    try {
+      BlurConfiguration config = new BlurConfiguration();
+      _idleTimeBeforeClosingClient = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD,
+                      TimeUnit.SECONDS.toMillis(30));
+      _clientPoolCleanFrequency = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY,
+              TimeUnit.SECONDS.toMillis(300)); 
+    } catch (Exception e) {
+        throw new RuntimeException(e);
+    }
+    checkAndRemoveStaleClients();
+  }
+  
+  private static void checkAndRemoveStaleClients() {
+    _master = new Thread(new Runnable() {
+    @Override
+    public void run() {
+      while (_running.get()) {
+        try {
+          Thread.sleep(getClientPoolCleanFrequency());
+          List<Thread> workers = new ArrayList<Thread>();
+          int num = 0;
+          for (Connection connection : _connMap.keySet()) {
+            Thread thread = new poolWorker(connection);
+            thread.setName("client-cleaner_" + ++num);
+            thread.start();
+            workers.add(thread);
+          }
+          for(Thread t : workers) {
+            t.join();
+          }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+      }
+    }
+    });
+    _master.setDaemon(true);
+    _master.setName("Blur-Client-Connection-Cleaner");
+    _master.start();
+  }
+  
+  private static class poolWorker extends Thread {
+	private final Connection connection;
+	public poolWorker(Connection conn) {
+	  this.connection = conn;
+	}
+	@Override
+	public void run() {
+	  BlockingQueue<Client> bq = _connMap.get(connection);
+	  synchronized(connection) {
+	    if (!_connMap.get(connection).isEmpty()) {
+	      Iterator<Client> it = bq.iterator();
+	      try {
+	        while (it.hasNext()) {
+		      Client client = it.next();
+		      if (((WeightedClient)client).isStale()) {
+		    	close(client);
+		        bq.take();
+		      } else break;
+		    }
+	      } catch (InterruptedException e) {
+	          throw new RuntimeException(e);
+          }
+	    }
+	  }
+	}
+  }
+
+  private class WeightedClient extends Client {
+	private long _enqueueTime;
+	public WeightedClient(TProtocol prot) {
+	  super(prot);
+	}
+	public void setEnqueTime(long _currentTime) {
+	  this._enqueueTime = _currentTime;
+	}
+	public boolean isStale() {
+	  long diff = System.currentTimeMillis() - _enqueueTime;
+   	  return diff >= getClientIdleTimeThreshold();
+	}
+  }
+
+  private static long getClientIdleTimeThreshold() { return _idleTimeBeforeClosingClient; }
+  private static long getClientPoolCleanFrequency() { return _clientPoolCleanFrequency; }
 
   public void returnClient(Connection connection, Client client) {
     try {
+      ((WeightedClient) client).setEnqueTime(System.currentTimeMillis());	
       getQueue(connection).put(client);
     } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+        throw new RuntimeException(e);
     }
   }
 
   private BlockingQueue<Client> getQueue(Connection connection) {
-    BlockingQueue<Client> blockingQueue = _clientPool.get(connection);
-    synchronized (_clientPool) {
-      blockingQueue = _clientPool.get(connection);
+    BlockingQueue<Client> blockingQueue;
+    synchronized (connection) {
+      blockingQueue = _connMap.get(connection);
       if (blockingQueue == null) {
         blockingQueue = getNewQueue();
-        _clientPool.put(connection, blockingQueue);
+        _connMap.put(connection, blockingQueue);
       }
     }
-    return _clientPool.get(connection);
+    return _connMap.get(connection);
   }
 
   public void trashConnections(Connection connection, Client client) {
     BlockingQueue<Client> blockingQueue;
-    synchronized (_clientPool) {
-      blockingQueue = _clientPool.put(connection, getNewQueue());
+    synchronized (connection) {
+      blockingQueue = _connMap.put(connection, getNewQueue());
       try {
         blockingQueue.put(client);
       } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+          throw new RuntimeException(e);
       }
     }
 
@@ -115,13 +212,11 @@ public class ClientPool {
     trans = new TSocket(socket);
 
     TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans));
-    Client client = new Client(proto);
-    return client;
+    return new WeightedClient(proto);
   }
 
-  public void close(Client client) {
+  public static void close(Client client) {
     client.getInputProtocol().getTransport().close();
     client.getOutputProtocol().getTransport().close();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0b02b8b3/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index ebebdb5..d58d304 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -100,7 +100,9 @@ public class BlurConstants {
   public static final String BLUR_CONTROLLER_THRIFT_SELECTOR_THREADS = "blur.controller.thrift.selector.threads";
   public static final String BLUR_CONTROLLER_THRIFT_MAX_READ_BUFFER_BYTES = "blur.controller.thrift.max.read.buffer.bytes";
   public static final String BLUR_CONTROLLER_THRIFT_ACCEPT_QUEUE_SIZE_PER_THREAD = "blur.controller.thrift.accept.queue.size.per.thread";
-
+  public static final String BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD = "blur.clientpool.client.close.threshold";
+  public static final String BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY = "blur.clientpool.client.clean.frequency";
+  
   public static final String BLUR_GUI_CONTROLLER_PORT = "blur.gui.controller.port";
   public static final String BLUR_GUI_SHARD_PORT = "blur.gui.shard.port";
 


Mime
View raw message