hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1541736 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date Wed, 13 Nov 2013 21:37:21 GMT
Author: daryn
Date: Wed Nov 13 21:37:21 2013
New Revision: 1541736

URL: http://svn.apache.org/r1541736
Log:
HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1541736&r1=1541735&r2=1541736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Wed Nov 13 21:37:21
2013
@@ -284,7 +284,9 @@ Trunk (Unreleased)
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)
 
-    HADOOP-8589 ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
+    HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
+
+    HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
 
 Release 2.3.0 - UNRELEASED
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1541736&r1=1541735&r2=1541736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
Wed Nov 13 21:37:21 2013
@@ -65,6 +65,13 @@ public class CommonConfigurationKeys ext
   /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
   public static final int     IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
   
+  /** Number of pending connections that may be queued per socket reader */
+  public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
+      "ipc.server.read.connection-queue.size";
+  /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
+  public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
+      100;
+      
   public static final String IPC_MAXIMUM_DATA_LENGTH =
       "ipc.maximum.data.length";
   

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1541736&r1=1541735&r2=1541736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Wed Nov 13 21:37:21 2013
@@ -345,6 +345,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
+  private int readerPendingConnectionQueue;         // number of connections to queue per
read thread
   private Class<? extends Writable> rpcRequestClass;   // class used for deserializing
the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
@@ -553,12 +554,14 @@ public abstract class Server {
     }
     
     private class Reader extends Thread {
-      private volatile boolean adding = false;
+      final private BlockingQueue<Connection> pendingConnections;
       private final Selector readSelector;
 
       Reader(String name) throws IOException {
         super(name);
 
+        this.pendingConnections =
+            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
         this.readSelector = Selector.open();
       }
       
@@ -580,10 +583,14 @@ public abstract class Server {
         while (running) {
           SelectionKey key = null;
           try {
+            // consume as many connections as currently queued to avoid
+            // unbridled acceptance of connections that starves the select
+            int size = pendingConnections.size();
+            for (int i=size; i>0; i--) {
+              Connection conn = pendingConnections.take();
+              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
+            }
             readSelector.select();
-            while (adding) {
-              this.wait(1000);
-            }              
 
             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
             while (iter.hasNext()) {
@@ -607,26 +614,14 @@ public abstract class Server {
       }
 
       /**
-       * This gets reader into the state that waits for the new channel
-       * to be registered with readSelector. If it was waiting in select()
-       * the thread will be woken up, otherwise whenever select() is called
-       * it will return even if there is nothing to read and wait
-       * in while(adding) for finishAdd call
+       * Updating the readSelector while it's being used is not thread-safe,
+       * so the connection must be queued.  The reader will drain the queue
+       * and update its readSelector before performing the next select
        */
-      public void startAdd() {
-        adding = true;
+      public void addConnection(Connection conn) throws InterruptedException {
+        pendingConnections.put(conn);
         readSelector.wakeup();
       }
-      
-      public synchronized SelectionKey registerChannel(SocketChannel channel)
-                                                          throws IOException {
-          return channel.register(readSelector, SelectionKey.OP_READ);
-      }
-
-      public synchronized void finishAdd() {
-        adding = false;
-        this.notify();        
-      }
 
       void shutdown() {
         assert !running;
@@ -766,20 +761,23 @@ public abstract class Server {
         
         Reader reader = getReader();
         try {
-          reader.startAdd();
-          SelectionKey readKey = reader.registerChannel(channel);
-          c = new Connection(readKey, channel, Time.now());
-          readKey.attach(c);
+          c = new Connection(channel, Time.now());
           synchronized (connectionList) {
             connectionList.add(numConnections, c);
             numConnections++;
           }
+          reader.addConnection(c);
           if (LOG.isDebugEnabled())
             LOG.debug("Server connection from " + c.toString() +
                 "; # active connections: " + numConnections +
                 "; # queued calls: " + callQueue.size());          
-        } finally {
-          reader.finishAdd(); 
+        } catch (InterruptedException ie) {
+          if (running) {
+            LOG.info(
+                getName() + ": disconnecting client " + c.getHostAddress() +
+                " due to unexpected interrupt");
+          }
+          closeConnection(c);
         }
       }
     }
@@ -1190,8 +1188,7 @@ public abstract class Server {
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
     
-    public Connection(SelectionKey key, SocketChannel channel, 
-                      long lastContact) {
+    public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -2189,6 +2186,9 @@ public abstract class Server {
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
     }
+    this.readerPendingConnectionQueue = conf.getInt(
+        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
+        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = 2 * conf.getInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1541736&r1=1541735&r2=1541736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
Wed Nov 13 21:37:21 2013
@@ -44,12 +44,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -613,6 +617,148 @@ public class TestIPC {
     server.stop();
   }
 
+  private static class TestServerQueue extends Server {
+    final CountDownLatch firstCallLatch = new CountDownLatch(1);
+    final CountDownLatch callBlockLatch = new CountDownLatch(1);
+    
+    TestServerQueue(int expectedCalls, int readers, int callQ, int handlers,
+        Configuration conf) throws IOException {
+      super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null);

+    }
+
+    @Override
+    public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
+      firstCallLatch.countDown();
+      try {
+        callBlockLatch.await();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      return param;
+    }
+  }
+
+  /**
+   * Check that reader queueing works
+   * @throws BrokenBarrierException 
+   * @throws InterruptedException 
+   */
+  @Test(timeout=60000)
+  public void testIpcWithReaderQueuing() throws Exception {
+    // 1 reader, 1 connectionQ slot, 1 callq
+    for (int i=0; i < 10; i++) {
+      checkBlocking(1, 1, 1);
+    }
+    // 4 readers, 5 connectionQ slots, 2 callq
+    for (int i=0; i < 10; i++) {
+      checkBlocking(4, 5, 2);
+    }
+  }
+  
+  // goal is to jam a handler with a connection, fill the callq with
+  // connections, in turn jamming the readers - then flood the server and
+  // ensure that the listener blocks when the reader connection queues fill
+  private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
+    int handlers = 1; // makes it easier
+    
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
+
+    // send in enough clients to block up the handlers, callq, and readers
+    int initialClients = readers + callQ + handlers;
+    // max connections we should ever end up accepting at once
+    int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
+    // stress it with 2X the max
+    int clients = maxAccept*2;
+    
+    final AtomicInteger failures = new AtomicInteger(0);
+    final CountDownLatch callFinishedLatch = new CountDownLatch(clients);
+
+    // start server
+    final TestServerQueue server =
+        new TestServerQueue(clients, readers, callQ, handlers, conf);
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    Client.setConnectTimeout(conf, 10000);
+    
+    // instantiate the threads, will start in batches
+    Thread[] threads = new Thread[clients];
+    for (int i=0; i<clients; i++) {
+      threads[i] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          Client client = new Client(LongWritable.class, conf);
+          try {
+            client.call(new LongWritable(Thread.currentThread().getId()),
+                addr, null, null, 60000, conf);
+          } catch (Throwable e) {
+            LOG.error(e);
+            failures.incrementAndGet();
+            return;
+          } finally {
+            callFinishedLatch.countDown();            
+            client.stop();
+          }
+        }
+      });
+    }
+    
+    // start enough clients to block up the handler, callq, and each reader;
+    // let the calls sequentially slot in to avoid some readers blocking
+    // and others not blocking in the race to fill the callq
+    for (int i=0; i < initialClients; i++) {
+      threads[i].start();
+      if (i==0) {
+        // let first reader block in a call
+        server.firstCallLatch.await();
+      } else if (i <= callQ) {
+        // let subsequent readers jam the callq, will happen immediately 
+        while (server.getCallQueueLen() != i) {
+          Thread.sleep(1);
+        }
+      } // additional threads block the readers trying to add to the callq
+    }
+
+    // wait till everything is slotted, should happen immediately
+    Thread.sleep(10);
+    if (server.getNumOpenConnections() < initialClients) {
+      LOG.info("(initial clients) need:"+initialClients+" connections have:"+server.getNumOpenConnections());
+      Thread.sleep(100);
+    }
+    LOG.info("ipc layer should be blocked");
+    assertEquals(callQ, server.getCallQueueLen());
+    assertEquals(initialClients, server.getNumOpenConnections());
+    
+    // now flood the server with the rest of the connections, the reader's
+    // connection queues should fill and then the listener should block
+    for (int i=initialClients; i<clients; i++) {
+      threads[i].start();
+    }
+    Thread.sleep(10);
+    if (server.getNumOpenConnections() < maxAccept) {
+      LOG.info("(max clients) need:"+maxAccept+" connections have:"+server.getNumOpenConnections());
+      Thread.sleep(100);
+    }
+    // check a few times to make sure we didn't go over
+    for (int i=0; i<4; i++) {
+      assertEquals(maxAccept, server.getNumOpenConnections());
+      Thread.sleep(100);
+    }
+    
+    // sanity check that no calls have finished
+    assertEquals(clients, callFinishedLatch.getCount());
+    LOG.info("releasing the calls");
+    server.callBlockLatch.countDown();
+    callFinishedLatch.await();
+    for (Thread t : threads) {
+      t.join();
+    }
+    assertEquals(0, failures.get());
+    server.stop();
+  }
+
   /**
    * Make a call from a client and verify if header info is changed in server side
    */



Mime
View raw message