hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r653607 - in /hadoop/core/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/ipc/metrics/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/ipc/
Date Mon, 05 May 2008 21:24:26 GMT
Author: hairong
Date: Mon May  5 14:24:25 2008
New Revision: 653607

URL: http://svn.apache.org/viewvc?rev=653607&view=rev
Log:
HADOOP-2188. RPC should send a ping rather than use client timeouts. Contributed by Hairong Kuang.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May  5 14:24:25 2008
@@ -31,6 +31,12 @@
     xxxID.toString() and xxxID.forName() methods to convert/restore objects 
     to/from strings. (Enis Soztutar via ddas)
 
+    HADOOP-2188. RPC client sends a ping rather than throwing timeouts.
+    RPC server does not throw away old RPCs. If clients and the server are on
+    different versions, they are not able to function well. In addition,
+    the property ipc.client.timeout is removed from the default hadoop
+    configuration. It also removes the metric RpcOpsDiscardedOPsNum. (hairong)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Mon May  5 14:24:25 2008
@@ -1060,12 +1060,6 @@
 <!-- ipc properties -->
 
 <property>
-  <name>ipc.client.timeout</name>
-  <value>60000</value>
-  <description>Defines the timeout for IPC calls in milliseconds.</description>
-</property>
-
-<property>
   <name>ipc.client.idlethreshold</name>
   <value>4000</value>
   <description>Defines the threshold number of connections after which
@@ -1076,7 +1070,7 @@
 <property>
   <name>ipc.client.maxidletime</name>
   <value>120000</value>
-  <description>Defines the maximum idle time for a connected client after 
+  <description>Defines the maximum idle time in msec for a connected client after 
                which it may be disconnected.
   </description>
 </property>
@@ -1090,8 +1084,8 @@
 
 <property>
   <name>ipc.client.connection.maxidletime</name>
-  <value>1000</value>
-  <description>The maximum time after which a client will bring down the
+  <value>10000</value>
+  <description>The maximum time in msec after which a client will bring down the
                connection to the server.
   </description>
 </property>

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May  5 14:24:25 2008
@@ -106,8 +106,6 @@
 
   private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
     throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
     RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
     
@@ -120,27 +118,10 @@
     exceptionToPolicyMap.put(RemoteException.class, 
         RetryPolicies.retryByRemoteException(
             RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
-    exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
     RetryPolicy methodPolicy = RetryPolicies.retryByException(
         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
     Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
     
-    methodNameToPolicyMap.put("open", methodPolicy);
-    methodNameToPolicyMap.put("setReplication", methodPolicy);
-    methodNameToPolicyMap.put("abandonBlock", methodPolicy);
-    methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy);
-    methodNameToPolicyMap.put("reportBadBlocks", methodPolicy);
-    methodNameToPolicyMap.put("isDir", methodPolicy);
-    methodNameToPolicyMap.put("getListing", methodPolicy);
-    methodNameToPolicyMap.put("getHints", methodPolicy);
-    methodNameToPolicyMap.put("getBlockLocations", methodPolicy);
-    methodNameToPolicyMap.put("renewLease", methodPolicy);
-    methodNameToPolicyMap.put("getStats", methodPolicy);
-    methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
-    methodNameToPolicyMap.put("getPreferredBlockSize", methodPolicy);
-    methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
-    methodNameToPolicyMap.put("complete", methodPolicy);
-    methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
     methodNameToPolicyMap.put("create", methodPolicy);
 
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon May  5 14:24:25 2008
@@ -24,22 +24,23 @@
 import java.net.UnknownHostException;
 
 import java.io.IOException;
-import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
+import java.io.InputStream;
 
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Map.Entry;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -62,28 +63,65 @@
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
-  private Class valueClass;                       // class of call values
-  private int timeout;// timeout for calls
+  private Class<?> valueClass;                       // class of call values
   private int counter;                            // counter for call ids
   private boolean running = true;                 // true while client runs
-  private Configuration conf;
-  private int maxIdleTime; //connections will be culled if it was idle for 
+  final private Configuration conf;
+  final private int maxIdleTime; //connections will be culled if it was idle for 
                            //maxIdleTime msecs
   final private int maxRetries; //the max. no. of retries for socket connections
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
-  private Thread connectionCullerThread;
+  private int pingInterval; // how often sends ping to the server in msecs
+
   private SocketFactory socketFactory;           // how to create sockets
-  
   private int refCount = 1;
   
+  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+  final public static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+  final static int PING_CALL_ID = -1;
+  
+  /**
+   * set the ping interval value in configuration
+   * 
+   * @param conf Configuration
+   * @param pingInterval the ping interval
+   */
+  final public static void setPingInterval(Configuration conf, int pingInterval) {
+    conf.setInt(PING_INTERVAL_NAME, pingInterval);
+  }
+
+  /**
+   * Get the ping interval from configuration;
+   * If not set in the configuration, return the default value.
+   * 
+   * @param conf Configuration
+   * @return the ping interval
+   */
+  final static int getPingInterval(Configuration conf) {
+    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+  }
+  
+  /**
+   * Increment this client's reference count
+   *
+   */
   synchronized void incCount() {
-	  refCount++;
+    refCount++;
   }
   
+  /**
+   * Decrement this client's reference count
+   *
+   */
   synchronized void decCount() {
     refCount--;
   }
   
+  /**
+   * Return if this client has no reference
+   * 
+   * @return true if this client has no reference; false otherwise
+   */
   synchronized boolean isZeroReference() {
     return refCount==0;
   }
@@ -93,9 +131,7 @@
     int id;                                       // call id
     Writable param;                               // parameter
     Writable value;                               // value, null if error
-    String error;                                 // exception, null if value
-    String errorClass;                            // class of exception
-    long lastActivity;                            // time of last i/o
+    IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
 
     protected Call(Writable param) {
@@ -103,30 +139,34 @@
       synchronized (Client.this) {
         this.id = counter++;
       }
-      touch();
     }
 
-    /** Called by the connection thread when the call is complete and the
-     * value or error string are available.  Notifies by default.  */
-    public synchronized void callComplete() {
+    /** Indicate when the call is complete and the
+     * value or error are available.  Notifies by default.  */
+    protected synchronized void callComplete() {
+      this.done = true;
       notify();                                 // notify caller
     }
 
-    /** Update lastActivity with the current time. */
-    public synchronized void touch() {
-      lastActivity = System.currentTimeMillis();
-    }
-
-    /** Update lastActivity with the current time. */
-    public synchronized void setResult(Writable value, 
-                                       String errorClass,
-                                       String error) {
-      this.value = value;
+    /** Set the exception when there is an error.
+     * Notify the caller the call is done.
+     * 
+     * @param error exception thrown by the call; either local or remote
+     */
+    public synchronized void setException(IOException error) {
       this.error = error;
-      this.errorClass =errorClass;
-      this.done = true;
+      callComplete();
     }
     
+    /** Set the return value when there is no error. 
+     * Notify the caller the call is done.
+     * 
+     * @param value return value of the call.
+     */
+    public synchronized void setValue(Writable value) {
+      this.value = value;
+      callComplete();
+    }
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -139,11 +179,9 @@
     private DataOutputStream out;
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
-    private Call readingCall;
-    private Call writingCall;
-    private int inUse = 0;
-    private long lastActivity = 0;
-    private boolean shouldCloseConnection = false;
+    private long lastActivity = 0;     // last I/O activity time
+    private boolean shouldCloseConnection = false;  // indicate if the connection is closed
+    private IOException closeException; // close reason
 
     public Connection(InetSocketAddress address) throws IOException {
       this(new ConnectionId(address, null));
@@ -155,59 +193,129 @@
                                        remoteId.getAddress().getHostName());
       }
       this.remoteId = remoteId;
-      this.setName("IPC Client connection to " + 
-                   remoteId.getAddress().toString());
+      UserGroupInformation ticket = remoteId.getTicket();
+      this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
+          remoteId.getAddress().toString() +
+          " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
       this.setDaemon(true);
     }
 
-    public synchronized void setupIOstreams() throws IOException {
-      if (socket != null) {
-        notify();
+    /** Update lastActivity with the current time. */
+    private synchronized void touch() {
+      touch(System.currentTimeMillis());
+    }
+    
+    private synchronized void touch(long curTime) {
+      lastActivity = curTime;
+    }
+
+    /** Add a call to this connection's call queue */
+    private synchronized boolean addCall(Call call) {
+      if (shouldCloseConnection)
+        return false;
+      calls.put(call.id, call);
+      notify();
+      return true;
+    }
+    
+    /** This class sends a ping to the remote side when timeout on
+     * reading. If no failure is detected, it retries until at least
+     * a byte is read.
+     */
+    private class PingInputStream extends FilterInputStream {
+      /* constructor */
+      protected PingInputStream(InputStream in) {
+        super(in);
+      }
+
+      /* Process timeout exception
+       * if the connection is not going to be closed, send a ping.
+       * otherwise, throw the timeout exception.
+       */
+      private void handleTimeout(SocketTimeoutException e) throws IOException {
+        if (shouldCloseConnection || !running) {
+          throw e;
+        } else {
+          sendPing();
+        }
+      }
+      
+      /** Read a byte from the stream.
+       * Send a ping if timeout on read. Retries if no failure is detected
+       * until a byte is read.
+       */
+      public int read() throws IOException {
+        do {
+          try {
+            return super.read();
+          } catch (SocketTimeoutException e) {
+            handleTimeout(e);
+          }
+        } while (true);
+      }
+
+      /** Read bytes into a buffer starting from offset <code>off</code>
+       * Send a ping if timeout on read. Retries if no failure is detected
+       * until a byte is read.
+       * 
+       * @return the total number of bytes read; -1 if the connection is closed.
+       */
+      public int read(byte[] buf, int off, int len) throws IOException {
+        do {
+          try {
+            return super.read(buf, off, len);
+          } catch (SocketTimeoutException e) {
+            handleTimeout(e);
+          }
+        } while (true);
+      }
+    }
+    
+    /** Connect to the server and set up the I/O streams. It then sends
+     * a header to the server and starts
+     * the connection thread that waits for responses.
+     */
+    private synchronized void setupIOstreams() {
+      if (socket != null || shouldCloseConnection) {
         return;
       }
       
       short ioFailures = 0;
       short timeoutFailures = 0;
-      while (true) {
-        try {
-          this.socket = socketFactory.createSocket();
-          this.socket.setTcpNoDelay(tcpNoDelay);
-          // connection time out is 20s
-          this.socket.connect(remoteId.getAddress(), 20000);
-          break;
-        } catch (SocketTimeoutException toe) {
-          /* The max number of retries is 45,
-           * which amounts to 20s*45 = 15 minutes retries.
-           */
-          handleConnectionFailure(timeoutFailures++, 45, toe);
-        } catch (IOException ie) {
-          handleConnectionFailure(ioFailures++, maxRetries, ie);
-        }
-      }
-      socket.setSoTimeout(timeout);
-      this.in = new DataInputStream
-        (new BufferedInputStream
-         (new FilterInputStream(NetUtils.getInputStream(socket)) {
-             public int read(byte[] buf, int off, int len) throws IOException {
-               int value = super.read(buf, off, len);
-               if (readingCall != null) {
-                 readingCall.touch();
-               }
-               return value;
-             }
-           }));
-      this.out = new DataOutputStream
-        (new BufferedOutputStream
-         (new FilterOutputStream(NetUtils.getOutputStream(socket)) {
-             public void write(byte[] buf, int o, int len) throws IOException {
-               out.write(buf, o, len);
-               if (writingCall != null) {
-                 writingCall.touch();
-               }
-             }
-           }));
-      writeHeader();
-      notify();
+      try {
+        LOG.info("Build a connection to "+remoteId.getAddress());
+        while (true) {
+          try {
+            this.socket = socketFactory.createSocket();
+            this.socket.setTcpNoDelay(tcpNoDelay);
+            // connection time out is 20s
+            this.socket.connect(remoteId.getAddress(), 20000);
+            this.socket.setSoTimeout(pingInterval);
+            break;
+          } catch (SocketTimeoutException toe) {
+            /* The max number of retries is 45,
+             * which amounts to 20s*45 = 15 minutes retries.
+             */
+            handleConnectionFailure(timeoutFailures++, 45, toe);
+          } catch (IOException ie) {
+            handleConnectionFailure(ioFailures++, maxRetries, ie);
+          }
+        }
+        this.in = new DataInputStream(new BufferedInputStream
+            (new PingInputStream(NetUtils.getInputStream(socket))));
+        this.out = new DataOutputStream
+            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+        writeHeader();
+
+        // update last activity time
+        touch();
+
+      } catch (IOException e) {
+        markClosed(e);
+        close();
+      }
+      // start the receiver thread after the socket connection has been set up
+      start();
     }
 
     /* Handle connection failures
@@ -235,11 +343,6 @@
 
       // throw the exception if the maximum number of retries is reached
       if (curRetries == maxRetries) {
-        //reset inUse so that the culler gets a chance to throw this
-        //connection object out of the table. We don't want to increment
-        //inUse to infinity (everytime getConnection is called inUse is
-        //incremented)!
-        inUse = 0;
         throw ioe;
       }
 
@@ -251,8 +354,11 @@
       LOG.info("Retrying connect to server: " + remoteId.getAddress() + 
           ". Already tried " + curRetries + " time(s).");
     }
-    
-    private synchronized void writeHeader() throws IOException {
+
+    /* Write the header for each connection
+     * Out is not synchronized because only the first thread does this.
+     */
+    private void writeHeader() throws IOException {
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
       //When there are more fields we can have ConnectionHeader Writable.
@@ -264,155 +370,197 @@
       out.write(buf.getData(), 0, bufLen);
     }
     
+    /* Wait until someone signals us to start reading an RPC response,
+     * the connection has been idle too long, it is marked to be closed,
+     * or the client is marked as not running.
+     * 
+     * Return true if it is time to read a response; false otherwise.
+     */
     private synchronized boolean waitForWork() {
-      //wait till someone signals us to start reading RPC response or
-      //close the connection. If we are idle long enough (blocked in wait),
-      //the ConnectionCuller thread will wake us up and ask us to close the
-      //connection. 
-      //We need to wait when inUse is 0 or socket is null (it may be null if
-      //the Connection object has been created but the socket connection
-      //has not been setup yet). We stop waiting if we have been asked to close
-      //connection
-      while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
-        try {
-          wait();
-        } catch (InterruptedException e) {}
+      if (calls.isEmpty() && !shouldCloseConnection  && running)  {
+        long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
+        if (timeout>0) {
+          try {
+            wait(timeout);
+          } catch (InterruptedException e) {}
+        }
       }
-      return !shouldCloseConnection;
-    }
-
-    private synchronized void incrementRef() {
-      inUse++;
-    }
-
-    private synchronized void decrementRef() {
-      lastActivity = System.currentTimeMillis();
-      inUse--;
-    }
-
-    public synchronized boolean isIdle() {
-      //check whether the connection is in use or just created
-      if (inUse != 0) return false;
-      long currTime = System.currentTimeMillis();
-      if (currTime - lastActivity > maxIdleTime)
+      
+      if (!calls.isEmpty() && !shouldCloseConnection && running) {
         return true;
-      return false;
+      } else if (shouldCloseConnection) {
+        return false;
+      } else if (!running) { //get stopped 
+        markClosed((IOException)new IOException().initCause(
+            new InterruptedException()));
+        return false;
+      } else { // closed because it has been idle for more than maxIdleTime
+        markClosed(null);
+        return false;
+      }
     }
 
     public InetSocketAddress getRemoteAddress() {
       return remoteId.getAddress();
     }
 
-    public void setCloseConnection() {
-      shouldCloseConnection = true;
+    /* Send a ping to the server if the time elapsed 
+     * since last I/O activity is equal to or greater than the ping interval
+     */
+    private synchronized void sendPing() throws IOException {
+      long curTime = System.currentTimeMillis();
+      if ( curTime - lastActivity >= pingInterval) {
+        touch(curTime);
+        synchronized (out) {
+          out.writeInt(PING_CALL_ID);
+          out.flush();
+        }
+      }
     }
 
     public void run() {
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": starting");
-      try {
-        while (running) {
-          int id;
-          //wait here for work - read connection or close connection
-          if (waitForWork() == false)
-            break;
-          try {
-            id = in.readInt();                    // try to read an id
-          } catch (SocketTimeoutException e) {
-            continue;
-          }
+        LOG.debug(getName() + ": starting, having connections " 
+            + connections.size());
 
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " got value #" + id);
-
-          Call call = calls.remove(id);
-          boolean isError = in.readBoolean();     // read if error
-          if (isError) {
-            call.setResult(null, WritableUtils.readString(in),
-                           WritableUtils.readString(in));
-          } else {
-            Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
-            try {
-              readingCall = call;
-              value.readFields(in);                 // read value
-            } finally {
-              readingCall = null;
-            }
-            call.setResult(value, null, null);
-          }
-          call.callComplete();                   // deliver result to caller
-          //received the response. So decrement the ref count
-          decrementRef();
-        }
-      } catch (EOFException eof) {
-        // This is what happens when the remote side goes down
-      } catch (Exception e) {
-        LOG.info(StringUtils.stringifyException(e));
-      } finally {
-        //If there was no exception thrown in this method, then the only
-        //way we reached here is by breaking out of the while loop (after
-        //waitForWork). And if we took that route to reach here, we have 
-        //already removed the connection object in the ConnectionCuller thread.
-        //We don't want to remove this again as some other thread might have
-        //actually put a new Connection object in the table in the meantime.
-        synchronized (connections) {
-          if (connections.get(remoteId) == this) {
-            connections.remove(remoteId);
-          }
-        }
-        close();
+      while (waitForWork()) {//wait here for work - read or close connection
+        receiveResponse();
       }
+      
+      close();
+      
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": stopped, remaining connections "
+            + connections.size());
     }
 
     /** Initiates a call by sending the parameter to the remote server.
      * Note: this is not called from the Connection thread, but by other
      * threads.
      */
-    public void sendParam(Call call) throws IOException {
-      boolean error = true;
+    public void sendParam(Call call) {
+      synchronized (this) {
+        if (shouldCloseConnection) {
+          return;
+        }
+      }
+
       try {
-        calls.put(call.id, call);
         synchronized (out) {
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
-          try {
-            writingCall = call;
-            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
-                                                         //data to be written
-            d.writeInt(call.id);
-            call.param.write(d);
-            byte[] data = d.getData();
-            int dataLength = d.getLength();
-
-            out.writeInt(dataLength);      //first put the data length
-            out.write(data, 0, dataLength);//write the data
-            out.flush();
-          } finally {
-            writingCall = null;
-          }
-        }
-        error = false;
-      } finally {
-        if (error) {
-          synchronized (connections) {
-            if (connections.get(remoteId) == this)
-              connections.remove(remoteId);
-          }
-          close();                                // close on error
+
+          DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+          //data to be written
+          d.writeInt(call.id);
+          call.param.write(d);
+          byte[] data = d.getData();
+          int dataLength = d.getLength();
+
+          out.writeInt(dataLength);      //first put the data length
+          out.write(data, 0, dataLength);//write the data
+          out.flush();
         }
+      } catch(IOException e) {
+        markClosed(e);
       }
     }  
 
-    /** Close the connection. */
-    public void close() {
-      //socket may be null if the connection could not be established to the
-      //server in question, and the culler asked us to close the connection
-      if (socket == null) return;
+    /* Receive a response.
+     * There is only one receiver, so no synchronization on in is needed.
+     */
+    private void receiveResponse() {
+      synchronized (this) {
+        if (shouldCloseConnection) {
+          return;
+        }
+      }
+      touch();
+      
       try {
-        socket.close();                           // close socket
-      } catch (IOException e) {}
+        int id = in.readInt();                    // try to read an id
+
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + " got value #" + id);
+
+        Call call = calls.remove(id);
+
+        boolean isError = in.readBoolean();     // read if error
+        if (isError) {
+          call.setException(new RemoteException( WritableUtils.readString(in),
+              WritableUtils.readString(in)));
+        } else {
+          Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
+          value.readFields(in);                 // read value
+          call.setValue(value);
+        }
+      } catch (IOException e) {
+        markClosed(e);
+      }
+    }
+    
+    private synchronized void markClosed(IOException e) {
+      if (!shouldCloseConnection) {
+        shouldCloseConnection = true;
+        closeException = e;
+        notifyAll();
+      }
+    }
+    
+    /** Close the connection. */
+    private synchronized void close() {
+      if (!shouldCloseConnection) {
+        LOG.error("The connection is not in the closed state");
+        return;
+      }
+
+      synchronized (out) {
+        // release the resources
+        // first thing to do;take the connection out of the connection list
+        synchronized (connections) {
+          if (connections.get(remoteId) == this) {
+            connections.remove(remoteId);
+          }
+        }
+
+        // close the socket and streams
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+        IOUtils.closeSocket(socket);
+
+        // clean up all calls
+        if (closeException == null) {
+          if (!calls.isEmpty()) {
+            LOG.warn(
+            "A connection is closed for no cause and calls are not empty");
+            
+            // clean up calls anyway
+            closeException = new IOException("Unexpected closed connection");
+            cleanupCalls();
+          }
+        } else {
+          // log the info
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+                StringUtils.stringifyException(closeException));
+          }
+
+          // cleanup calls
+          cleanupCalls();
+        }
+      }
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": closing");
+        LOG.debug(getName() + ": closed");
+    }
+    
+    /* Cleanup all calls and mark them as done */
+    private void cleanupCalls() {
+      Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
+      while (itor.hasNext()) {
+        Call c = itor.next().getValue(); 
+        c.setException(closeException); // local exception
+        itor.remove();         
+      }
     }
   }
 
@@ -428,7 +576,7 @@
     }
 
     /** Deliver result to result collector. */
-    public void callComplete() {
+    protected void callComplete() {
       results.callComplete(this);
     }
   }
@@ -453,58 +601,21 @@
     }
   }
 
-  private class ConnectionCuller extends Thread {
-
-    public static final int MIN_SLEEP_TIME = 1000;
-
-    public void run() {
-
-      LOG.debug(getName() + ": starting");
-
-      while (running) {
-        try {
-          Thread.sleep(MIN_SLEEP_TIME);
-        } catch (InterruptedException ie) {}
-
-        synchronized (connections) {
-          Iterator i = connections.values().iterator();
-          while (i.hasNext()) {
-            Connection c = (Connection)i.next();
-            if (c.isIdle()) { 
-              //We don't actually close the socket here (i.e., don't invoke
-              //the close() method). We leave that work to the response receiver
-              //thread. The reason for that is since we have taken a lock on the
-              //connections table object, we don't want to slow down the entire
-              //system if we happen to talk to a slow server.
-              i.remove();
-              synchronized (c) {
-                c.setCloseConnection();
-                c.notify();
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
   public Client(Class valueClass, Configuration conf, 
       SocketFactory factory) {
     this.valueClass = valueClass;
-    this.timeout = conf.getInt("ipc.client.timeout", 10000);
-    this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
+    this.maxIdleTime = 
+      conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
     this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
     this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
+    this.pingInterval = getPingInterval(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+    }
     this.conf = conf;
     this.socketFactory = factory;
-    this.connectionCullerThread = new ConnectionCuller();
-    connectionCullerThread.setDaemon(true);
-    connectionCullerThread.setName(valueClass.getName() + " Connection Culler");
-    LOG.debug(valueClass.getName() + 
-              "Connection culler maxidletime= " + maxIdleTime + "ms");
-    connectionCullerThread.start();
   }
 
   /**
@@ -535,19 +646,11 @@
       return;
     }
     running = false;
-
-    connectionCullerThread.interrupt();
-    try {
-      connectionCullerThread.join();
-    } catch(InterruptedException e) {}
-
-    // close and wake up all connections
+    
+    // wake up all connections
     synchronized (connections) {
       for (Connection conn : connections.values()) {
-        synchronized (conn) {
-          conn.setCloseConnection();
-          conn.notifyAll();
-        }
+        conn.interrupt();
       }
     }
     
@@ -560,9 +663,6 @@
     }
   }
 
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception. */
@@ -574,20 +674,24 @@
   public Writable call(Writable param, InetSocketAddress addr, 
                        UserGroupInformation ticket)  
                        throws InterruptedException, IOException {
-    Connection connection = getConnection(addr, ticket);
     Call call = new Call(param);
+    Connection connection = getConnection(addr, ticket, call);
+    connection.sendParam(call);                 // send the parameter
     synchronized (call) {
-      connection.sendParam(call);                 // send the parameter
-      long wait = timeout;
-      do {
-        call.wait(wait);                       // wait for the result
-        wait = timeout - (System.currentTimeMillis() - call.lastActivity);
-      } while (!call.done && wait > 0);
+      while (!call.done) {
+        try {
+          call.wait();                           // wait for the result
+        } catch (InterruptedException ignored) {}
+      }
 
       if (call.error != null) {
-        throw new RemoteException(call.errorClass, call.error);
-      } else if (!call.done) {
-        throw new SocketTimeoutException("timed out waiting for rpc response");
+        if (call.error instanceof RemoteException) {
+          call.error.fillInStackTrace();
+          throw call.error;
+        } else { // local exception
+          throw (IOException)new IOException(
+              "Call failed on local exception").initCause(call.error);
+        }
       } else {
         return call.value;
       }
@@ -607,7 +711,7 @@
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null);
+          Connection connection = getConnection(addresses[i], null, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           LOG.info("Calling "+addresses[i]+" caught: " + 
@@ -615,38 +719,44 @@
           results.size--;                         //  wait for one fewer result
         }
       }
-      try {
-        results.wait(timeout);                    // wait for all results
-      } catch (InterruptedException e) {}
-
-      if (results.count == 0) {
-        throw new IOException("no responses");
-      } else {
-        return results.values;
+      while (results.count != results.size) {
+        try {
+          results.wait();                    // wait for all results
+        } catch (InterruptedException e) {}
       }
+
+      return results.values;
     }
   }
 
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
   private Connection getConnection(InetSocketAddress addr, 
-                                   UserGroupInformation ticket)
+                                   UserGroupInformation ticket,
+                                   Call call)
                                    throws IOException {
+    synchronized (this) {
+      if (!running) {
+        // the client is stopped
+        throw new IOException("The client is stopped");
+      }
+    }
     Connection connection;
     /* we could avoid this allocation for each RPC by having a  
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
     ConnectionId remoteId = new ConnectionId(addr, ticket);
-    synchronized (connections) {
-      connection = connections.get(remoteId);
-      if (connection == null) {
-        connection = new Connection(remoteId);
-        connections.put(remoteId, connection);
-        connection.start();
+    do {
+      synchronized (connections) {
+        connection = connections.get(remoteId);
+        if (connection == null) {
+          connection = new Connection(remoteId);
+          connections.put(remoteId, connection);
+        }
       }
-      connection.incrementRef();
-    }
+    } while (!connection.addCall(call));
+    
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon May  5 14:24:25 2008
@@ -72,15 +72,8 @@
    */
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   
-  // 1 : Ticket is added to connection header
-  public static final byte CURRENT_VERSION = 1;
-  
-  /**
-   * How much time should be allocated for actually running the handler?
-   * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
-   * are ignored when the handler takes them off the queue.
-   */
-  private static final float MAX_CALL_QUEUE_TIME = 0.6f;
+  // 2 : Introduce ping and server does not throw away RPCs
+  public static final byte CURRENT_VERSION = 2;
   
   /**
    * How many calls/handler are allowed in the queue.
@@ -126,7 +119,7 @@
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private Class paramClass;                       // class of call parameters
+  private Class<?> paramClass;                    // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -141,8 +134,6 @@
   
   private Configuration conf;
 
-  private int timeout;
-  private long maxCallStartAge;
   private int maxQueueSize;
   private int socketSendBufferSize;
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@@ -160,7 +151,7 @@
   private Handler[] handlers = null;
 
   /**
-   * A convience method to bind to a given address and report 
+   * A convenience method to bind to a given address and report 
    * better exceptions if the address is not a valid host.
    * @param socket the socket to bind
    * @param address the address to bind to
@@ -192,14 +183,15 @@
     private int id;                               // the client's call id
     private Writable param;                       // the parameter passed
     private Connection connection;                // connection to client
-    private long receivedTime;                    // the time received
+    private long timestamp;     // the time received when response is null
+                                   // the time served when response is not null
     private ByteBuffer response;                      // the response for this call
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
       this.param = param;
       this.connection = connection;
-      this.receivedTime = System.currentTimeMillis();
+      this.timestamp = System.currentTimeMillis();
       this.response = null;
     }
     
@@ -299,10 +291,9 @@
         SelectionKey key = null;
         try {
           selector.select();
-          Iterator iter = selector.selectedKeys().iterator();
-          
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
           while (iter.hasNext()) {
-            key = (SelectionKey)iter.next();
+            key = iter.next();
             iter.remove();
             try {
               if (key.isValid()) {
@@ -439,6 +430,8 @@
   private class Responder extends Thread {
     private Selector writeSelector;
     private int pending;         // connections waiting to register
+    
+    final static int PURGE_INTERVAL = 900000; // 15mins
 
     Responder() throws IOException {
       this.setName("IPC Server Responder");
@@ -456,7 +449,7 @@
       while (running) {
         try {
           waitPending();     // If a channel is being registered, wait.
-          writeSelector.select(maxCallStartAge);
+          writeSelector.select(PURGE_INTERVAL);
           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
           while (iter.hasNext()) {
             SelectionKey key = iter.next();
@@ -470,7 +463,7 @@
             }
           }
           long now = System.currentTimeMillis();
-          if (now < lastPurgeTime + maxCallStartAge) {
+          if (now < lastPurgeTime + PURGE_INTERVAL) {
             continue;
           }
           lastPurgeTime = now;
@@ -544,34 +537,17 @@
         LOG.info("doPurge: bad channel");
         return;
       }
-      boolean close = false;
       LinkedList<Call> responseQueue = call.connection.responseQueue;
       synchronized (responseQueue) {
         Iterator<Call> iter = responseQueue.listIterator(0);
         while (iter.hasNext()) {
           call = iter.next();
-          if (now > call.receivedTime + maxCallStartAge) {
-            LOG.info(getName() + ", call " + call +
-                     ": response discarded for being too old (" +
-                     (now - call.receivedTime) + ")");
-            iter.remove();
-            if (call.response.position() > 0) {
-              /* We should probably use a different start time 
-               * than receivedTime. receivedTime starts when the RPC
-               * was first read.
-               * We have written a partial response. will close the
-               * connection for now.
-               */
-              close = true;
-              break;
-            }            
+          if (now > call.timestamp + PURGE_INTERVAL) {
+            closeConnection(call.connection);
+            break;
           }
         }
       }
-      
-      if (close) {
-        closeConnection(call.connection);
-      }
     }
 
     // Processes one response. Returns true if there are no more pending
@@ -627,6 +603,9 @@
             call.connection.responseQueue.addFirst(call);
             
             if (inHandler) {
+              // set the serve time when the response has to be sent later
+              call.timestamp = System.currentTimeMillis();
+              
               incPending();
               try {
                 // Wakeup the thread blocked on select, only then can the call 
@@ -791,6 +770,11 @@
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
+       
+          if (dataLength == Client.PING_CALL_ID) {
+            dataLengthBuffer.clear();
+            return 0;  //ping message
+          }
           data = ByteBuffer.allocate(dataLength);
         }
         
@@ -868,18 +852,6 @@
         try {
           Call call = callQueue.take(); // pop the queue; maybe blocked here
 
-          // throw the message away if it is too old
-          if (System.currentTimeMillis() - call.receivedTime > 
-              maxCallStartAge) {
-            ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
-            int timeInQ = (int) (System.currentTimeMillis() - call.receivedTime);
-            LOG.warn(getName()+", call "+call
-                     +": discarded for being too old (" +
-                     timeInQ + ")");
-            rpcMetrics.rpcDiscardedOps.inc(timeInQ);
-            continue;
-          }
-          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
                       call.connection);
@@ -892,7 +864,7 @@
           UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
           UserGroupInformation.setCurrentUGI(call.connection.ticket);
           try {
-            value = call(call.param, call.receivedTime);             // make the call
+            value = call(call.param, call.timestamp);             // make the call
           } catch (Throwable e) {
             LOG.info(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
@@ -939,7 +911,7 @@
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf,
+  protected Server(String bindAddress, int port, Class<?> paramClass, int handlerCount, Configuration conf,
                   String serverName) 
     throws IOException {
     this.bindAddress = bindAddress;
@@ -947,10 +919,8 @@
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
-    this.timeout = conf.getInt("ipc.client.timeout", 10000);
     this.socketSendBufferSize = 0;
-    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
-    maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+    this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@@ -978,9 +948,6 @@
     }
   }
   
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Sets the socket buffer size used for responding to RPCs */
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java Mon May  5 14:24:25 2008
@@ -39,7 +39,7 @@
  * This class has a number of metrics variables that are publicly accessible;
  * these variables (objects) have methods to update their values;
  * for example:
- *  <p> {@link #rpcDiscardedOps}.inc(time)
+ *  <p> {@link #rpcQueueTime}.inc(time)
  *
  */
 public class RpcMetrics implements Updater {
@@ -71,7 +71,6 @@
   
   public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
   public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
-  public MetricsTimeVaryingRate rpcDiscardedOps = new MetricsTimeVaryingRate("RpcDiscardedOps");
 
   public Map <String, MetricsTimeVaryingRate> metricsList = Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
 
@@ -83,7 +82,6 @@
   public void doUpdates(MetricsContext context) {
     rpcQueueTime.pushMetric(metricsRecord);
     rpcProcessingTime.pushMetric(metricsRecord);
-    rpcDiscardedOps.pushMetric(metricsRecord);
 
     synchronized (metricsList) {
 	// Iterate through the rpcMetrics hashmap to propogate the different rpc metrics.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java Mon May  5 14:24:25 2008
@@ -98,20 +98,6 @@
   /**
    * @inheritDoc
    */
-  public int getRpcOpsDiscardedOpsNum() {
-    return myMetrics.rpcDiscardedOps.getPreviousIntervalNumOps();
-  }
-
-  /**
-   * @inheritDoc
-   */
-  public long getRpcOpsDiscardedOpsQtime() {
-    return myMetrics.rpcDiscardedOps.getPreviousIntervalAverageTime();
-  }
-  
-  /**
-   * @inheritDoc
-   */
   public int getNumOpenConnections() {
     return myServer.getNumOpenConnections();
   }
@@ -129,6 +115,5 @@
   public void resetAllMinMax() {
     myMetrics.rpcProcessingTime.resetMinMax();
     myMetrics.rpcQueueTime.resetMinMax();
-    myMetrics.rpcDiscardedOps.resetMinMax();
   }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java Mon May  5 14:24:25 2008
@@ -86,19 +86,6 @@
    */
   long getRpcOpsAvgQueueTimeMax();
   
-  
-  /**
-   * Number of Discarded RPC operations due to timeout in the last interval
-   * @return number of operations
-   */
-  int getRpcOpsDiscardedOpsNum();
-  
-  /**
-   * Average Queued time for Discarded RPC Operations in last interval
-   * @return time in msec
-   */
-  long getRpcOpsDiscardedOpsQtime();
-  
   /**
    * Reset all min max times
    */

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon May  5 14:24:25 2008
@@ -333,7 +333,6 @@
   }
 
   JobSubmissionProtocol jobSubmitClient;
-  private JobSubmissionProtocol rpcProxy;
   
   FileSystem fs = null;
 
@@ -384,8 +383,7 @@
     if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.rpcProxy = createRPCProxy(JobTracker.getAddress(conf), conf);
-      this.jobSubmitClient = createRetryProxy(this.rpcProxy);
+      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
     }        
   }
 
@@ -395,27 +393,6 @@
         JobSubmissionProtocol.versionID, addr, conf,
         NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
   }
-  /**
-   * Create a proxy JobSubmissionProtocol that retries timeouts.
-   * 
-   * @param addr the address to connect to.
-   * @param conf the server's configuration.
-   * @return a proxy object that will retry timeouts.
-   * @throws IOException
-   */
-  private JobSubmissionProtocol createRetryProxy(JobSubmissionProtocol raw
-                                            ) throws IOException {
-    RetryPolicy backoffPolicy =
-      RetryPolicies.retryUpToMaximumCountWithProportionalSleep
-      (5, 10, java.util.concurrent.TimeUnit.SECONDS);
-    Map<Class<? extends Exception>, RetryPolicy> handlers = 
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    handlers.put(SocketTimeoutException.class, backoffPolicy);
-    RetryPolicy backoffTimeOuts = 
-      RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
-    return (JobSubmissionProtocol)
-      RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
-  }
 
   /**
    * Build a job client, connect to the indicated job tracker.
@@ -425,15 +402,16 @@
    */
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
-    rpcProxy =  createRPCProxy(jobTrackAddr, conf);
-    jobSubmitClient = createRetryProxy(rpcProxy);
+    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
-    RPC.stopProxy(rpcProxy);
+    if (!(jobSubmitClient instanceof LocalJobRunner)) {
+      RPC.stopProxy(jobSubmitClient);
+    }
   }
 
   /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Mon May  5 14:24:25 2008
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 
 import java.util.Random;
@@ -36,9 +37,13 @@
 public class TestIPC extends TestCase {
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.TestIPC");
-
-  private static Configuration conf = new Configuration();
   
+  final private static Configuration conf = new Configuration();
+  final static private int PING_INTERVAL = 1000;
+  
+  static {
+    Client.setPingInterval(conf, PING_INTERVAL);
+  }
   public TestIPC(String name) { super(name); }
 
   private static final Random RANDOM = new Random();
@@ -51,14 +56,13 @@
     public TestServer(int handlerCount, boolean sleep) 
       throws IOException {
       super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
-      this.setTimeout(1000);
       this.sleep = sleep;
     }
 
     public Writable call(Writable param, long receivedTime) throws IOException {
       if (sleep) {
         try {
-          Thread.sleep(RANDOM.nextInt(200));      // sleep a bit
+          Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL));      // sleep a bit
         } catch (InterruptedException e) {}
       }
       return param;                               // echo param as result
@@ -75,7 +79,6 @@
       this.client = client;
       this.server = server;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -90,7 +93,7 @@
             break;
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
@@ -108,7 +111,6 @@
       this.client = client;
       this.addresses = addresses;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -126,7 +128,7 @@
             }
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java Mon May  5 14:24:25 2008
@@ -64,7 +64,6 @@
     public TestServer(final int handlerCount, final boolean sleep) 
                                               throws IOException {
       super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
-      this.setTimeout(1000);
       // Set the buffer size to half of the maximum parameter/result size 
       // to force the socket to block
       this.setSocketSendBufSize(BYTE_COUNT / 2);
@@ -95,7 +94,6 @@
       this.client = client;
       this.address = address;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     @Override

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Mon May  5 14:24:25 2008
@@ -71,7 +71,7 @@
     DistributedFileSystem dfs = (DistributedFileSystem) fs;
 
     JobClient client = null;
-
+    MiniMRCluster mr = null;
     try {
       // This will test RPC to the NameNode only.
       // could we test Client-DataNode connections?
@@ -85,7 +85,7 @@
       assertTrue(dfs.exists(filePath));
 
       // This will test TPC to a JobTracker
-      MiniMRCluster mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
+      mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
       final int jobTrackerPort = mr.getJobTrackerPort();
 
       JobConf jconf = new JobConf(cconf);
@@ -128,6 +128,13 @@
         // nothing we can do
         ignored.printStackTrace();
       }
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception ignored) {
+          ignored.printStackTrace();
+        }
+      }
     }
   }
 }



Mime
View raw message