hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r656828 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java
Date Thu, 15 May 2008 20:46:51 GMT
Author: cdouglas
Date: Thu May 15 13:46:51 2008
New Revision: 656828

URL: http://svn.apache.org/viewvc?rev=656828&view=rev
Log:
HADOOP-3350. Add an argument to distcp to permit the user to limit the
number of maps.


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656828&r1=656827&r2=656828&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 15 13:46:51 2008
@@ -134,6 +134,9 @@
     HADOOP-3355. Enhances Configuration class to accept hex numbers for getInt
     and getLong. (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3350. Add an argument to distcp to permit the user to limit the
+    number of maps. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=656828&r1=656827&r2=656828&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu May 15 13:46:51 2008
@@ -74,6 +74,7 @@
     "\n                       -p alone is equivalent to -prbugp" +
     "\n-i                     Ignore failures" +
     "\n-log <logdir>          Write logs to <logdir>" +
+    "\n-m <num_maps>          Maximum number of simultaneous copies" +
     "\n-overwrite             Overwrite destination" +
     "\n-update                Overwrite if src size different from dst size" +
     "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
@@ -139,10 +140,12 @@
   static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
   static final String DST_DIR_LABEL = NAME + ".dest.path";
   static final String JOB_DIR_LABEL = NAME + ".job.dir";
+  static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
   static final String SRC_LIST_LABEL = NAME + ".src.list";
   static final String SRC_COUNT_LABEL = NAME + ".src.count";
   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
   static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
+  static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
   static final String PRESERVE_STATUS_LABEL
       = Options.PRESERVE_STATUS.propertyname + ".value";
 
@@ -708,6 +711,16 @@
             throw new IllegalArgumentException("logdir not specified in -log");
           }
           log = new Path(args[idx]);
+        } else if ("-m".equals(args[idx])) {
+          if (++idx == args.length) {
+            throw new IllegalArgumentException("num_maps not specified in -m");
+          }
+          try {
+            conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx]));
+          } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid argument to -m: " +
+                                               args[idx]);
+          }
         } else if ('-' == args[idx].codePointAt(0)) {
           throw new IllegalArgumentException("Invalid switch " + args[idx]);
         } else if (idx == args.length -1) {
@@ -793,17 +806,22 @@
 
   /**
    * Calculate how many maps to run.
-   * Number of maps is bounded by a minimum of the cumulative size of the copy /
-   * BYTES_PER_MAP and at most MAX_MAPS_PER_NODE * nodes in the
-   * cluster.
+   * Number of maps is bounded by a minimum of the cumulative size of the
+   * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the
+   * command line) and at most (distcp.max.map.tasks, default
+   * MAX_MAPS_PER_NODE * nodes in the cluster).
    * @param totalBytes Count of total bytes for job
-   * @param numNodes the number of nodes in cluster
+   * @param job The job to configure
    * @return Count of maps to run.
    */
-  private static int getMapCount(long totalBytes, int numNodes) {
-    int numMaps = (int)(totalBytes / BYTES_PER_MAP);
-    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
-    return Math.max(numMaps, 1);
+  private static void setMapCount(long totalBytes, JobConf job) 
+      throws IOException {
+    int numMaps =
+      (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP));
+    numMaps = Math.min(numMaps, 
+        job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
+          new JobClient(job).getClusterStatus().getTaskTrackers()));
+    job.setNumMapTasks(Math.max(numMaps, 1));
   }
 
   /** Fully delete dir */
@@ -989,8 +1007,7 @@
     LOG.info("srcCount=" + srcCount);
     jobConf.setInt(SRC_COUNT_LABEL, srcCount);
     jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
-    jobConf.setNumMapTasks(getMapCount(cbsize,
-        new JobClient(jobConf).getClusterStatus().getTaskTrackers()));
+    setMapCount(cbsize, jobConf);
   }
 
   static private void checkDuplication(FileSystem fs, Path file, Path sorted,

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=656828&r1=656827&r2=656828&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu May 15 13:46:51 2008
@@ -27,6 +27,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.util.CopyFiles;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -519,4 +521,50 @@
       if (cluster != null) { cluster.shutdown(); }
     }
   }
+
+  public void testMapCount() throws Exception {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 3, true, null);
+      FileSystem fs = dfs.getFileSystem();
+      namenode = fs.getUri().toString();
+      mr = new MiniMRCluster(3, namenode, 1);
+      MyFile[] files = createFiles(fs.getUri(), "/srcdat");
+      long totsize = 0;
+      for (MyFile f : files) {
+        totsize += f.getSize();
+      }
+      JobConf job = mr.createJobConf();
+      job.setLong("distcp.bytes.per.map", totsize / 3);
+      ToolRunner.run(new CopyFiles(job),
+          new String[] {"-m", "100",
+                        "-log",
+                        namenode+"/logs",
+                        namenode+"/srcdat",
+                        namenode+"/destdat"});
+      assertTrue("Source and destination directories do not match.",
+                 checkFiles(namenode, "/destdat", files));
+      FileStatus[] logs = fs.listStatus(new Path(namenode+"/logs"));
+      // rare case where splits are exact, logs.length can be 4
+      assertTrue("Unexpected map count", logs.length == 5 || logs.length == 4);
+
+      deldir(namenode, "/destdat");
+      deldir(namenode, "/logs");
+      ToolRunner.run(new CopyFiles(job),
+          new String[] {"-m", "1",
+                        "-log",
+                        namenode+"/logs",
+                        namenode+"/srcdat",
+                        namenode+"/destdat"});
+      logs = fs.listStatus(new Path(namenode+"/logs"));
+      assertTrue("Unexpected map count", logs.length == 2);
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+
 }



Mime
View raw message