hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: Revert "HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers"" Revert mistaken commit... This reverts commit e0b70c00e74aeaac33570508e3732a53daea839e.
Date Tue, 07 Jun 2016 23:41:51 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 6d5a25935 -> e66ecd7db


Revert "HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers""
Revert mistaken commit...
This reverts commit e0b70c00e74aeaac33570508e3732a53daea839e.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e66ecd7d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e66ecd7d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e66ecd7d

Branch: refs/heads/master
Commit: e66ecd7db68d6ef57084543d08f7774c82f22f45
Parents: 6d5a259
Author: stack <stack@apache.org>
Authored: Tue Jun 7 16:41:30 2016 -0700
Committer: stack <stack@apache.org>
Committed: Tue Jun 7 16:41:30 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/MetricsHBaseServerSource.java     |  10 +-
 .../ipc/MetricsHBaseServerWrapperImpl.java      |   6 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 408 ++++++++-----------
 .../regionserver/SimpleRpcSchedulerFactory.java |   2 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |   2 +-
 5 files changed, 187 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index ce57e0f..bb89789 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -52,16 +52,14 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String TOTAL_CALL_TIME_NAME = "totalCallTime";
   String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
   String QUEUE_SIZE_NAME = "queueSize";
-  String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " +
-    "parsed and is waiting to run or is currently being executed.";
+  String QUEUE_SIZE_DESC = "Number of bytes in the call queues.";
   String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
-  String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
-    "parsed requests waiting in scheduler to be executed";
+  String GENERAL_QUEUE_DESC = "Number of calls in the general call queue.";
   String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
   String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
   String REPLICATION_QUEUE_DESC =
-      "Number of calls in the replication call queue waiting to be run";
-  String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
+      "Number of calls in the replication call queue.";
+  String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
   String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
   String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
   String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";

http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 4f53709..9979c75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
     if (!isServerStarted()) {
       return 0;
     }
-    return server.callQueueSizeInBytes.get();
+    return server.callQueueSize.get();
   }
 
   @Override
@@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
 
   @Override
   public int getNumOpenConnections() {
-    if (!isServerStarted()) {
+    if (!isServerStarted() || this.server.connectionList == null) {
       return 0;
     }
-    return server.getNumOpenConnections();
+    return server.connectionList.size();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index aca3fdd..483ce86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -48,16 +48,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -114,7 +113,6 @@ import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -185,6 +183,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    */
   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
 
+  /**
+   * The maximum size that we can hold in the RPC queue
+   */
+  private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
+
   private final IPCUtil ipcUtil;
 
   private static final String AUTH_FAILED_FOR = "Auth failed for ";
@@ -207,30 +210,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   protected int port;                             // port we listen on
   protected InetSocketAddress address;            // inet address we listen on
   private int readThreads;                        // number of read threads
+  protected int maxIdleTime;                      // the maximum idle time after
+                                                  // which a client may be
+                                                  // disconnected
+  protected int thresholdIdleConnections;         // the number of idle
+                                                  // connections after which we
+                                                  // will start cleaning up idle
+                                                  // connections
+  int maxConnectionsToNuke;                       // the max number of
+                                                  // connections to nuke
+                                                  // during a cleanup
+
   protected MetricsHBaseServer metrics;
 
   protected final Configuration conf;
 
-  /**
-   * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
-   * this size, then we will reject the call (after parsing it though). It will go back to the
-   * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
-   * call queue size gets incremented after we parse a call and before we add it to the queue of
-   * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
-   * size is kept in {@link #callQueueSizeInBytes}.
-   * @see {@link #callQueueSizeInBytes}
-   * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
-   * @see {@link #callQueueSizeInBytes}
-   */
-  private final long maxQueueSizeInBytes;
-  private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
-
-  /**
-   * This is a running count of the size in bytes of all outstanding calls whether currently
-   * executing or queued waiting to be run.
-   */
-  protected final Counter callQueueSizeInBytes = new Counter();
-
+  private int maxQueueSize;
   protected int socketSendBufferSize;
   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -249,11 +244,19 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    */
   volatile boolean started = false;
 
-  // maintains the set of client connections and handles idle timeouts
-  private ConnectionManager connectionManager;
+  /**
+   * This is a running count of the size of all outstanding calls by size.
+   */
+  protected final Counter callQueueSize = new Counter();
+
+  protected final List<Connection> connectionList =
+    Collections.synchronizedList(new LinkedList<Connection>());
+  //maintain a list
+  //of client connections
   private Listener listener = null;
   protected Responder responder = null;
   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
+  protected int numConnections = 0;
 
   protected HBaseRPCErrorHandler errorHandler = null;
 
@@ -620,13 +623,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private Selector selector = null; //the selector that we use for the server
     private Reader[] readers = null;
     private int currentReader = 0;
+    private Random rand = new Random();
+    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
+                                         //-tion (for idle connections) ran
+    private long cleanupInterval = 10000; //the minimum interval between
+                                          //two cleanup runs
+    private int backlogLength;
 
     private ExecutorService readPool;
 
     public Listener(final String name) throws IOException {
       super(name);
-      // The backlog of requests that we will have the serversocket carry.
-      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
+      backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
       acceptChannel.configureBlocking(false);
@@ -636,11 +644,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
       // create a selector;
-      selector = Selector.open();
+      selector= Selector.open();
 
       readers = new Reader[readThreads];
-      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
-      // has an advantage in that it is easy to shutdown the pool.
       readPool = Executors.newFixedThreadPool(readThreads,
         new ThreadFactoryBuilder().setNameFormat(
           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
@@ -661,12 +667,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
 
     private class Reader implements Runnable {
+      private volatile boolean adding = false;
       private final Selector readSelector;
 
       Reader() throws IOException {
         this.readSelector = Selector.open();
       }
-
       @Override
       public void run() {
         try {
@@ -680,10 +686,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
       }
 
-      private void doRunLoop() {
+      private synchronized void doRunLoop() {
         while (running) {
           try {
             readSelector.select();
+            while (adding) {
+              this.wait(1000);
+            }
+
             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
             while (iter.hasNext()) {
               SelectionKey key = iter.next();
@@ -693,12 +703,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
                   doRead(key);
                 }
               }
-              key = null;
             }
           } catch (InterruptedException e) {
-            if (running) {                      // unexpected -- log it
-              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
-            }
+            LOG.debug("Interrupted while sleeping");
             return;
           } catch (IOException ex) {
             LOG.info(getName() + ": IOException in Reader", ex);
@@ -707,14 +714,76 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
 
       /**
-       * Updating the readSelector while it's being used is not thread-safe,
-       * so the connection must be queued.  The reader will drain the queue
-       * and update its readSelector before performing the next select
+       * This gets reader into the state that waits for the new channel
+       * to be registered with readSelector. If it was waiting in select()
+       * the thread will be woken up, otherwise whenever select() is called
+       * it will return even if there is nothing to read and wait
+       * in while(adding) for finishAdd call
        */
-      public void addConnection(Connection conn) throws IOException {
-        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
+      public void startAdd() {
+        adding = true;
         readSelector.wakeup();
       }
+
+      public synchronized SelectionKey registerChannel(SocketChannel channel)
+        throws IOException {
+        return channel.register(readSelector, SelectionKey.OP_READ);
+      }
+
+      public synchronized void finishAdd() {
+        adding = false;
+        this.notify();
+      }
+    }
+
+    /** cleanup connections from connectionList. Choose a random range
+     * to scan and also have a limit on the number of the connections
+     * that will be cleanedup per run. The criteria for cleanup is the time
+     * for which the connection was idle. If 'force' is true then all
+     * connections will be looked at for the cleanup.
+     * @param force all connections will be looked at for cleanup
+     */
+    private void cleanupConnections(boolean force) {
+      if (force || numConnections > thresholdIdleConnections) {
+        long currentTime = System.currentTimeMillis();
+        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+          return;
+        }
+        int start = 0;
+        int end = numConnections - 1;
+        if (!force) {
+          start = rand.nextInt() % numConnections;
+          end = rand.nextInt() % numConnections;
+          int temp;
+          if (end < start) {
+            temp = start;
+            start = end;
+            end = temp;
+          }
+        }
+        int i = start;
+        int numNuked = 0;
+        while (i <= end) {
+          Connection c;
+          synchronized (connectionList) {
+            try {
+              c = connectionList.get(i);
+            } catch (Exception e) {return;}
+          }
+          if (c.timedOut(currentTime)) {
+            if (LOG.isDebugEnabled())
+              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+            closeConnection(c);
+            numNuked++;
+            end--;
+            //noinspection UnusedAssignment
+            c = null;
+            if (!force && numNuked == maxConnectionsToNuke) break;
+          }
+          else i++;
+        }
+        lastCleanupRunTime = System.currentTimeMillis();
+      }
     }
 
     @Override
@@ -723,7 +792,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         "it will have per impact")
     public void run() {
       LOG.info(getName() + ": starting");
-      connectionManager.startIdleScan();
       while (running) {
         SelectionKey key = null;
         try {
@@ -747,7 +815,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             if (errorHandler.checkOOME(e)) {
               LOG.info(getName() + ": exiting on OutOfMemoryError");
               closeCurrentConnection(key, e);
-              connectionManager.closeIdle(true);
+              cleanupConnections(true);
               return;
             }
           } else {
@@ -756,18 +824,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             // some thread(s) a chance to finish
             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
             closeCurrentConnection(key, e);
-            connectionManager.closeIdle(true);
+            cleanupConnections(true);
             try {
               Thread.sleep(60000);
             } catch (InterruptedException ex) {
               LOG.debug("Interrupted while sleeping");
+              return;
             }
           }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
+        cleanupConnections(false);
       }
+
       LOG.info(getName() + ": stopping");
+
       synchronized (this) {
         try {
           acceptChannel.close();
@@ -779,9 +851,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         selector= null;
         acceptChannel= null;
 
-        // close all connections
-        connectionManager.stopIdleScan();
-        connectionManager.closeAll();
+        // clean up all connections
+        while (!connectionList.isEmpty()) {
+          closeConnection(connectionList.remove(0));
+        }
       }
     }
 
@@ -789,6 +862,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       if (key != null) {
         Connection c = (Connection)key.attachment();
         if (c != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
+                (e != null ? " on error " + e.getMessage() : ""));
+          }
           closeConnection(c);
           key.attach(null);
         }
@@ -799,24 +876,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return address;
     }
 
-    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
+    void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
+      Connection c;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
+
       SocketChannel channel;
       while ((channel = server.accept()) != null) {
-        channel.configureBlocking(false);
-        channel.socket().setTcpNoDelay(tcpNoDelay);
-        channel.socket().setKeepAlive(tcpKeepAlive);
+        try {
+          channel.configureBlocking(false);
+          channel.socket().setTcpNoDelay(tcpNoDelay);
+          channel.socket().setKeepAlive(tcpKeepAlive);
+        } catch (IOException ioe) {
+          channel.close();
+          throw ioe;
+        }
+
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel);
-        // If the connectionManager can't take it, close the connection.
-        if (c == null) {
-          if (channel.isOpen()) {
-            IOUtils.cleanup(null, channel);
+        try {
+          reader.startAdd();
+          SelectionKey readKey = reader.registerChannel(channel);
+          c = getConnection(channel, System.currentTimeMillis());
+          readKey.attach(c);
+          synchronized (connectionList) {
+            connectionList.add(numConnections, c);
+            numConnections++;
           }
-          continue;
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": connection from " + c.toString() +
+                "; # active connections: " + numConnections);
+        } finally {
+          reader.finishAdd();
         }
-        key.attach(c);  // so closeCurrentConnection can get the object
-        reader.addConnection(c);
       }
     }
 
@@ -829,8 +919,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       c.setLastContact(System.currentTimeMillis());
       try {
         count = c.readAndProcess();
+
+        if (count > 0) {
+          c.setLastContact(System.currentTimeMillis());
+        }
+
       } catch (InterruptedException ieo) {
-        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
         if (LOG.isDebugEnabled()) {
@@ -839,10 +933,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
+              " because read count=" + count +
+              ". Number of active connections: " + numConnections);
+        }
         closeConnection(c);
-        c = null;
-      } else {
-        c.setLastContact(System.currentTimeMillis());
       }
     }
 
@@ -1259,10 +1355,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return null;
     }
 
-    public long getLastContact() {
-      return lastContact;
-    }
-
     /* Return true if the connection has no outstanding rpc */
     private boolean isIdle() {
       return rpcCount.get() == 0;
@@ -1278,6 +1370,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       rpcCount.increment();
     }
 
+    protected boolean timedOut(long currentTime) {
+      return isIdle() && currentTime - lastContact > maxIdleTime;
+    }
+
     private UserGroupInformation getAuthorizedUgi(String authorizedId)
         throws IOException {
       UserGroupInformation authorizedUgi;
@@ -1787,7 +1883,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
       // Enforcing the call queue size, this triggers a retry in the client
       // This is a bit late to be doing this check - we have already read in the total request.
-      if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
+      if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
         final Call callTooBig =
           new Call(id, this.service, null, null, null, null, this,
             responder, totalRequestSize, null, null, 0);
@@ -1858,7 +1954,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
               totalRequestSize, traceInfo, this.addr, timeout);
 
       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
-        callQueueSizeInBytes.add(-1 * call.getSize());
+        callQueueSize.add(-1 * call.getSize());
 
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
@@ -1997,10 +2093,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.socketSendBufferSize = 0;
-    // See declaration above for documentation on what this size is.
-    this.maxQueueSizeInBytes =
-      this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
+    this.maxQueueSize =
+      this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
+    this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
+    this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
+    this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
@@ -2022,7 +2120,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
     // Create the responder here
     responder = new Responder();
-    connectionManager = new ConnectionManager();
     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
     this.userProvider = UserProvider.instantiate(conf);
     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
@@ -2080,7 +2177,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   protected void closeConnection(Connection connection) {
-    connectionManager.close(connection);
+    synchronized (connectionList) {
+      if (connectionList.remove(connection)) {
+        numConnections--;
+      }
+    }
+    connection.close();
   }
 
   Configuration getConf() {
@@ -2338,7 +2440,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
   @Override
   public void addCallSize(final long diff) {
-    this.callQueueSizeInBytes.add(diff);
+    this.callQueueSize.add(diff);
   }
 
   /**
@@ -2476,14 +2578,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   /**
-   * The number of open RPC conections
-   * @return the number of open rpc connections
-   */
-  public int getNumOpenConnections() {
-    return connectionManager.size();
-  }
-
-  /**
    * Returns the username for any user associated with the current RPC
    * request or <code>null</code> if no user is set.
    */
@@ -2601,150 +2695,4 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   public RpcScheduler getScheduler() {
     return scheduler;
   }
-
-  private class ConnectionManager {
-    final private AtomicInteger count = new AtomicInteger();
-    final private Set<Connection> connections;
-
-    final private Timer idleScanTimer;
-    final private int idleScanThreshold;
-    final private int idleScanInterval;
-    final private int maxIdleTime;
-    final private int maxIdleToClose;
-
-    ConnectionManager() {
-      this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
-      this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
-      this.idleScanInterval =
-          conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
-      this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
-      this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
-      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
-      int maxConnectionQueueSize =
-          handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
-      // create a set with concurrency -and- a thread-safe iterator, add 2
-      // for listener and idle closer threads
-      this.connections = Collections.newSetFromMap(
-          new ConcurrentHashMap<Connection,Boolean>(
-              maxConnectionQueueSize, 0.75f, readThreads+2));
-    }
-
-    private boolean add(Connection connection) {
-      boolean added = connections.add(connection);
-      if (added) {
-        count.getAndIncrement();
-      }
-      return added;
-    }
-
-    private boolean remove(Connection connection) {
-      boolean removed = connections.remove(connection);
-      if (removed) {
-        count.getAndDecrement();
-      }
-      return removed;
-    }
-
-    int size() {
-      return count.get();
-    }
-
-    Connection[] toArray() {
-      return connections.toArray(new Connection[0]);
-    }
-
-    Connection register(SocketChannel channel) {
-      Connection connection = new Connection(channel, System.currentTimeMillis());
-      add(connection);
-      if (LOG.isDebugEnabled()) {
-        // Use metric names
-        LOG.debug("Server connection from " + connection +
-            "; numOpenConnections=" + size() +
-            ",  queueSize(bytes)=" + callQueueSizeInBytes.get() +
-            ", numCallsInGeneralQueue=" + scheduler.getGeneralQueueLength() +
-            ", numCallsInPriorityQueue=" + scheduler.getPriorityQueueLength());
-      }
-      return connection;
-    }
-
-    boolean close(Connection connection) {
-      boolean exists = remove(connection);
-      if (exists) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(Thread.currentThread().getName() +
-              ": disconnecting client " + connection +
-              ". Number of active connections: "+ size());
-        }
-        // only close if actually removed to avoid double-closing due
-        // to possible races
-        connection.close();
-      }
-      return exists;
-    }
-
-    // synch'ed to avoid explicit invocation upon OOM from colliding with
-    // timer task firing
-    synchronized void closeIdle(boolean scanAll) {
-      long minLastContact = System.currentTimeMillis() - maxIdleTime;
-      // concurrent iterator might miss new connections added
-      // during the iteration, but that's ok because they won't
-      // be idle yet anyway and will be caught on next scan
-      int closed = 0;
-      for (Connection connection : connections) {
-        // stop if connections dropped below threshold unless scanning all
-        if (!scanAll && size() < idleScanThreshold) {
-          break;
-        }
-        // stop if not scanning all and max connections are closed
-        if (connection.isIdle() &&
-            connection.getLastContact() < minLastContact &&
-            close(connection) &&
-            !scanAll && (++closed == maxIdleToClose)) {
-          break;
-        }
-      }
-    }
-
-    void closeAll() {
-      // use a copy of the connections to be absolutely sure the concurrent
-      // iterator doesn't miss a connection
-      for (Connection connection : toArray()) {
-        close(connection);
-      }
-    }
-
-    void startIdleScan() {
-      scheduleIdleScanTask();
-    }
-
-    void stopIdleScan() {
-      idleScanTimer.cancel();
-    }
-
-    private void scheduleIdleScanTask() {
-      if (!running) {
-        return;
-      }
-      TimerTask idleScanTask = new TimerTask(){
-        @Override
-        public void run() {
-          if (!running) {
-            return;
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(Thread.currentThread().getName()+": task running");
-          }
-          try {
-            closeIdle(false);
-          } finally {
-            // explicitly reschedule so next execution occurs relative
-            // to the end of this scan, not the beginning
-            scheduleIdleScanTask();
-          }
-        }
-      };
-      idleScanTimer.schedule(idleScanTask, idleScanInterval);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index 743c5bb..1f496b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -41,7 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
   @Override
   public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
     int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+		HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
 
     return new SimpleRpcScheduler(
       conf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 45cec78..ceb945b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -263,7 +263,7 @@ public abstract class AbstractTestIPC {
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
       LOG.info("Caught expected exception: " + e.toString());
-      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
+      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
     } finally {
       rpcServer.stop();
     }


Mime
View raw message