Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB81C10F57 for ; Wed, 13 Nov 2013 21:48:43 +0000 (UTC) Received: (qmail 79162 invoked by uid 500); 13 Nov 2013 21:48:43 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 79102 invoked by uid 500); 13 Nov 2013 21:48:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 79095 invoked by uid 99); 13 Nov 2013 21:48:43 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Nov 2013 21:48:43 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Nov 2013 21:48:41 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BADB123889BB; Wed, 13 Nov 2013 21:48:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1541743 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/ Date: Wed, 13 Nov 2013 21:48:21 -0000 To: common-commits@hadoop.apache.org From: daryn@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131113214821.BADB123889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: daryn Date: Wed Nov 13 21:48:21 2013 New Revision: 1541743 URL: http://svn.apache.org/r1541743 Log: svn merge -c 1541736 FIXES: HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn) Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1541743&r1=1541742&r2=1541743&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Wed Nov 13 21:48:21 2013 @@ -2038,6 +2038,8 @@ Release 0.23.10 - UNRELEASED OPTIMIZATIONS + HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn) + BUG FIXES Release 0.23.9 - 2013-07-08 Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1541743&r1=1541742&r2=1541743&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Wed Nov 13 21:48:21 2013 @@ -65,6 +65,13 @@ public class CommonConfigurationKeys ext /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */ public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; + /** Number of pending connections that may be queued per socket reader */ + public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY = + "ipc.server.read.connection-queue.size"; + /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */ + public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT = + 100; + public static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1541743&r1=1541742&r2=1541743&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Nov 13 21:48:21 2013 @@ -342,6 +342,7 @@ public abstract class Server { private int port; // port we listen on private int handlerCount; // number of handler threads private int readThreads; // number of read threads + private int readerPendingConnectionQueue; // number of connections to queue per read thread private Class rpcRequestClass; // class used for deserializing the rpc request private int maxIdleTime; // the maximum idle time after // which a client may be disconnected @@ -550,12 +551,14 @@ public abstract class Server { } private class Reader extends Thread { - private volatile boolean adding = false; + final private BlockingQueue pendingConnections; private final Selector readSelector; Reader(String name) throws IOException { super(name); + this.pendingConnections = + new LinkedBlockingQueue(readerPendingConnectionQueue); this.readSelector = Selector.open(); } @@ -577,10 +580,14 @@ public abstract class Server { while (running) { SelectionKey key = null; try { + // consume as many connections as currently queued to avoid + // unbridled acceptance of connections that starves the select + int size = pendingConnections.size(); + for (int i=size; i>0; i--) { + Connection conn = pendingConnections.take(); + conn.channel.register(readSelector, SelectionKey.OP_READ, conn); + } readSelector.select(); - while (adding) { - this.wait(1000); - } Iterator iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { @@ -604,26 +611,14 @@ public abstract class Server { } /** - * This gets reader into the state that waits for the new channel - * to be registered with readSelector. If it was waiting in select() - * the thread will be woken up, otherwise whenever select() is called - * it will return even if there is nothing to read and wait - * in while(adding) for finishAdd call + * Updating the readSelector while it's being used is not thread-safe, + * so the connection must be queued. The reader will drain the queue + * and update its readSelector before performing the next select */ - public void startAdd() { - adding = true; + public void addConnection(Connection conn) throws InterruptedException { + pendingConnections.put(conn); readSelector.wakeup(); } - - public synchronized SelectionKey registerChannel(SocketChannel channel) - throws IOException { - return channel.register(readSelector, SelectionKey.OP_READ); - } - - public synchronized void finishAdd() { - adding = false; - this.notify(); - } void shutdown() { assert !running; @@ -763,20 +758,23 @@ public abstract class Server { Reader reader = getReader(); try { - reader.startAdd(); - SelectionKey readKey = reader.registerChannel(channel); - c = new Connection(readKey, channel, Time.now()); - readKey.attach(c); + c = new Connection(channel, Time.now()); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } + reader.addConnection(c); if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); - } finally { - reader.finishAdd(); + } catch (InterruptedException ie) { + if (running) { + LOG.info( + getName() + ": disconnecting client " + c.getHostAddress() + + " due to unexpected interrupt"); + } + closeConnection(c); } } } @@ -1187,8 +1185,7 @@ public abstract class Server { private boolean sentNegotiate = false; private boolean useWrap = false; - public Connection(SelectionKey key, SocketChannel channel, - long lastContact) { + public Connection(SocketChannel channel, long lastContact) { this.channel = channel; this.lastContact = lastContact; this.data = null; @@ -2186,6 +2183,9 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); } + this.readerPendingConnectionQueue = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); this.callQueue = new LinkedBlockingQueue(maxQueueSize); this.maxIdleTime = 2 * conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1541743&r1=1541742&r2=1541743&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Wed Nov 13 21:48:21 2013 @@ -44,12 +44,16 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; @@ -613,6 +617,148 @@ public class TestIPC { server.stop(); } + private static class TestServerQueue extends Server { + final CountDownLatch firstCallLatch = new CountDownLatch(1); + final CountDownLatch callBlockLatch = new CountDownLatch(1); + + TestServerQueue(int expectedCalls, int readers, int callQ, int handlers, + Configuration conf) throws IOException { + super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null); + } + + @Override + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, + long receiveTime) throws IOException { + firstCallLatch.countDown(); + try { + callBlockLatch.await(); + } catch (InterruptedException e) { + throw new IOException(e); + } + return param; + } + } + + /** + * Check that reader queueing works + * @throws BrokenBarrierException + * @throws InterruptedException + */ + @Test(timeout=60000) + public void testIpcWithReaderQueuing() throws Exception { + // 1 reader, 1 connectionQ slot, 1 callq + for (int i=0; i < 10; i++) { + checkBlocking(1, 1, 1); + } + // 4 readers, 5 connectionQ slots, 2 callq + for (int i=0; i < 10; i++) { + checkBlocking(4, 5, 2); + } + } + + // goal is to jam a handler with a connection, fill the callq with + // connections, in turn jamming the readers - then flood the server and + // ensure that the listener blocks when the reader connection queues fill + private void checkBlocking(int readers, int readerQ, int callQ) throws Exception { + int handlers = 1; // makes it easier + + final Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ); + + // send in enough clients to block up the handlers, callq, and readers + int initialClients = readers + callQ + handlers; + // max connections we should ever end up accepting at once + int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener + // stress it with 2X the max + int clients = maxAccept*2; + + final AtomicInteger failures = new AtomicInteger(0); + final CountDownLatch callFinishedLatch = new CountDownLatch(clients); + + // start server + final TestServerQueue server = + new TestServerQueue(clients, readers, callQ, handlers, conf); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + Client.setConnectTimeout(conf, 10000); + + // instantiate the threads, will start in batches + Thread[] threads = new Thread[clients]; + for (int i=0; i