hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r651011 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java
Date Wed, 23 Apr 2008 19:09:07 GMT
Author: hairong
Date: Wed Apr 23 12:09:03 2008
New Revision: 651011

URL: http://svn.apache.org/viewvc?rev=651011&view=rev
Log:
HADOOP-2910. Throttle IPC Clients during bursts of requests or server slowdown. Clients retry
connection for up to 15 minutes when socket connection times out. Contributed by Hairong Kuang.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=651011&r1=651010&r2=651011&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 23 12:09:03 2008
@@ -31,6 +31,10 @@
     HADOOP-3160. Remove deprecated exists() from ClientProtocol and 
     FSNamesystem (Lohit Vjayarenu via rangadi)
 
+    HADOOP-2910. Throttle IPC Clients during bursts of requests or
+    server slowdown. Clients retry connection for up to 15 minutes
+    when socket connection times out. (hairong)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=651011&r1=651010&r2=651011&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Apr 23 12:09:03 2008
@@ -40,7 +40,6 @@
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.FSConstants;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -70,7 +69,7 @@
   private Configuration conf;
   private int maxIdleTime; //connections will be culled if it was idle for 
                            //maxIdleTime msecs
-  private int maxRetries; //the max. no. of retries for socket connections
+  final private int maxRetries; //the max. no. of retries for socket connections
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   private Thread connectionCullerThread;
   private SocketFactory socketFactory;           // how to create sockets
@@ -166,33 +165,23 @@
         notify();
         return;
       }
-      short failures = 0;
+      
+      short ioFailures = 0;
+      short timeoutFailures = 0;
       while (true) {
         try {
           this.socket = socketFactory.createSocket();
           this.socket.setTcpNoDelay(tcpNoDelay);
-          this.socket.connect(remoteId.getAddress(), FSConstants.READ_TIMEOUT);
+          // connection time out is 20s
+          this.socket.connect(remoteId.getAddress(), 20000);
           break;
-        } catch (IOException ie) { //SocketTimeoutException is also caught 
-          if (failures == maxRetries) {
-            //reset inUse so that the culler gets a chance to throw this
-            //connection object out of the table. We don't want to increment
-            //inUse to infinity (everytime getConnection is called inUse is
-            //incremented)!
-            inUse = 0;
-            // set socket to null so that the next call to setupIOstreams
-            // can start the process of connect all over again.
-            socket.close();
-            socket = null;
-            throw ie;
-          }
-          failures++;
-          LOG.info("Retrying connect to server: " + remoteId.getAddress() + 
-                   ". Already tried " + failures + " time(s).");
-          try { 
-            Thread.sleep(1000);
-          } catch (InterruptedException iex){
-          }
+        } catch (SocketTimeoutException toe) {
+          /* The max number of retries is 45,
+           * which amounts to 20s*45 = 15 minutes retries.
+           */
+          handleConnectionFailure(timeoutFailures++, 45, toe);
+        } catch (IOException ie) {
+          handleConnectionFailure(ioFailures++, maxRetries, ie);
         }
       }
       socket.setSoTimeout(timeout);
@@ -221,6 +210,48 @@
       notify();
     }
 
+    /* Handle connection failures
+     *
+     * If the current number of retries is equal to the max number of retries,
+     * stop retrying and throw the exception; Otherwise backoff 1 second and
+     * try connecting again.
+     * 
+     * @param curRetries current number of retries
+     * @param maxRetries max number of retries allowed
+     * @param ioe failure reason
+     * @throws IOException if max number of retries is reached
+     */
+    private void handleConnectionFailure(
+        int curRetries, int maxRetries, IOException ioe) throws IOException {
+      // close the current connection
+      try {
+        socket.close();
+      } catch (IOException e) {
+        LOG.warn("Not able to close a socket", e);
+      }
+      // set socket to null so that the next call to setupIOstreams
+      // can start the process of connect all over again.
+      socket = null;
+
+      // throw the exception if the maximum number of retries is reached
+      if (curRetries == maxRetries) {
+        //reset inUse so that the culler gets a chance to throw this
+        //connection object out of the table. We don't want to increment
+        //inUse to infinity (everytime getConnection is called inUse is
+        //incremented)!
+        inUse = 0;
+        throw ioe;
+      }
+
+      // otherwise back off and retry
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {}
+      
+      LOG.info("Retrying connect to server: " + remoteId.getAddress() + 
+          ". Already tried " + curRetries + " time(s).");
+    }
+    
     private synchronized void writeHeader() throws IOException {
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=651011&r1=651010&r2=651011&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Apr 23 12:09:03 2008
@@ -45,6 +45,8 @@
 import java.util.List;
 import java.util.Iterator;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -146,7 +148,7 @@
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
-  private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
+  private BlockingQueue<Call> callQueue; // queued calls
 
   private List<Connection> connectionList = 
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -321,6 +323,11 @@
           closeCurrentConnection(key, e);
           cleanupConnections(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (InterruptedException e) {
+          if (running) {                          // unexpected -- log it
+            LOG.info(getName() + " caught: " +
+                     StringUtils.stringifyException(e));
+          }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
@@ -363,23 +370,28 @@
     void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
       Connection c = null;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
-      SocketChannel channel = server.accept();
-      channel.configureBlocking(false);
-      channel.socket().setTcpNoDelay(tcpNoDelay);
-      SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
-      c = new Connection(readKey, channel, System.currentTimeMillis());
-      readKey.attach(c);
-      synchronized (connectionList) {
-        connectionList.add(numConnections, c);
-        numConnections++;
+      // accept up to 10 connections
+      for (int i=0; i<10; i++) {
+        SocketChannel channel = server.accept();
+        if (channel==null) return;
+
+        channel.configureBlocking(false);
+        channel.socket().setTcpNoDelay(tcpNoDelay);
+        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+        c = new Connection(readKey, channel, System.currentTimeMillis());
+        readKey.attach(c);
+        synchronized (connectionList) {
+          connectionList.add(numConnections, c);
+          numConnections++;
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Server connection from " + c.toString() +
+              "; # active connections: " + numConnections +
+              "; # queued calls: " + callQueue.size());
       }
-      if (LOG.isDebugEnabled())
-        LOG.debug("Server connection from " + c.toString() +
-                  "; # active connections: " + numConnections +
-                  "; # queued calls: " + callQueue.size());
     }
 
-    void doRead(SelectionKey key) {
+    void doRead(SelectionKey key) throws InterruptedException {
       int count = 0;
       Connection c = (Connection)key.attachment();
       if (c == null) {
@@ -389,6 +401,8 @@
       
       try {
         count = c.readAndProcess();
+      } catch (InterruptedException ieo) {
+        throw ieo;
       } catch (Exception e) {
         LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
@@ -822,15 +836,7 @@
       param.readFields(dis);        
         
       Call call = new Call(id, param, this);
-      synchronized (callQueue) {
-        if (callQueue.size() >= maxQueueSize) {
-          Call oldCall = callQueue.removeFirst();
-          LOG.warn("Call queue overflow discarding oldest call " + oldCall);
-        }
-        callQueue.addLast(call);              // queue the call
-        callQueue.notify();                   // wake up a waiting handler
-      }
-        
+      callQueue.put(call);              // queue the call; maybe blocked here
     }
 
     private synchronized void close() throws IOException {
@@ -860,14 +866,7 @@
       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
-          Call call;
-          synchronized (callQueue) {
-            while (running && callQueue.size()==0) { // wait for a call
-              callQueue.wait(timeout);
-            }
-            if (!running) break;
-            call = callQueue.removeFirst(); // pop the queue
-          }
+          Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           // throw the message away if it is too old
           if (System.currentTimeMillis() - call.receivedTime > 
@@ -952,6 +951,7 @@
     this.socketSendBufferSize = 0;
     maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
     maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);



Mime
View raw message