hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r789402 - in /hadoop/mapreduce/trunk: CHANGES.txt src/tools/org/apache/hadoop/tools/DistCp.java
Date Mon, 29 Jun 2009 18:20:01 GMT
Author: szetszwo
Date: Mon Jun 29 18:20:01 2009
New Revision: 789402

URL: http://svn.apache.org/viewvc?rev=789402&view=rev
Log:
MAPREDUCE-646. Increase srcfilelist replication number in distcp job.  Contributed by Ravi Gummadi

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=789402&r1=789401&r2=789402&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jun 29 18:20:01 2009
@@ -38,6 +38,9 @@
     MAPREDUCE-416. Moves the history file to a "done" folder whenever a job 
     completes. (Amar Kamat via ddas)
 
+    MAPREDUCE-646. Increase srcfilelist replication number in dictcp job.
+    (Ravi Gummadi via szetszwo)
+
   BUG FIXES
     HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=789402&r1=789401&r2=789402&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Mon Jun 29 18:20:01 2009
@@ -945,14 +945,16 @@
    * @param job The job to configure
    * @return Count of maps to run.
    */
-  private static void setMapCount(long totalBytes, JobConf job) 
+  private static int setMapCount(long totalBytes, JobConf job) 
       throws IOException {
     int numMaps =
       (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP));
     numMaps = Math.min(numMaps, 
         job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
           new JobClient(job).getClusterStatus().getTaskTrackers()));
-    job.setNumMapTasks(Math.max(numMaps, 1));
+    numMaps = Math.max(numMaps, 1);
+    job.setNumMapTasks(numMaps);
+    return numMaps;
   }
 
   /** Fully delete dir */
@@ -987,6 +989,28 @@
   }
 
   /**
+   * Increase the replication factor of _distcp_src_files to
+   * sqrt(min(maxMapsOnCluster, numMaps)). This is to reduce the chance of
+   * failing of distcp because of "not having a replication of _distcp_src_files
+   * available for reading for some maps".
+   */
+  private static void setReplication(Configuration conf, JobConf jobConf,
+                         Path srcfilelist, int numMaps) throws IOException {
+    int numMaxMaps = new JobClient(jobConf).getClusterStatus().getMaxMapTasks();
+    short replication = (short) Math.ceil(
+                                Math.sqrt(Math.min(numMaxMaps, numMaps)));
+    FileSystem fs = srcfilelist.getFileSystem(conf);
+    FileStatus srcStatus = fs.getFileStatus(srcfilelist);
+
+    if (srcStatus.getReplication() < replication) {
+      if (!fs.setReplication(srcfilelist, replication)) {
+        throw new IOException("Unable to increase the replication of file " +
+                              srcfilelist);
+      }
+    }
+  }
+  
+  /**
    * Initialize DFSCopyFileMapper specific job-configuration.
    * @param conf : The dfs/mapred configuration.
    * @param jobConf : The handle to the jobConf object to be initialized.
@@ -1142,6 +1166,10 @@
       checkAndClose(dir_writer);
     }
 
+    int mapCount = setMapCount(byteCount, jobConf);
+    // Increase the replication of _distcp_src_files, if needed
+    setReplication(conf, jobConf, srcfilelist, mapCount);
+    
     FileStatus dststatus = null;
     try {
       dststatus = dstfs.getFileStatus(args.dst);
@@ -1173,7 +1201,7 @@
     LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount));
     jobConf.setInt(SRC_COUNT_LABEL, srcCount);
     jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
-    setMapCount(byteCount, jobConf);
+    
     return (fileCount + dirCount) > 0;
   }
 



Mime
View raw message