hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r648697 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapOutputLocation.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date Wed, 16 Apr 2008 13:44:21 GMT
Author: ddas
Date: Wed Apr 16 06:44:19 2008
New Revision: 648697

URL: http://svn.apache.org/viewvc?rev=648697&view=rev
Log:
HADOOP-3130. Make the connect timeout smaller for getFile. Contributed by Amar Ramesh Kamat
and Runping Qi.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=648697&r1=648696&r2=648697&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 16 06:44:19 2008
@@ -18,6 +18,9 @@
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().
     (Lohit Vjayarenu via rangadi)
 
+    HADOOP-3130. Make the connect timeout smaller for getFile.
+    (Amar Ramesh Kamat via ddas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=648697&r1=648696&r2=648697&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Apr 16 06:44:19 2008
@@ -49,6 +49,10 @@
   private String host;
   private int port;
   private String jobId;
+  // basic/unit connection timeout (in milliseconds)
+  private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
+  // default read timeout (in milliseconds)
+  private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
   /** RPC constructor **/
   public MapOutputLocation() {
@@ -102,6 +106,51 @@
            "&map=" + mapTaskId;
   }
   
+  /** 
+   * The connection establishment is attempted multiple times and is given up 
+   * only on the last failure. Instead of connecting with a timeout of 
+   * X, we try connecting with a timeout of x < X but multiple times. 
+   */
+  private InputStream getInputStream(URLConnection connection, 
+                                     int connectionTimeout, 
+                                     int readTimeout) 
+  throws IOException {
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout "
+                            + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
+             ? connectionTimeout
+             : UNIT_CONNECT_TIMEOUT;
+    }
+    // set the read timeout to the total timeout
+    connection.setReadTimeout(readTimeout);
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    while (true) {
+      try {
+        return connection.getInputStream();
+      } catch (IOException ioe) {
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value if timeout is used here
+        if (connectionTimeout == 0) {
+          throw ioe;
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+      }
+    }
+  }
+
   /**
    * Get the map output into a local file (either in the inmemory fs or on the 
    * local fs) from the remote server.
@@ -113,7 +162,7 @@
    * @param lDirAlloc the LocalDirAllocator object
    * @param conf the Configuration object
    * @param reduce the reduce id to get for
-   * @param timeout number of ms for connection and read timeout
+   * @param timeout number of milliseconds for connection timeout
    * @return the path of the file that got created
    * @throws IOException when something goes wrong
    */
@@ -125,6 +174,36 @@
                       Configuration conf, int reduce,
                       int timeout, Progressable progressable) 
   throws IOException, InterruptedException {
+    return getFile(inMemFileSys, localFileSys, shuffleMetrics, localFilename, 
+                   lDirAlloc, conf, reduce, timeout, DEFAULT_READ_TIMEOUT, 
+                   progressable);
+  }
+
+  /**
+   * Get the map output into a local file (either in the inmemory fs or on the 
+   * local fs) from the remote server.
+   * We use the file system so that we generate checksum files on the data.
+   * @param inMemFileSys the inmemory filesystem to write the file to
+   * @param localFileSys the local filesystem to write the file to
+   * @param shuffleMetrics the metrics context
+   * @param localFilename the filename to write the data into
+   * @param lDirAlloc the LocalDirAllocator object
+   * @param conf the Configuration object
+   * @param reduce the reduce id to get for
+   * @param connectionTimeout number of milliseconds for connection timeout
+   * @param readTimeout number of milliseconds for read timeout
+   * @return the path of the file that got created
+   * @throws IOException when something goes wrong
+   */
+  public Path getFile(InMemoryFileSystem inMemFileSys,
+                      FileSystem localFileSys,
+                      ShuffleClientMetrics shuffleMetrics,
+                      Path localFilename, 
+                      LocalDirAllocator lDirAlloc,
+                      Configuration conf, int reduce,
+                      int connectionTimeout, int readTimeout, 
+                      Progressable progressable) 
+  throws IOException, InterruptedException {
     boolean good = false;
     long totalBytes = 0;
     FileSystem fileSys = localFileSys;
@@ -132,11 +211,8 @@
     URL path = new URL(toString() + "&reduce=" + reduce);
     try {
       URLConnection connection = path.openConnection();
-      if (timeout > 0) {
-        connection.setConnectTimeout(timeout);
-        connection.setReadTimeout(timeout);
-      }
-      InputStream input = connection.getInputStream();
+      InputStream input = getInputStream(connection, connectionTimeout, 
+                                         readTimeout); 
       OutputStream output = null;
       
       //We will put a file in memory if it meets certain criteria:

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=648697&r1=648696&r2=648697&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 16 06:44:19 2008
@@ -1030,8 +1030,9 @@
         // loop until we get all required outputs
         while (!neededOutputs.isEmpty() && mergeThrowable == null) {
           
-          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-          " map output(s)");
+          LOG.info(reduceTask.getTaskId() + " Need another " 
+                   + neededOutputs.size() + " map output(s) where " 
+                   + numInFlight + " is already in progress");
           
           try {
             // Put the hash entries for the failed fetches. Entries here



Mime
View raw message