Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 94299 invoked from network); 6 Oct 2006 21:12:15 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 6 Oct 2006 21:12:15 -0000 Received: (qmail 11879 invoked by uid 500); 6 Oct 2006 21:12:15 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 11852 invoked by uid 500); 6 Oct 2006 21:12:15 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 11843 invoked by uid 99); 6 Oct 2006 21:12:15 -0000 Received: from idunn.apache.osuosl.org (HELO idunn.apache.osuosl.org) (140.211.166.84) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Oct 2006 14:12:15 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=5.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME Received: from [140.211.166.113] ([140.211.166.113:55932] helo=eris.apache.org) by idunn.apache.osuosl.org (ecelerity 2.1.1.8 r(12930)) with ESMTP id 13/3C-24193-CA6C6254 for ; Fri, 06 Oct 2006 14:12:12 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 22A201A981A; Fri, 6 Oct 2006 14:12:09 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r453766 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Server.java Date: Fri, 06 Oct 2006 21:12:09 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061006211209.22A201A981A@eris.apache.org> X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: cutting Date: Fri Oct 6 14:12:08 2006 New Revision: 453766 URL: http://svn.apache.org/viewvc?view=rev&rev=453766 Log: HADOOP-255. Discard stale queued IPC calls. Contributed by Owen. 
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=453766&r1=453765&r2=453766 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Oct 6 14:12:08 2006 @@ -142,6 +142,14 @@ 34. HADOOP-506. Ignore heartbeats from stale task trackers. (Sanjay Dahiya via cutting) +35. HADOOP-255. Discard stale, queued IPC calls. Do not process + calls whose clients will likely time out before they receive a + response. When the queue is full, new calls are now received and + queued, and the oldest calls are discarded, so that, when servers + get bogged down, they no longer develop a backlog on the socket. + This should improve some DFS namenode failure modes. + (omalley via cutting) + Release 0.6.2 - 2006-09-18 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=453766&r1=453765&r2=453766 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Fri Oct 6 14:12:08 2006 @@ -17,7 +17,6 @@ package org.apache.hadoop.ipc; import java.io.IOException; -import java.io.EOFException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.BufferedOutputStream; @@ -30,7 +29,6 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.nio.BufferUnderflowException; import java.net.InetSocketAddress; import java.net.Socket; @@ -57,6 +55,18 @@ * @see Client */ public abstract class Server { + /** + * How much time should be 
allocated for actually running the handler? + * Calls that are older than ipc.client.timeout * MAX_CALL_QUEUE_TIME + * are discarded when a handler takes them off the queue. + */ + private static final float MAX_CALL_QUEUE_TIME = 0.6f; + + /** + * Maximum number of queued calls allowed per handler thread. + */ + private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; + public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.Server"); @@ -72,7 +82,6 @@ private String bindAddress; private int port; // port we listen on private int handlerCount; // number of handler threads - private int maxQueuedCalls; // max number of queued calls private Class paramClass; // class of call parameters private int maxIdleTime; // the maximum idle time after // which a client may be disconnected @@ -87,6 +96,8 @@ private Configuration conf; private int timeout; + private long maxCallStartAge; + private int maxQueueSize; private boolean running = true; // true while server runs private LinkedList callQueue = new LinkedList(); // queued calls @@ -103,11 +114,17 @@ private int id; // the client's call id private Writable param; // the parameter passed private Connection connection; // connection to client + private long receivedTime; // the time received public Call(int id, Writable param, Connection connection) { this.id = id; this.param = param; this.connection = connection; + this.receivedTime = System.currentTimeMillis(); + } + + public String toString() { + return param.toString() + " from " + connection.toString(); } } @@ -348,6 +365,10 @@ this.channelOut = new SocketChannelOutputStream(channel, 4096))); } + public String toString() { + return getHostAddress() + ":" + socket.getPort(); + } + public String getHostAddress() { return socket.getInetAddress().getHostAddress(); } @@ -409,15 +430,13 @@ Call call = new Call(id, param, this); synchronized (callQueue) { + if (callQueue.size() >= maxQueueSize) { + callQueue.removeFirst(); + } callQueue.addLast(call); // queue the call 
callQueue.notify(); // wake up a waiting handler } - while (running && callQueue.size() >= maxQueuedCalls) { - synchronized (callDequeued) { // queue is full - callDequeued.wait(timeout); // wait for a dequeue - } - } } private void close() throws IOException { @@ -462,6 +481,15 @@ callDequeued.notify(); } + // throw the message away if it is too old + if (System.currentTimeMillis() - call.receivedTime > + maxCallStartAge) { + LOG.info("Call " + call.toString() + + " discarded for being too old (" + + (System.currentTimeMillis() - call.receivedTime) + ")"); + continue; + } + if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + call.connection.socket.getInetAddress().getHostAddress()); @@ -526,8 +554,9 @@ this.port = port; this.paramClass = paramClass; this.handlerCount = handlerCount; - this.maxQueuedCalls = handlerCount; this.timeout = conf.getInt("ipc.client.timeout",10000); + maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME); + maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);