hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [31/50] hbase git commit: HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers" Adds HADOOP-9955 RPC idle connection closing is extremely inefficient Then removes queue added by HADOOP-9956 at Enis suggestion
Date Sat, 11 Jun 2016 04:56:17 GMT
HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers"
Adds HADOOP-9955 RPC idle connection closing is extremely inefficient
Then removes queue added by HADOOP-9956 at Enis suggestion

    Changes how we do accounting of Connections to match how it is done in Hadoop.
    Adds a ConnectionManager class. Adds new configurations for this new class.

    "hbase.ipc.client.idlethreshold" 4000
    "hbase.ipc.client.connection.idle-scan-interval.ms" 10000
    "hbase.ipc.client.connection.maxidletime" 10000
    "hbase.ipc.client.kill.max" 10
    "hbase.ipc.server.handler.queue.size" 100

    The new scheme does away with synchronization that purportedly would freeze out
    reads while we were cleaning up stale connections (according to HADOOP-9955)

    Also adds a new mechanism for accepting Connections: pull in as many
    as we can at a time and add them to a Queue instead of doing one at a time.
    This can help when traffic is bursty, according to HADOOP-9956. Removes blocking
    while a Reader is busy parsing a request. Adds configuration
    "hbase.ipc.server.read.connection-queue.size" with default of 100 for
    queue size.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e0b70c00
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e0b70c00
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e0b70c00

Branch: refs/heads/hbase-12439
Commit: e0b70c00e74aeaac33570508e3732a53daea839e
Parents: da88b48
Author: stack <stack@apache.org>
Authored: Fri Jun 3 15:38:07 2016 -0700
Committer: stack <stack@apache.org>
Committed: Tue Jun 7 13:10:14 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/MetricsHBaseServerSource.java     |  10 +-
 .../ipc/MetricsHBaseServerWrapperImpl.java      |   6 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 408 +++++++++++--------
 .../regionserver/SimpleRpcSchedulerFactory.java |   2 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |   2 +-
 5 files changed, 241 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index bb89789..ce57e0f 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -52,14 +52,16 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String TOTAL_CALL_TIME_NAME = "totalCallTime";
   String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
   String QUEUE_SIZE_NAME = "queueSize";
-  String QUEUE_SIZE_DESC = "Number of bytes in the call queues.";
+  String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " +
+    "parsed and is waiting to run or is currently being executed.";
   String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
-  String GENERAL_QUEUE_DESC = "Number of calls in the general call queue.";
+  String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
+    "parsed requests waiting in scheduler to be executed";
   String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
   String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
   String REPLICATION_QUEUE_DESC =
-      "Number of calls in the replication call queue.";
-  String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
+      "Number of calls in the replication call queue waiting to be run";
+  String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
   String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
   String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
   String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 9979c75..4f53709 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
     if (!isServerStarted()) {
       return 0;
     }
-    return server.callQueueSize.get();
+    return server.callQueueSizeInBytes.get();
   }
 
   @Override
@@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
 
   @Override
   public int getNumOpenConnections() {
-    if (!isServerStarted() || this.server.connectionList == null) {
+    if (!isServerStarted()) {
       return 0;
     }
-    return server.connectionList.size();
+    return server.getNumOpenConnections();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 483ce86..aca3fdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -48,15 +48,16 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -113,6 +114,7 @@ import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -183,11 +185,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    */
   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
 
-  /**
-   * The maximum size that we can hold in the RPC queue
-   */
-  private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
-
   private final IPCUtil ipcUtil;
 
   private static final String AUTH_FAILED_FOR = "Auth failed for ";
@@ -210,22 +207,30 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   protected int port;                             // port we listen on
   protected InetSocketAddress address;            // inet address we listen on
   private int readThreads;                        // number of read threads
-  protected int maxIdleTime;                      // the maximum idle time after
-                                                  // which a client may be
-                                                  // disconnected
-  protected int thresholdIdleConnections;         // the number of idle
-                                                  // connections after which we
-                                                  // will start cleaning up idle
-                                                  // connections
-  int maxConnectionsToNuke;                       // the max number of
-                                                  // connections to nuke
-                                                  // during a cleanup
-
   protected MetricsHBaseServer metrics;
 
   protected final Configuration conf;
 
-  private int maxQueueSize;
+  /**
+   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
+   * this size, then we will reject the call (after parsing it though). It will go back to the
+   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
+   * call queue size gets incremented after we parse a call and before we add it to the queue of
+   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
+   * size is kept in {@link #callQueueSizeInBytes}.
+   * @see {@link #callQueueSizeInBytes}
+   * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
+   * @see {@link #callQueueSizeInBytes}
+   */
+  private final long maxQueueSizeInBytes;
+  private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
+
+  /**
+   * This is a running count of the size in bytes of all outstanding calls whether currently
+   * executing or queued waiting to be run.
+   */
+  protected final Counter callQueueSizeInBytes = new Counter();
+
   protected int socketSendBufferSize;
   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -244,19 +249,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    */
   volatile boolean started = false;
 
-  /**
-   * This is a running count of the size of all outstanding calls by size.
-   */
-  protected final Counter callQueueSize = new Counter();
-
-  protected final List<Connection> connectionList =
-    Collections.synchronizedList(new LinkedList<Connection>());
-  //maintain a list
-  //of client connections
+  // maintains the set of client connections and handles idle timeouts
+  private ConnectionManager connectionManager;
   private Listener listener = null;
   protected Responder responder = null;
   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
-  protected int numConnections = 0;
 
   protected HBaseRPCErrorHandler errorHandler = null;
 
@@ -623,18 +620,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private Selector selector = null; //the selector that we use for the server
     private Reader[] readers = null;
     private int currentReader = 0;
-    private Random rand = new Random();
-    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
-                                         //-tion (for idle connections) ran
-    private long cleanupInterval = 10000; //the minimum interval between
-                                          //two cleanup runs
-    private int backlogLength;
 
     private ExecutorService readPool;
 
     public Listener(final String name) throws IOException {
       super(name);
-      backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
+      // The backlog of requests that we will have the serversocket carry.
+      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
       acceptChannel.configureBlocking(false);
@@ -644,9 +636,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
       // create a selector;
-      selector= Selector.open();
+      selector = Selector.open();
 
       readers = new Reader[readThreads];
+      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
+      // has an advantage in that it is easy to shutdown the pool.
       readPool = Executors.newFixedThreadPool(readThreads,
         new ThreadFactoryBuilder().setNameFormat(
           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
@@ -667,12 +661,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
 
     private class Reader implements Runnable {
-      private volatile boolean adding = false;
       private final Selector readSelector;
 
       Reader() throws IOException {
         this.readSelector = Selector.open();
       }
+
       @Override
       public void run() {
         try {
@@ -686,14 +680,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
       }
 
-      private synchronized void doRunLoop() {
+      private void doRunLoop() {
         while (running) {
           try {
             readSelector.select();
-            while (adding) {
-              this.wait(1000);
-            }
-
             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
             while (iter.hasNext()) {
               SelectionKey key = iter.next();
@@ -703,9 +693,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
                   doRead(key);
                 }
               }
+              key = null;
             }
           } catch (InterruptedException e) {
-            LOG.debug("Interrupted while sleeping");
+            if (running) {                      // unexpected -- log it
+              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
+            }
             return;
           } catch (IOException ex) {
             LOG.info(getName() + ": IOException in Reader", ex);
@@ -714,76 +707,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
 
       /**
-       * This gets reader into the state that waits for the new channel
-       * to be registered with readSelector. If it was waiting in select()
-       * the thread will be woken up, otherwise whenever select() is called
-       * it will return even if there is nothing to read and wait
-       * in while(adding) for finishAdd call
+       * Updating the readSelector while it's being used is not thread-safe,
+       * so the connection must be queued.  The reader will drain the queue
+       * and update its readSelector before performing the next select
        */
-      public void startAdd() {
-        adding = true;
+      public void addConnection(Connection conn) throws IOException {
+        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
         readSelector.wakeup();
       }
-
-      public synchronized SelectionKey registerChannel(SocketChannel channel)
-        throws IOException {
-        return channel.register(readSelector, SelectionKey.OP_READ);
-      }
-
-      public synchronized void finishAdd() {
-        adding = false;
-        this.notify();
-      }
-    }
-
-    /** cleanup connections from connectionList. Choose a random range
-     * to scan and also have a limit on the number of the connections
-     * that will be cleanedup per run. The criteria for cleanup is the time
-     * for which the connection was idle. If 'force' is true then all
-     * connections will be looked at for the cleanup.
-     * @param force all connections will be looked at for cleanup
-     */
-    private void cleanupConnections(boolean force) {
-      if (force || numConnections > thresholdIdleConnections) {
-        long currentTime = System.currentTimeMillis();
-        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
-          return;
-        }
-        int start = 0;
-        int end = numConnections - 1;
-        if (!force) {
-          start = rand.nextInt() % numConnections;
-          end = rand.nextInt() % numConnections;
-          int temp;
-          if (end < start) {
-            temp = start;
-            start = end;
-            end = temp;
-          }
-        }
-        int i = start;
-        int numNuked = 0;
-        while (i <= end) {
-          Connection c;
-          synchronized (connectionList) {
-            try {
-              c = connectionList.get(i);
-            } catch (Exception e) {return;}
-          }
-          if (c.timedOut(currentTime)) {
-            if (LOG.isDebugEnabled())
-              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
-            closeConnection(c);
-            numNuked++;
-            end--;
-            //noinspection UnusedAssignment
-            c = null;
-            if (!force && numNuked == maxConnectionsToNuke) break;
-          }
-          else i++;
-        }
-        lastCleanupRunTime = System.currentTimeMillis();
-      }
     }
 
     @Override
@@ -792,6 +723,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         "it will have per impact")
     public void run() {
       LOG.info(getName() + ": starting");
+      connectionManager.startIdleScan();
       while (running) {
         SelectionKey key = null;
         try {
@@ -815,7 +747,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             if (errorHandler.checkOOME(e)) {
               LOG.info(getName() + ": exiting on OutOfMemoryError");
               closeCurrentConnection(key, e);
-              cleanupConnections(true);
+              connectionManager.closeIdle(true);
               return;
             }
           } else {
@@ -824,22 +756,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             // some thread(s) a chance to finish
             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
             closeCurrentConnection(key, e);
-            cleanupConnections(true);
+            connectionManager.closeIdle(true);
             try {
               Thread.sleep(60000);
             } catch (InterruptedException ex) {
               LOG.debug("Interrupted while sleeping");
-              return;
             }
           }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
-        cleanupConnections(false);
       }
-
       LOG.info(getName() + ": stopping");
-
       synchronized (this) {
         try {
           acceptChannel.close();
@@ -851,10 +779,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         selector= null;
         acceptChannel= null;
 
-        // clean up all connections
-        while (!connectionList.isEmpty()) {
-          closeConnection(connectionList.remove(0));
-        }
+        // close all connections
+        connectionManager.stopIdleScan();
+        connectionManager.closeAll();
       }
     }
 
@@ -862,10 +789,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       if (key != null) {
         Connection c = (Connection)key.attachment();
         if (c != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
-                (e != null ? " on error " + e.getMessage() : ""));
-          }
           closeConnection(c);
           key.attach(null);
         }
@@ -876,37 +799,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return address;
     }
 
-    void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
-      Connection c;
+    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
-
       SocketChannel channel;
       while ((channel = server.accept()) != null) {
-        try {
-          channel.configureBlocking(false);
-          channel.socket().setTcpNoDelay(tcpNoDelay);
-          channel.socket().setKeepAlive(tcpKeepAlive);
-        } catch (IOException ioe) {
-          channel.close();
-          throw ioe;
-        }
-
+        channel.configureBlocking(false);
+        channel.socket().setTcpNoDelay(tcpNoDelay);
+        channel.socket().setKeepAlive(tcpKeepAlive);
         Reader reader = getReader();
-        try {
-          reader.startAdd();
-          SelectionKey readKey = reader.registerChannel(channel);
-          c = getConnection(channel, System.currentTimeMillis());
-          readKey.attach(c);
-          synchronized (connectionList) {
-            connectionList.add(numConnections, c);
-            numConnections++;
+        Connection c = connectionManager.register(channel);
+        // If the connectionManager can't take it, close the connection.
+        if (c == null) {
+          if (channel.isOpen()) {
+            IOUtils.cleanup(null, channel);
           }
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": connection from " + c.toString() +
-                "; # active connections: " + numConnections);
-        } finally {
-          reader.finishAdd();
+          continue;
         }
+        key.attach(c);  // so closeCurrentConnection can get the object
+        reader.addConnection(c);
       }
     }
 
@@ -919,12 +829,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       c.setLastContact(System.currentTimeMillis());
       try {
         count = c.readAndProcess();
-
-        if (count > 0) {
-          c.setLastContact(System.currentTimeMillis());
-        }
-
       } catch (InterruptedException ieo) {
+        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
         if (LOG.isDebugEnabled()) {
@@ -933,12 +839,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
-              " because read count=" + count +
-              ". Number of active connections: " + numConnections);
-        }
         closeConnection(c);
+        c = null;
+      } else {
+        c.setLastContact(System.currentTimeMillis());
       }
     }
 
@@ -1355,6 +1259,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return null;
     }
 
+    public long getLastContact() {
+      return lastContact;
+    }
+
     /* Return true if the connection has no outstanding rpc */
     private boolean isIdle() {
       return rpcCount.get() == 0;
@@ -1370,10 +1278,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       rpcCount.increment();
     }
 
-    protected boolean timedOut(long currentTime) {
-      return isIdle() && currentTime - lastContact > maxIdleTime;
-    }
-
     private UserGroupInformation getAuthorizedUgi(String authorizedId)
         throws IOException {
       UserGroupInformation authorizedUgi;
@@ -1883,7 +1787,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
       // Enforcing the call queue size, this triggers a retry in the client
       // This is a bit late to be doing this check - we have already read in the total request.
-      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
+      if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
         final Call callTooBig =
           new Call(id, this.service, null, null, null, null, this,
             responder, totalRequestSize, null, null, 0);
@@ -1954,7 +1858,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
               totalRequestSize, traceInfo, this.addr, timeout);
 
       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
-        callQueueSize.add(-1 * call.getSize());
+        callQueueSizeInBytes.add(-1 * call.getSize());
 
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
@@ -2093,12 +1997,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.socketSendBufferSize = 0;
-    this.maxQueueSize =
-      this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
+    // See declaration above for documentation on what this size is.
+    this.maxQueueSizeInBytes =
+      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
-    this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
-    this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
-    this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
@@ -2120,6 +2022,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
     // Create the responder here
     responder = new Responder();
+    connectionManager = new ConnectionManager();
     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
     this.userProvider = UserProvider.instantiate(conf);
     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
@@ -2177,12 +2080,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   protected void closeConnection(Connection connection) {
-    synchronized (connectionList) {
-      if (connectionList.remove(connection)) {
-        numConnections--;
-      }
-    }
-    connection.close();
+    connectionManager.close(connection);
   }
 
   Configuration getConf() {
@@ -2440,7 +2338,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
   @Override
   public void addCallSize(final long diff) {
-    this.callQueueSize.add(diff);
+    this.callQueueSizeInBytes.add(diff);
   }
 
   /**
@@ -2578,6 +2476,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   /**
+   * The number of open RPC conections
+   * @return the number of open rpc connections
+   */
+  public int getNumOpenConnections() {
+    return connectionManager.size();
+  }
+
+  /**
    * Returns the username for any user associated with the current RPC
    * request or <code>null</code> if no user is set.
    */
@@ -2695,4 +2601,150 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   public RpcScheduler getScheduler() {
     return scheduler;
   }
+
+  private class ConnectionManager {
+    final private AtomicInteger count = new AtomicInteger();
+    final private Set<Connection> connections;
+
+    final private Timer idleScanTimer;
+    final private int idleScanThreshold;
+    final private int idleScanInterval;
+    final private int maxIdleTime;
+    final private int maxIdleToClose;
+
+    ConnectionManager() {
+      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
+      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
+      this.idleScanInterval =
+          conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
+      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
+      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
+      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+      int maxConnectionQueueSize =
+          handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
+      // create a set with concurrency -and- a thread-safe iterator, add 2
+      // for listener and idle closer threads
+      this.connections = Collections.newSetFromMap(
+          new ConcurrentHashMap<Connection,Boolean>(
+              maxConnectionQueueSize, 0.75f, readThreads+2));
+    }
+
+    private boolean add(Connection connection) {
+      boolean added = connections.add(connection);
+      if (added) {
+        count.getAndIncrement();
+      }
+      return added;
+    }
+
+    private boolean remove(Connection connection) {
+      boolean removed = connections.remove(connection);
+      if (removed) {
+        count.getAndDecrement();
+      }
+      return removed;
+    }
+
+    int size() {
+      return count.get();
+    }
+
+    Connection[] toArray() {
+      return connections.toArray(new Connection[0]);
+    }
+
+    Connection register(SocketChannel channel) {
+      Connection connection = new Connection(channel, System.currentTimeMillis());
+      add(connection);
+      if (LOG.isDebugEnabled()) {
+        // Use metric names
+        LOG.debug("Server connection from " + connection +
+            "; numOpenConnections=" + size() +
+            ",  queueSize(bytes)=" + callQueueSizeInBytes.get() +
+            ", numCallsInGeneralQueue=" + scheduler.getGeneralQueueLength() +
+            ", numCallsInPriorityQueue=" + scheduler.getPriorityQueueLength());
+      }
+      return connection;
+    }
+
+    boolean close(Connection connection) {
+      boolean exists = remove(connection);
+      if (exists) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(Thread.currentThread().getName() +
+              ": disconnecting client " + connection +
+              ". Number of active connections: "+ size());
+        }
+        // only close if actually removed to avoid double-closing due
+        // to possible races
+        connection.close();
+      }
+      return exists;
+    }
+
+    // synch'ed to avoid explicit invocation upon OOM from colliding with
+    // timer task firing
+    synchronized void closeIdle(boolean scanAll) {
+      long minLastContact = System.currentTimeMillis() - maxIdleTime;
+      // concurrent iterator might miss new connections added
+      // during the iteration, but that's ok because they won't
+      // be idle yet anyway and will be caught on next scan
+      int closed = 0;
+      for (Connection connection : connections) {
+        // stop if connections dropped below threshold unless scanning all
+        if (!scanAll && size() < idleScanThreshold) {
+          break;
+        }
+        // stop if not scanning all and max connections are closed
+        if (connection.isIdle() &&
+            connection.getLastContact() < minLastContact &&
+            close(connection) &&
+            !scanAll && (++closed == maxIdleToClose)) {
+          break;
+        }
+      }
+    }
+
+    void closeAll() {
+      // use a copy of the connections to be absolutely sure the concurrent
+      // iterator doesn't miss a connection
+      for (Connection connection : toArray()) {
+        close(connection);
+      }
+    }
+
+    void startIdleScan() {
+      scheduleIdleScanTask();
+    }
+
+    void stopIdleScan() {
+      idleScanTimer.cancel();
+    }
+
+    private void scheduleIdleScanTask() {
+      if (!running) {
+        return;
+      }
+      TimerTask idleScanTask = new TimerTask(){
+        @Override
+        public void run() {
+          if (!running) {
+            return;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(Thread.currentThread().getName()+": task running");
+          }
+          try {
+            closeIdle(false);
+          } finally {
+            // explicitly reschedule so next execution occurs relative
+            // to the end of this scan, not the beginning
+            scheduleIdleScanTask();
+          }
+        }
+      };
+      idleScanTimer.schedule(idleScanTask, idleScanInterval);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index 1f496b4..743c5bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -41,7 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
   @Override
   public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
     int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-		HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
 
     return new SimpleRpcScheduler(
       conf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ceb945b..45cec78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -263,7 +263,7 @@ public abstract class AbstractTestIPC {
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
       LOG.info("Caught expected exception: " + e.toString());
-      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
+      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
     } finally {
       rpcServer.stop();
     }


Mime
View raw message