incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Cleaning up client connection pool and adding more default settings to the configuration.
Date Tue, 21 Oct 2014 13:49:31 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 6fdbd90c4 -> 27dae736c


Cleaning up client connection pool and adding more default settings to the configuration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/27dae736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/27dae736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/27dae736

Branch: refs/heads/master
Commit: 27dae736c44402405592726876d755854232a623
Parents: 6fdbd90
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 21 09:49:21 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 21 09:49:21 2014 -0400

----------------------------------------------------------------------
 .../blur/thrift/BlurControllerServer.java       |   1 +
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   |   7 +
 .../java/org/apache/blur/thrift/ClientPool.java | 136 +++++++++++--------
 .../java/org/apache/blur/thrift/Connection.java |   8 +-
 .../org/apache/blur/utils/BlurConstants.java    |   3 +-
 .../src/main/resources/blur-default.properties  |   9 ++
 6 files changed, 106 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27dae736/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 477e0d0..466f84a 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -289,6 +289,7 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       client.ping();
       LOG.debug("Pinging shard server [{0}]", shardServer);
     } catch (Exception e) {
+      e.printStackTrace();
       LOG.error("Error while trying to ping shard server [{0}]", shardServer);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27dae736/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 526c12e..ba68c33 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -374,6 +374,9 @@ public class HdfsKeyValueStore implements Store {
   }
 
   public void cleanupOldFiles() throws IOException {
+    if (!isOpenForWriting()) {
+      return;
+    }
     SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
     if (fileStatusSet == null || fileStatusSet.size() < 1) {
       return;
@@ -398,6 +401,10 @@ public class HdfsKeyValueStore implements Store {
     }
   }
 
+  private boolean isOpenForWriting() {
+    return _output != null;
+  }
+
   private Operation getPutOperation(OperationType put, BytesRef key, BytesRef value) {
     Operation operation = new Operation();
     operation.type = put;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27dae736/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
index 5c3063e..d77ebb7 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
@@ -18,7 +18,8 @@ package org.apache.blur.thrift;
  */
 
 import static org.apache.blur.utils.BlurConstants.BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY;
-import static org.apache.blur.utils.BlurConstants.BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLIENTPOOL_CLIENT_STALE_THRESHOLD;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLIENTPOOL_CLIENT_MAX_CONNECTIONS_PER_HOST;
 import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_MAX_FRAME_SIZE;
 
 import java.io.IOException;
@@ -26,13 +27,15 @@ import java.net.InetSocketAddress;
 import java.net.Proxy;
 import java.net.Proxy.Type;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
@@ -50,79 +53,87 @@ public class ClientPool {
 
   private static final Log LOG = LogFactory.getLog(ClientPool.class);
   private static final Map<Connection, BlockingQueue<Client>> _connMap = new
ConcurrentHashMap<Connection, BlockingQueue<Client>>();
-  private int _maxConnectionsPerHost = Integer.MAX_VALUE;
   private static final int _maxFrameSize;
-  private static AtomicBoolean _running = new AtomicBoolean(true);
-  private static long _idleTimeBeforeClosingClient;
-  private static long _clientPoolCleanFrequency;
-  private static Thread _master;
+  private static final int _maxConnectionsPerHost;
+
+  private static final long _idleTimeBeforeClosingClient;
+  private static final long _clientPoolCleanFrequency;
+  private static final Timer _master;
 
   static {
     try {
       BlurConfiguration config = new BlurConfiguration();
-      _idleTimeBeforeClosingClient = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD,
-          TimeUnit.SECONDS.toNanos(30));
-      _clientPoolCleanFrequency = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY,
TimeUnit.SECONDS.toMillis(3));
+      int maxConnectionsPerHost = config.getInt(BLUR_CLIENTPOOL_CLIENT_MAX_CONNECTIONS_PER_HOST,
64);
+      if (maxConnectionsPerHost < 1) {
+        LOG.fatal("Max connections per host cannot be less than 1 current value [{0}] using
1.", maxConnectionsPerHost);
+        maxConnectionsPerHost = 1;
+      }
+      _maxConnectionsPerHost = maxConnectionsPerHost;
+      _idleTimeBeforeClosingClient = TimeUnit.SECONDS.toNanos(config
+          .getLong(BLUR_CLIENTPOOL_CLIENT_STALE_THRESHOLD, 30));
+      _clientPoolCleanFrequency = TimeUnit.SECONDS.toMillis(config.getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY,
10));
       _maxFrameSize = config.getInt(BLUR_THRIFT_MAX_FRAME_SIZE, 16384000);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    checkAndRemoveStaleClients();
+    _master = checkAndRemoveStaleClients();
   }
 
-  private static void checkAndRemoveStaleClients() {
-    _master = new Thread(new Runnable() {
+  public static void close() {
+    _master.cancel();
+    _master.purge();
+  }
+
+  private static Timer checkAndRemoveStaleClients() {
+    Timer master = new Timer("Blur-Client-Connection-Cleaner", true);
+    master.schedule(new TimerTask() {
       @Override
       public void run() {
-        while (_running.get()) {
-          for (Entry<Connection, BlockingQueue<Client>> e : _connMap.entrySet())
{
-            testConnections(e.getKey(), e.getValue());
-          }
-          try {
-            Thread.sleep(getClientPoolCleanFrequency());
-          } catch (InterruptedException e) {
-            return;
-          }
+        for (Entry<Connection, BlockingQueue<Client>> e : _connMap.entrySet())
{
+          testConnections(e.getKey(), e.getValue());
         }
       }
+    }, getClientPoolCleanFrequency(), getClientPoolCleanFrequency());
+    return master;
+  }
 
-      private void testConnections(Connection connection, BlockingQueue<Client> clients)
{
-        LOG.debug("Testing clients for connection [{0}]", connection);
-        int size = clients.size();
-        for (int i = 0; i < size; i++) {
-          WeightedClient weightedClient = (WeightedClient) clients.poll();
-          if (weightedClient == null) {
-            return;
-          }
-          if (weightedClient.isStale()) {
-            if (testClient(connection, weightedClient)) {
-              tryToReturnToQueue(clients, weightedClient);
-            } else {
-              close(weightedClient);
-            }
-          } else {
-            tryToReturnToQueue(clients, weightedClient);
-          }
-        }
+  private static void testConnections(Connection connection, BlockingQueue<Client>
clients) {
+    int size = clients.size();
+    LOG.debug("Testing clients for connection [{0}] has size of [{1}]", connection, size);
+    for (int i = 0; i < size; i++) {
+      WeightedClient weightedClient = (WeightedClient) clients.poll();
+      if (weightedClient == null) {
+        return;
       }
-
-      private void tryToReturnToQueue(BlockingQueue<Client> clients, WeightedClient
weightedClient) {
-        if (!clients.offer(weightedClient)) {
-          // Close client
+      if (weightedClient.isStale()) {
+        if (testClient(connection, weightedClient)) {
+          tryToReturnToQueue(clients, weightedClient);
+        } else {
+          LOG.error("Closing potentially bad client [{0}]", weightedClient);
           close(weightedClient);
         }
+      } else {
+        tryToReturnToQueue(clients, weightedClient);
       }
-    });
-    _master.setDaemon(true);
-    _master.setName("Blur-Client-Connection-Cleaner");
-    _master.start();
+    }
+  }
+
+  private static void tryToReturnToQueue(BlockingQueue<Client> clients, WeightedClient
weightedClient) {
+    LOG.debug("Offering client [{0}] to queue.", weightedClient);
+    if (!clients.offer(weightedClient)) {
+      // Close client
+      LOG.info("Too many clients in pool, closing client [{0}]", weightedClient);
+      close(weightedClient);
+    }
   }
 
   private class WeightedClient extends SafeClientGen {
     private long _lastUse = System.nanoTime();
+    private final String _id;
 
-    public WeightedClient(TProtocol prot) {
+    public WeightedClient(TProtocol prot, String id) {
       super(prot);
+      _id = id;
     }
 
     public void touch() {
@@ -133,6 +144,12 @@ public class ClientPool {
       long diff = System.nanoTime() - _lastUse;
       return diff >= getClientIdleTimeThreshold();
     }
+
+    @Override
+    public String toString() {
+      return _id;
+    }
+
   }
 
   private static long getClientIdleTimeThreshold() {
@@ -144,11 +161,10 @@ public class ClientPool {
   }
 
   public void returnClient(Connection connection, Client client) {
-    ((WeightedClient) client).touch();
-    if (!getQueue(connection).offer(client)) {
-      // Close client
-      close(client);
-    }
+    BlockingQueue<Client> queue = getQueue(connection);
+    WeightedClient weightedClient = (WeightedClient) client;
+    weightedClient.touch();
+    tryToReturnToQueue(queue, weightedClient);
   }
 
   private BlockingQueue<Client> getQueue(Connection connection) {
@@ -176,7 +192,7 @@ public class ClientPool {
         throw new RuntimeException(e);
       }
     }
-    LOG.info("Trashing client for connections [{0}]", connection);
+    LOG.debug("Trashing client for connections [{0}]", connection);
     for (Client c : blockingQueue) {
       close(c);
     }
@@ -198,11 +214,13 @@ public class ClientPool {
   public Client getClient(Connection connection) throws TTransportException, IOException
{
     BlockingQueue<Client> blockingQueue = getQueue(connection);
     if (blockingQueue.isEmpty()) {
+      LOG.debug("New client for connection [{0}]", connection);
       return newClient(connection);
     }
     while (true) {
       WeightedClient client = (WeightedClient) blockingQueue.poll();
       if (client == null) {
+        LOG.debug("New client for connection [{0}]", connection);
         return newClient(connection);
       }
       if (client.isStale()) {
@@ -234,7 +252,13 @@ public class ClientPool {
     trans = new TSocket(socket);
 
     TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans, _maxFrameSize));
-    return new WeightedClient(proto);
+    return new WeightedClient(proto, getIdentifer(socket));
+  }
+
+  private String getIdentifer(Socket socket) {
+    SocketAddress localSocketAddress = socket.getLocalSocketAddress();
+    SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
+    return localSocketAddress.toString() + " -> " + remoteSocketAddress.toString();
   }
 
   private static boolean testClient(Connection connection, WeightedClient weightedClient)
{

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27dae736/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java b/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
index c976a47..2b9843a 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class Connection {
 
-  public final static int DEFAULT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(10);
+  public final static int DEFAULT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
 
   private final String _host;
   private final int _port;
@@ -160,4 +160,10 @@ public class Connection {
     }
     return _host + ":" + _port + "#" + _timeout;
   }
+
+  @Override
+  public String toString() {
+    return getConnectionStr();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27dae736/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 43c0b83..0e0f5b0 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -137,7 +137,8 @@ public class BlurConstants {
   public static final String BLUR_CONTROLLER_THRIFT_SELECTOR_THREADS = "blur.controller.thrift.selector.threads";
   public static final String BLUR_CONTROLLER_THRIFT_MAX_READ_BUFFER_BYTES = "blur.controller.thrift.max.read.buffer.bytes";
   public static final String BLUR_CONTROLLER_THRIFT_ACCEPT_QUEUE_SIZE_PER_THREAD = "blur.controller.thrift.accept.queue.size.per.thread";
-  public static final String BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD = "blur.clientpool.client.close.threshold";
+  public static final String BLUR_CLIENTPOOL_CLIENT_MAX_CONNECTIONS_PER_HOST = "blur.clientpool.client.max.connections.per.host";
+  public static final String BLUR_CLIENTPOOL_CLIENT_STALE_THRESHOLD = "blur.clientpool.client.stale.threshold";
   public static final String BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY = "blur.clientpool.client.clean.frequency";
   public static final String BLUR_LUCENE_FST_BYTEARRAY_FACTORY = "blur.lucene.fst.bytearray.factory";
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27dae736/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 1fa82cf..999167e 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -19,6 +19,15 @@ blur.zookeeper.timeout=90000
 # The path in HDFS where the distributed traces will be stored, if blank trace output will be written to the log or the ZooKeeper store.
 blur.hdfs.trace.path=
 
+# Max number of connections per host.
+blur.clientpool.client.max.connections.per.host=64
+
+# Number of seconds between use that a connection will be marked as stale.
+blur.clientpool.client.stale.threshold=30
+
+# Number of seconds between checking connections.
+blur.clientpool.client.clean.frequency=10
+
 # The maximum number of results that can be fetched in a single request
 blur.query.max.results.fetch=1000
 


Mime
View raw message