hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raw...@apache.org
Subject svn commit: r997512 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Date Wed, 15 Sep 2010 21:26:40 GMT
Author: rawson
Date: Wed Sep 15 21:26:40 2010
New Revision: 997512

URL: http://svn.apache.org/viewvc?rev=997512&view=rev
Log:
HBASE-2941  port HADOOP-6713 - threading scalability for RPC reads - to HBase


Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=997512&r1=997511&r2=997512&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Sep 15 21:26:40 2010
@@ -909,6 +909,7 @@ Release 0.21.0 - Unreleased
    HBASE-2977  Refactor master command line to a new class
    HBASE-2980  Refactor region server command line to a new class
    HBASE-2988  Support alternate compression for major compactions
+   HBASE-2941  port HADOOP-6713 - threading scalability for RPC reads - to HBase
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=997512&r1=997511&r2=997512&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Sep 15 21:26:40 2010
@@ -58,6 +58,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
@@ -129,6 +131,7 @@ public abstract class HBaseServer {
   protected String bindAddress;
   protected int port;                             // port we listen on
   private int handlerCount;                       // number of handler threads
+  private int readThreads;                        // number of read threads
   protected Class<? extends Writable> paramClass; // class of call parameters
   protected int maxIdleTime;                      // the maximum idle time after
                                                   // which a client may be
@@ -227,6 +230,8 @@ public abstract class HBaseServer {
 
     private ServerSocketChannel acceptChannel = null; //the accept channel
     private Selector selector = null; //the selector that we use for the server
+    private Reader[] readers = null;
+    private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
     private Random rand = new Random();
     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
@@ -235,6 +240,8 @@ public abstract class HBaseServer {
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
 
+    private ExecutorService readPool;
+
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -247,11 +254,86 @@ public abstract class HBaseServer {
       // create a selector;
       selector= Selector.open();
 
+      readers = new Reader[readThreads];
+      readPool = Executors.newFixedThreadPool(readThreads);
+      for (int i = 0; i < readThreads; ++i) {
+        Selector readSelector = Selector.open();
+        Reader reader = new Reader(readSelector);
+        readers[i] = reader;
+        readPool.execute(reader);
+      }
+
       // Register accepts on the server socket with the selector.
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
     }
+
+
+    private class Reader implements Runnable {
+      private volatile boolean adding = false;
+      private Selector readSelector = null;
+
+      Reader(Selector readSelector) {
+        this.readSelector = readSelector;
+      }
+      public void run() {
+        LOG.info("Starting SocketReader");
+        synchronized(this) {
+          while (running) {
+            SelectionKey key = null;
+            try {
+              readSelector.select();
+              while (adding) {
+                this.wait(1000);
+              }
+
+              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
+              while (iter.hasNext()) {
+                key = iter.next();
+                iter.remove();
+                if (key.isValid()) {
+                  if (key.isReadable()) {
+                    doRead(key);
+                  }
+                }
+                key = null;
+              }
+            } catch (InterruptedException e) {
+              if (running) {                     // unexpected -- log it
+                LOG.info(getName() + "caught: " +
+                    StringUtils.stringifyException(e));
+              }
+            } catch (IOException ex) {
+               LOG.error("Error in Reader", ex);
+            }
+          }
+        }
+      }
+
+      /**
+       * This gets reader into the state that waits for the new channel
+       * to be registered with readSelector. If it was waiting in select()
+       * the thread will be woken up, otherwise whenever select() is called
+       * it will return even if there is nothing to read and wait
+       * in while(adding) for finishAdd call
+       */
+      public void startAdd() {
+        adding = true;
+        readSelector.wakeup();
+      }
+
+      public synchronized SelectionKey registerChannel(SocketChannel channel)
+        throws IOException {
+        return channel.register(readSelector, SelectionKey.OP_READ);
+      }
+
+      public synchronized void finishAdd() {
+        adding = false;
+        this.notify();
+      }
+    }
+
     /** cleanup connections from connectionList. Choose a random range
      * to scan and also have a limit on the number of the connections
      * that will be cleanedup per run. The criteria for cleanup is the time
@@ -319,8 +401,6 @@ public abstract class HBaseServer {
               if (key.isValid()) {
                 if (key.isAcceptable())
                   doAccept(key);
-                else if (key.isReadable())
-                  doRead(key);
               }
             } catch (IOException ignored) {
             }
@@ -343,11 +423,6 @@ public abstract class HBaseServer {
             cleanupConnections(true);
             try { Thread.sleep(60000); } catch (Exception ignored) {}
       }
-        } catch (InterruptedException e) {
-          if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
-          }
         } catch (Exception e) {
           closeCurrentConnection(key);
         }
@@ -389,25 +464,30 @@ public abstract class HBaseServer {
     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
       Connection c;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
-      // accept up to 10 connections
-      for (int i=0; i<10; i++) {
-        SocketChannel channel = server.accept();
-        if (channel==null) return;
 
+      SocketChannel channel;
+      while ((channel = server.accept()) != null) {
         channel.configureBlocking(false);
         channel.socket().setTcpNoDelay(tcpNoDelay);
         channel.socket().setKeepAlive(tcpKeepAlive);
-        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
-        c = new Connection(channel, System.currentTimeMillis());
-        readKey.attach(c);
-        synchronized (connectionList) {
-          connectionList.add(numConnections, c);
-          numConnections++;
+
+        Reader reader = getReader();
+        try {
+          reader.startAdd();
+          SelectionKey readKey = reader.registerChannel(channel);
+          c = new Connection(channel, System.currentTimeMillis());
+          readKey.attach(c);
+          synchronized (connectionList) {
+            connectionList.add(numConnections, c);
+            numConnections++;
+          }
+          if (LOG.isDebugEnabled())
+            LOG.debug("Server connection from " + c.toString() +
+                "; # active connections: " + numConnections +
+                "; # queued calls: " + callQueue.size());
+        } finally {
+          reader.finishAdd();
         }
-        if (LOG.isDebugEnabled())
-          LOG.debug("Server connection from " + c.toString() +
-              "; # active connections: " + numConnections +
-              "; # queued calls: " + callQueue.size());
       }
     }
 
@@ -452,6 +532,14 @@ public abstract class HBaseServer {
           LOG.info(getName() + ":Exception in closing listener socket. " + e);
         }
       }
+      readPool.shutdownNow();
+    }
+
+    // The method that will return the next reader to work with
+    // Simplistic implementation of round robin for now
+    Reader getReader() {
+      currentReader = (currentReader + 1) % readers.length;
+      return readers[currentReader];
     }
   }
 
@@ -993,6 +1081,9 @@ public abstract class HBaseServer {
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+     this.readThreads = conf.getInt(
+        "ipc.server.read.threadpool.size",
+        10);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);



Mime
View raw message