Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 90852 invoked from network); 4 Mar 2011 03:28:15 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:28:15 -0000 Received: (qmail 6410 invoked by uid 500); 4 Mar 2011 03:28:15 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 6369 invoked by uid 500); 4 Mar 2011 03:28:15 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 6077 invoked by uid 99); 4 Mar 2011 03:28:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:28:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:28:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1CC322388C0F; Fri, 4 Mar 2011 03:27:43 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1076978 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/streaming/src/test/org/apache/hadoop/streaming/ core/org/apache/hadoop/util/ docs/src/documentation/content/xdocs/ mapred/org/apache/hadoop/mapred/ test/org/apa... 
Date: Fri, 04 Mar 2011 03:27:42 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304032743.1CC322388C0F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 03:27:42 2011 New Revision: 1076978 URL: http://svn.apache.org/viewvc?rev=1076978&view=rev Log: commit 46247fca6d3a0fe6e475109a6ccfdc95af454388 Author: Yahoo\! Date: Tue Aug 18 09:22:17 2009 -0700 Apply Patch for MAPREDUCE:478 from: http://issues.apache.org/jira/secure/attachment/12416638/MAPREDUCE-478_3_20090814_yhadoop.patch Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestChild.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java 
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRChildTask.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Fri Mar 4 03:27:42 2011 @@ -76,9 +76,18 @@ public class TestMultipleCachefiles exte "-jobconf", strNamenode, "-jobconf", strJobtracker, "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"), - "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " + - "-Dbuild.test=" + System.getProperty("build.test") + " " + - conf.get("mapred.child.java.opts",""), + "-jobconf", + JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" + + "-Dcontrib.name=" + System.getProperty("contrib.name") + " " + + "-Dbuild.test=" + System.getProperty("build.test") + " " + + conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, + conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")), + "-jobconf", + JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + "=" + + "-Dcontrib.name=" + System.getProperty("contrib.name") + " " + + "-Dbuild.test=" + System.getProperty("build.test") + " " + + conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, + conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")), "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#" + 
mapString, "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE_2 + "#" + mapString2 }; Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Fri Mar 4 03:27:42 2011 @@ -73,9 +73,18 @@ public class TestSymLink extends TestCas "-jobconf", strNamenode, "-jobconf", strJobtracker, "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"), - "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " + - "-Dbuild.test=" + System.getProperty("build.test") + " " + - conf.get("mapred.child.java.opts",""), + "-jobconf", + JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" + + "-Dcontrib.name=" + System.getProperty("contrib.name") + " " + + "-Dbuild.test=" + System.getProperty("build.test") + " " + + conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, + conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")), + "-jobconf", + JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + "=" + + "-Dcontrib.name=" + System.getProperty("contrib.name") + " " + + "-Dbuild.test=" + System.getProperty("build.test") + " " + + conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, + conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")), "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink" }; Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Fri Mar 4 03:27:42 2011 @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.TestMiniMRWithDFS; @@ -54,7 +55,7 @@ public class TestUlimit extends TestCase "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer", "-numReduceTasks", "0", "-jobconf", "mapred.map.tasks=1", - "-jobconf", "mapred.child.ulimit=" + memLimit, + "-jobconf", JobConf.MAPRED_MAP_TASK_ULIMIT + "=" + memLimit, "-jobconf", "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort(), "-jobconf", "fs.default.name=" + "hdfs://localhost:" Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java Fri Mar 4 03:27:42 2011 @@ -63,6 +63,31 @@ abstract public class Shell { /** If or not script timed out*/ private 
AtomicBoolean timedOut; + /** a Unix command to get ulimit of a process. */ + public static final String ULIMIT_COMMAND = "ulimit"; + + /** + * Get the Unix command for setting the maximum virtual memory available + * to a given child process. This is only relevant when we are forking a + * process from within the Mapper or the Reducer implementations. + * Also see Hadoop Pipes and Hadoop Streaming. + * + * It also checks to ensure that we are running on a *nix platform else + * (e.g. in Cygwin/Windows) it returns null. + * @param memoryLimit virtual memory limit + * @return a String[] with the ulimit command arguments or + * null if we are running on a non *nix platform or + * if the limit is unspecified. + */ + public static String[] getUlimitMemoryCommand(int memoryLimit) { + // ulimit isn't supported on Windows + if (WINDOWS) { + return null; + } + + return new String[] {ULIMIT_COMMAND, "-v", String.valueOf(memoryLimit)}; + } + /** * Get the Unix command for setting the maximum virtual memory available * to a given child process. This is only relevant when we are forking a @@ -77,7 +102,9 @@ abstract public class Shell { * @return a String[] with the ulimit command arguments or * null if we are running on a non *nix platform or * if the limit is unspecified. 
+ * @deprecated Use {@link #getUlimitMemoryCommand(int)} */ + @Deprecated public static String[] getUlimitMemoryCommand(Configuration conf) { // ulimit isn't supported on Windows if (WINDOWS) { @@ -92,8 +119,8 @@ abstract public class Shell { // Parse it to ensure it is legal/sane int memoryLimit = Integer.valueOf(ulimit); - - return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)}; + + return getUlimitMemoryCommand(memoryLimit); } /** Set to true on Windows platforms */ Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml Fri Mar 4 03:27:42 2011 @@ -392,10 +392,18 @@ conf/mapred-site.xml - mapred.child.java.opts + mapred.map.child.java.opts -Xmx512M - Larger heap-size for child jvms of maps/reduces. + Larger heap-size for child jvms of maps. + + + + conf/mapred-site.xml + mapred.reduce.child.java.opts + -Xmx512M + + Larger heap-size for child jvms of reduces. @@ -465,9 +473,17 @@ conf/mapred-site.xml - mapred.child.java.opts + mapred.map.child.java.opts + -Xmx512M + + Larger heap-size for child jvms of maps. + + + + conf/mapred-site.xml + mapred.reduce.child.java.opts -Xmx1024M - Larger heap-size for child jvms of maps/reduces. + Larger heap-size for child jvms of reduces. 
Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Fri Mar 4 03:27:42 2011 @@ -1051,24 +1051,25 @@

The child-task inherits the environment of the parent TaskTracker. The user can specify additional options to the - child-jvm via the mapred.child.java.opts configuration - parameter in the JobConf such as non-standard paths for the - run-time linker to search shared libraries via + child-jvm via the mapred.{map|reduce}.child.java.opts + configuration parameter in the JobConf such as non-standard + paths for the run-time linker to search shared libraries via -Djava.library.path=<> etc. If the - mapred.child.java.opts contains the symbol @taskid@ - it is interpolated with value of taskid of the map/reduce - task.

+ mapred.{map|reduce}.child.java.opts parameters contain the

Here is an example with multiple arguments and substitutions, showing jvm GC logging, and start of a passwordless JVM JMX agent so that it can connect with jconsole and the likes to watch child memory, threads and get thread dumps. It also sets the maximum heap-size of the - child jvm to 512MB and adds an additional path to the - java.library.path of the child-jvm.

+ map and reduce child jvms to 512MB and 1024MB, respectively. It also

<property>
-   <name>mapred.child.java.opts</name>
+   <name>mapred.map.child.java.opts</name>
  <value>
     -Xmx512M -Djava.library.path=/home/mycompany/lib @@ -1080,19 +1081,33 @@ </property>

+

+ <property>
+   <name>mapred.reduce.child.java.opts</name>
+   <value>
+      + -Xmx1024M -Djava.library.path=/home/mycompany/lib + -verbose:gc -Xloggc:/tmp/@taskid@.gc
+      + -Dcom.sun.management.jmxremote.authenticate=false + -Dcom.sun.management.jmxremote.ssl=false
+   </value>
+ </property> +

+
Memory management

Users/admins can also specify the maximum virtual memory of the launched child-task, and any sub-process it launches - recursively, using mapred.child.ulimit. Note that - the value set here is a per process limit. - The value for mapred.child.ulimit should be specified - in kilo bytes (KB). And also the value must be greater than + recursively, using mapred.{map|reduce}.child.ulimit. Note + that the value set here is a per-process limit. + The value for mapred.{map|reduce}.child.ulimit should be + specified in kilobytes (KB). The value must also be greater than or equal to the -Xmx passed to JavaVM, else the VM might not start.

-

Note: mapred.child.java.opts are used only for - configuring the launched child tasks from task tracker. Configuring +

Note: mapred.{map|reduce}.child.java.opts are used only + for configuring the launched child tasks from task tracker. Configuring the memory options for daemons is documented in cluster_setup.html

@@ -1229,7 +1244,7 @@ shuffle. mapred.job.shuffle.input.buffer.percentfloat The percentage of memory- relative to the maximum heapsize - as typically specified in mapred.child.java.opts- + as typically specified in mapred.reduce.child.java.opts- that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Mar 4 03:27:42 2011 @@ -158,6 +158,153 @@ public class JobConf extends Configurati "mapred.job.reduce.memory.mb"; /** + * Configuration key to set the java command line options for the child + * map and reduce tasks. + * + * Java opts for the task tracker child processes. + * The following symbol, if present, will be interpolated: @taskid@. + * It is replaced by current TaskID. Any other occurrences of '@' will go + * unchanged. + * For example, to enable verbose gc logging to a file named for the taskid in + * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: + * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc + * + * The configuration variable {@link #MAPRED_TASK_ULIMIT} can be used to + * control the maximum virtual memory of the child processes. + * + * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass + * other environment variables to the child processes. 
+ * + * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or + * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS} + */ + @Deprecated + public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts"; + + /** + * Configuration key to set the java command line options for the map tasks. + * + * Java opts for the task tracker child map processes. + * The following symbol, if present, will be interpolated: @taskid@. + * It is replaced by current TaskID. Any other occurrences of '@' will go + * unchanged. + * For example, to enable verbose gc logging to a file named for the taskid in + * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: + * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc + * + * The configuration variable {@link #MAPRED_MAP_TASK_ULIMIT} can be used to + * control the maximum virtual memory of the map processes. + * + * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass + * other environment variables to the map processes. + */ + public static final String MAPRED_MAP_TASK_JAVA_OPTS = + "mapred.map.child.java.opts"; + + /** + * Configuration key to set the java command line options for the reduce tasks. + * + * Java opts for the task tracker child reduce processes. + * The following symbol, if present, will be interpolated: @taskid@. + * It is replaced by current TaskID. Any other occurrences of '@' will go + * unchanged. + * For example, to enable verbose gc logging to a file named for the taskid in + * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: + * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc + * + * The configuration variable {@link #MAPRED_REDUCE_TASK_ULIMIT} can be used + * to control the maximum virtual memory of the reduce processes. + * + * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to + * pass other environment variables to the reduce processes.
+ */ + public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = + "mapred.reduce.child.java.opts"; + + public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m"; + + /** + * Configuration key to set the maximum virtual memory available to the child + * map and reduce tasks (in kilo-bytes). + * + * Note: This must be greater than or equal to the -Xmx passed to the JavaVM + * via {@link #MAPRED_TASK_JAVA_OPTS}, else the VM might not start. + * + * @deprecated Use {@link #MAPRED_MAP_TASK_ULIMIT} or + * {@link #MAPRED_REDUCE_TASK_ULIMIT} + */ + @Deprecated + public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit"; + + /** + * Configuration key to set the maximum virtual memory available to the + * map tasks (in kilo-bytes). + * + * Note: This must be greater than or equal to the -Xmx passed to the JavaVM + * via {@link #MAPRED_MAP_TASK_JAVA_OPTS}, else the VM might not start. + */ + public static final String MAPRED_MAP_TASK_ULIMIT = "mapred.map.child.ulimit"; + + /** + * Configuration key to set the maximum virtual memory available to the + * reduce tasks (in kilo-bytes). + * + * Note: This must be greater than or equal to the -Xmx passed to the JavaVM + * via {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}, else the VM might not start. + */ + public static final String MAPRED_REDUCE_TASK_ULIMIT = + "mapred.reduce.child.ulimit"; + + /** + * Configuration key to set the environment of the child map/reduce tasks. + * + * The format of the value is k1=v1,k2=v2. Further it can + * reference existing environment variables via $key. + * + * Example: + *
    + *
  • A=foo - This will set the env variable A to foo.
  • + *
  • B=$X:c This will inherit the tasktracker's X env variable.
  • + *
+ * + * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or + * {@link #MAPRED_REDUCE_TASK_ENV} + */ + @Deprecated + public static final String MAPRED_TASK_ENV = "mapred.child.env"; + + /** + * Configuration key to set the environment of the child + * map tasks. + * + * The format of the value is k1=v1,k2=v2. Further it can + * reference existing environment variables via $key. + * + * Example: + *
    + *
  • A=foo - This will set the env variable A to foo.
  • + *
  • B=$X:c This will inherit the tasktracker's X env variable.
  • + *
+ */ + public static final String MAPRED_MAP_TASK_ENV = "mapred.map.child.env"; + + /** + * Configuration key to set the environment of the child + * reduce tasks. + * + * The format of the value is k1=v1,k2=v2. Further it can + * reference existing environment variables via $key. + * + * Example: + *
    + *
  • A=foo - This will set the env variable A to foo.
  • + *
  • B=$X:c This will inherit the tasktracker's X env variable.
  • + *
+ */ + public static final String MAPRED_REDUCE_TASK_ENV = + "mapred.reduce.child.env"; + + /** * Construct a map/reduce job configuration. */ public JobConf() { Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Fri Mar 4 03:27:42 2011 @@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.TaskTrac /** Runs a map task. */ class MapTaskRunner extends TaskRunner { - + public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) { super(task, tracker, conf); } @@ -43,4 +43,23 @@ class MapTaskRunner extends TaskRunner { LOG.info(getTask()+" done; removing files."); mapOutputFile.removeAll(getTask().getTaskID()); } + + @Override + public String getChildJavaOpts(JobConf jobConf, String defaultValue) { + return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, + super.getChildJavaOpts(jobConf, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); + } + + @Override + public int getChildUlimit(JobConf jobConf) { + return jobConf.getInt(JobConf.MAPRED_MAP_TASK_ULIMIT, + super.getChildUlimit(jobConf)); + } + + @Override + public String getChildEnv(JobConf jobConf) { + return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV, super.getChildEnv(jobConf)); + } + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Mar 4 03:27:42 2011 @@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.TaskTrac /** Runs a reduce task. */ class ReduceTaskRunner extends TaskRunner { - + public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) throws IOException { @@ -48,4 +48,24 @@ class ReduceTaskRunner extends TaskRunne getTask().getProgress().setStatus("closed"); mapOutputFile.removeAll(getTask().getTaskID()); } + + @Override + public String getChildJavaOpts(JobConf jobConf, String defaultValue) { + return jobConf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, + super.getChildJavaOpts(jobConf, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); + } + + @Override + public int getChildUlimit(JobConf jobConf) { + return jobConf.getInt(JobConf.MAPRED_REDUCE_TASK_ULIMIT, + super.getChildUlimit(jobConf)); + } + + @Override + public String getChildEnv(JobConf jobConf) { + return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV, + super.getChildEnv(jobConf)); + } + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original) +++ 
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:27:42 2011 @@ -99,6 +99,47 @@ abstract class TaskRunner extends Thread return str.toString(); } + /** + * Get the java command line options for the child map/reduce tasks. + * @param jobConf job configuration + * @param defaultValue default value + * @return the java command line options for child map/reduce tasks + * @deprecated Use command line options specific to map or reduce tasks set + * via {@link JobConf#MAPRED_MAP_TASK_JAVA_OPTS} or + * {@link JobConf#MAPRED_REDUCE_TASK_JAVA_OPTS} + */ + @Deprecated + public String getChildJavaOpts(JobConf jobConf, String defaultValue) { + return jobConf.get(JobConf.MAPRED_TASK_JAVA_OPTS, defaultValue); + } + + /** + * Get the maximum virtual memory of the child map/reduce tasks. + * @param jobConf job configuration + * @return the maximum virtual memory of the child task or -1 if + * none is specified + * @deprecated Use limits specific to the map or reduce tasks set via + * {@link JobConf#MAPRED_MAP_TASK_ULIMIT} or + * {@link JobConf#MAPRED_REDUCE_TASK_ULIMIT} + */ + @Deprecated + public int getChildUlimit(JobConf jobConf) { + return jobConf.getInt(JobConf.MAPRED_TASK_ULIMIT, -1); + } + + /** + * Get the environment variables for the child map/reduce tasks. + * @param jobConf job configuration + * @return the environment variables for the child map/reduce tasks or + * null if unspecified + * @deprecated Use environment variables specific to the map or reduce tasks + * set via {@link JobConf#MAPRED_MAP_TASK_ENV} or + * {@link JobConf#MAPRED_REDUCE_TASK_ENV} + */ + public String getChildEnv(JobConf jobConf) { + return jobConf.get(JobConf.MAPRED_TASK_ENV); + } + @Override public final void run() { String errorInfo = "Child Error"; @@ -273,8 +314,8 @@ abstract class TaskRunner extends Thread // Add child (task) java-vm options. 
// - // The following symbols if present in mapred.child.java.opts value are - // replaced: + // The following symbols if present in mapred.{map|reduce}.child.java.opts + // value are replaced: // + @taskid@ is interpolated with value of TaskID. // Other occurrences of @ will not be altered. // @@ -284,14 +325,23 @@ abstract class TaskRunner extends Thread // and get thread dumps. // // - // mapred.child.java.opts - // -verbose:gc -Xloggc:/tmp/@taskid@.gc \ + // mapred.map.child.java.opts + // -Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \ + // -Dcom.sun.management.jmxremote.authenticate=false \ + // -Dcom.sun.management.jmxremote.ssl=false \ + // + // + // + // + // mapred.reduce.child.java.opts + // -Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \ // -Dcom.sun.management.jmxremote.authenticate=false \ // -Dcom.sun.management.jmxremote.ssl=false \ // // // - String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m"); + String javaOpts = getChildJavaOpts(conf, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS); javaOpts = javaOpts.replace("@taskid@", taskid.toString()); String [] javaOptsSplit = javaOpts.split(" "); @@ -302,7 +352,7 @@ abstract class TaskRunner extends Thread // 2. We also add the 'cwd' of the task to it's java.library.path to help // users distribute native libraries via the DistributedCache. // 3. The user can also specify extra paths to be added to the - // java.library.path via mapred.child.java.opts. + // java.library.path via mapred.{map|reduce}.child.java.opts. // String libraryPath = System.getProperty("java.library.path"); if (libraryPath == null) { @@ -372,7 +422,7 @@ abstract class TaskRunner extends Thread tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf); // set memory limit using ulimit if feasible and necessary ... 
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf); + String[] ulimitCmd = Shell.getUlimitMemoryCommand(getChildUlimit(conf)); List setup = null; if (ulimitCmd != null) { setup = new ArrayList(); @@ -399,7 +449,7 @@ abstract class TaskRunner extends Thread env.put("LD_LIBRARY_PATH", ldLibraryPath.toString()); // add the env variables passed by the user - String mapredChildEnv = conf.get("mapred.child.env"); + String mapredChildEnv = getChildEnv(conf); if (mapredChildEnv != null && mapredChildEnv.length() > 0) { String childEnvs[] = mapredChildEnv.split(","); for (String cEnv : childEnvs) { Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRChildTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRChildTask.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRChildTask.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRChildTask.java Fri Mar 4 03:27:42 2011 @@ -18,6 +18,8 @@ package org.apache.hadoop.mapred; import java.io.*; +import java.util.Iterator; + import junit.framework.TestCase; import org.apache.commons.logging.Log; @@ -42,6 +44,11 @@ public class TestMiniMRChildTask extends private static final Log LOG = LogFactory.getLog(TestMiniMRChildTask.class.getName()); + private final static String OLD_CONFIGS = "test.old.configs"; + private final static String TASK_OPTS_VAL = "-Xmx200m"; + private final static String MAP_OPTS_VAL = "-Xmx200m"; + private final static String REDUCE_OPTS_VAL = "-Xmx300m"; + private MiniMRCluster mr; private MiniDFSCluster dfs; private FileSystem fileSys; @@ -85,7 +92,8 @@ public class TestMiniMRChildTask extends // configure a job private void 
configure(JobConf conf, Path inDir, Path outDir, String input, - Class map) + Class map, + Class reduce) throws IOException { // set up the input file system and write input text. FileSystem inFs = inDir.getFileSystem(conf); @@ -104,7 +112,7 @@ public class TestMiniMRChildTask extends // configure the mapred Job which creates a tempfile in map. conf.setJobName("testmap"); conf.setMapperClass(map); - conf.setReducerClass(IdentityReducer.class); + conf.setReducerClass(reduce); conf.setNumMapTasks(1); conf.setNumReduceTasks(0); FileInputFormat.setInputPaths(conf, inDir); @@ -127,7 +135,8 @@ public class TestMiniMRChildTask extends Path outDir, String input) throws IOException { - configure(conf, inDir, outDir, input, MapClass.class); + configure(conf, inDir, outDir, input, + MapClass.class, IdentityReducer.class); FileSystem outFs = outDir.getFileSystem(conf); @@ -147,16 +156,52 @@ public class TestMiniMRChildTask extends outFs.delete(outDir, true); } + private static void checkEnv(String envName, String expValue, String mode) { + String envValue = System.getenv(envName).trim(); + if ("append".equals(mode)) { + if (envValue == null || !envValue.contains(":")) { + throw new RuntimeException("Missing env variable"); + } else { + String parts[] = envValue.split(":"); + // check if the value is appended + if (!parts[parts.length - 1].equals(expValue)) { + throw new RuntimeException("Wrong env variable in append mode"); + } + } + } else { + if (envValue == null || !envValue.equals(expValue)) { + throw new RuntimeException("Wrong env variable in noappend mode"); + } + } + } + // Mappers that simply checks if the desired user env are present or not static class EnvCheckMapper extends MapReduceBase implements Mapper { - private static String PATH; - public void map(WritableComparable key, Writable value, - OutputCollector out, Reporter reporter) - throws IOException { + public void configure(JobConf job) { + boolean oldConfigs = job.getBoolean(OLD_CONFIGS, false); + if 
(oldConfigs) { + String javaOpts = job.get(JobConf.MAPRED_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", + javaOpts); + assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + + javaOpts, + javaOpts, TASK_OPTS_VAL); + } else { + String mapJavaOpts = job.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " is null!", + mapJavaOpts); + assertEquals(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " has value of: " + + mapJavaOpts, + mapJavaOpts, MAP_OPTS_VAL); + } + + String path = job.get("path"); + // check if the pwd is there in LD_LIBRARY_PATH String pwd = System.getenv("PWD"); + assertTrue("LD doesnt contain pwd", System.getenv("LD_LIBRARY_PATH").contains(pwd)); @@ -170,34 +215,69 @@ public class TestMiniMRChildTask extends checkEnv("NEW_PATH", ":/tmp", "noappend"); // check if X=$(tt's X var):/tmp for an old env variable inherited from // the tt - checkEnv("PATH", PATH + ":/tmp", "noappend"); + checkEnv("PATH", path + ":/tmp", "noappend"); } - private void checkEnv(String envName, String expValue, String mode) - throws IOException { - String envValue = System.getenv(envName).trim(); - if ("append".equals(mode)) { - if (envValue == null || !envValue.contains(":")) { - throw new IOException("Missing env variable"); - } else { - String parts[] = envValue.split(":"); - // check if the value is appended - if (!parts[parts.length - 1].equals(expValue)) { - throw new IOException("Wrong env variable in append mode"); - } - } + public void map(WritableComparable key, Writable value, + OutputCollector out, + Reporter reporter) + throws IOException { + } + } + + static class EnvCheckReducer extends MapReduceBase + implements Reducer { + + @Override + public void configure(JobConf job) { + boolean oldConfigs = job.getBoolean(OLD_CONFIGS, false); + if (oldConfigs) { + String javaOpts = job.get(JobConf.MAPRED_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", + javaOpts); + 
assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + + javaOpts, + javaOpts, TASK_OPTS_VAL); } else { - if (envValue == null || !envValue.equals(expValue)) { - throw new IOException("Wrong env variable in noappend mode"); - } + String reduceJavaOpts = job.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " is null!", + reduceJavaOpts); + assertEquals(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " has value of: " + + reduceJavaOpts, + reduceJavaOpts, REDUCE_OPTS_VAL); } + + String path = job.get("path"); + + // check if the pwd is there in LD_LIBRARY_PATH + String pwd = System.getenv("PWD"); + + assertTrue("LD doesnt contain pwd", + System.getenv("LD_LIBRARY_PATH").contains(pwd)); + + // check if X=$X:/abc works for LD_LIBRARY_PATH + checkEnv("LD_LIBRARY_PATH", "/tmp", "append"); + // check if X=/tmp works for an already existing parameter + checkEnv("HOME", "/tmp", "noappend"); + // check if X=/tmp for a new env variable + checkEnv("MY_PATH", "/tmp", "noappend"); + // check if X=$X:/tmp works for a new env var and results into :/tmp + checkEnv("NEW_PATH", ":/tmp", "noappend"); + // check if X=$(tt's X var):/tmp for an old env variable inherited from + // the tt + checkEnv("PATH", path + ":/tmp", "noappend"); + } - - public void configure(JobConf conf) { - PATH = conf.get("path"); + + @Override + public void reduce(WritableComparable key, Iterator values, + OutputCollector output, + Reporter reporter) + throws IOException { } + } - + @Override public void setUp() { try { @@ -265,28 +345,11 @@ public class TestMiniMRChildTask extends public void testTaskEnv(){ try { JobConf conf = mr.createJobConf(); - // initialize input, output directories Path inDir = new Path("testing/wc/input1"); Path outDir = new Path("testing/wc/output1"); - String input = "The input"; - - configure(conf, inDir, outDir, input, EnvCheckMapper.class); - FileSystem outFs = outDir.getFileSystem(conf); - - // test - // - new SET of new var 
(MY_PATH) - // - set of old var (HOME) - // - append to an old var from modified env (LD_LIBRARY_PATH) - // - append to an old var from tt's env (PATH) - // - append to a new var (NEW_PATH) - conf.set("mapred.child.env", - "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," - + "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp"); - conf.set("path", System.getenv("PATH")); - - JobClient.runJob(conf); + runTestTaskEnv(conf, inDir, outDir, false); outFs.delete(outDir, true); } catch(Exception e) { e.printStackTrace(); @@ -294,4 +357,62 @@ public class TestMiniMRChildTask extends tearDown(); } } + + /** + * Test to test if the user set *old* env variables reflect in the child + * processes. Mainly + * - x=y (x can be a already existing env variable or a new variable) + * - x=$x:y (replace $x with the current value of x) + */ + public void testTaskOldEnv(){ + try { + JobConf conf = mr.createJobConf(); + // initialize input, output directories + Path inDir = new Path("testing/wc/input1"); + Path outDir = new Path("testing/wc/output1"); + FileSystem outFs = outDir.getFileSystem(conf); + runTestTaskEnv(conf, inDir, outDir, true); + outFs.delete(outDir, true); + } catch(Exception e) { + e.printStackTrace(); + fail("Exception in testing child env"); + tearDown(); + } + } + + void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs) + throws IOException { + String input = "The input"; + configure(conf, inDir, outDir, input, + EnvCheckMapper.class, EnvCheckReducer.class); + // test + // - new SET of new var (MY_PATH) + // - set of old var (HOME) + // - append to an old var from modified env (LD_LIBRARY_PATH) + // - append to an old var from tt's env (PATH) + // - append to a new var (NEW_PATH) + String mapTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV; + String reduceTaskEnvKey = JobConf.MAPRED_MAP_TASK_ENV; + String mapTaskJavaOptsKey = JobConf.MAPRED_MAP_TASK_JAVA_OPTS; + String reduceTaskJavaOptsKey = JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS; + String 
mapTaskJavaOpts = MAP_OPTS_VAL; + String reduceTaskJavaOpts = REDUCE_OPTS_VAL; + conf.setBoolean(OLD_CONFIGS, oldConfigs); + if (oldConfigs) { + mapTaskEnvKey = reduceTaskEnvKey = JobConf.MAPRED_TASK_ENV; + mapTaskJavaOptsKey = reduceTaskJavaOptsKey = JobConf.MAPRED_TASK_JAVA_OPTS; + mapTaskJavaOpts = reduceTaskJavaOpts = TASK_OPTS_VAL; + } + conf.set(mapTaskEnvKey, + "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," + + "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp"); + conf.set(reduceTaskEnvKey, + "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp," + + "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp"); + conf.set("path", System.getenv("PATH")); + conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts); + conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts); + RunningJob job = JobClient.runJob(conf); + assertTrue("The environment checker job failed.", job.isSuccessful()); + } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=1076978&r1=1076977&r2=1076978&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Fri Mar 4 03:27:42 2011 @@ -121,7 +121,7 @@ public class TestReduceFetch extends Tes job.set("mapred.job.reduce.input.buffer.percent", "1.0"); job.setInt("mapred.reduce.parallel.copies", 1); job.setInt("io.sort.mb", 10); - job.set("mapred.child.java.opts", "-Xmx128m"); + job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m"); job.set("mapred.job.shuffle.input.buffer.percent", "0.14"); job.setNumTasksToExecutePerJvm(1); job.set("mapred.job.shuffle.merge.percent", "1.0"); Added: 
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1076978&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Mar 4 03:27:42 2011 @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.NumberFormat; +import java.util.Iterator; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.OutputLogFilter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * Utility methods used in various Job Control unit tests. + */ +public class MapReduceTestUtil { + public static final Log LOG = + LogFactory.getLog(MapReduceTestUtil.class.getName()); + + static private Random rand = new Random(); + + private static NumberFormat idFormat = NumberFormat.getInstance(); + + static { + idFormat.setMinimumIntegerDigits(4); + idFormat.setGroupingUsed(false); + } + + /** + * Cleans the data from the passed Path in the passed FileSystem. + * + * @param fs FileSystem to delete data from. + * @param dirPath Path to be deleted. + * @throws IOException If an error occurs cleaning the data. + */ + public static void cleanData(FileSystem fs, Path dirPath) + throws IOException { + fs.delete(dirPath, true); + } + + /** + * Generates a string of random digits. + * + * @return A random string. 
+ */ + public static String generateRandomWord() { + return idFormat.format(rand.nextLong()); + } + + /** + * Generates a line of random text. + * + * @return A line of random text. + */ + public static String generateRandomLine() { + long r = rand.nextLong() % 7; + long n = r + 20; + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < n; i++) { + sb.append(generateRandomWord()).append(" "); + } + sb.append("\n"); + return sb.toString(); + } + + /** + * Generates random data consisting of 10000 lines. + * + * @param fs FileSystem to create data in. + * @param dirPath Path to create the data in. + * @throws IOException If an error occurs creating the data. + */ + public static void generateData(FileSystem fs, Path dirPath) + throws IOException { + FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt")); + for (int i = 0; i < 10000; i++) { + String line = generateRandomLine(); + out.write(line.getBytes("UTF-8")); + } + out.close(); + } + + /** + * Creates a simple copy job. + * + * @param conf Configuration object + * @param outdir Output directory. + * @param indirs Comma separated input directories. + * @return Job initialized for a data copy job. + * @throws Exception If an error occurs creating job configuration. + */ + public static Job createCopyJob(Configuration conf, Path outdir, + Path... indirs) throws Exception { + conf.setInt("mapred.map.tasks", 3); + Job theJob = new Job(conf); + theJob.setJobName("DataMoveJob"); + + FileInputFormat.setInputPaths(theJob, indirs); + theJob.setMapperClass(DataCopyMapper.class); + FileOutputFormat.setOutputPath(theJob, outdir); + theJob.setOutputKeyClass(Text.class); + theJob.setOutputValueClass(Text.class); + theJob.setReducerClass(DataCopyReducer.class); + theJob.setNumReduceTasks(1); + return theJob; + } + + /** + * Creates a simple fail job. + * + * @param conf Configuration object + * @param outdir Output directory. + * @param indirs Comma separated input directories. 
+ * @return Job initialized for a simple fail job. + * @throws Exception If an error occurs creating job configuration. + */ + public static Job createFailJob(Configuration conf, Path outdir, + Path... indirs) throws Exception { + + conf.setInt("mapred.map.max.attempts", 2); + Job theJob = new Job(conf); + theJob.setJobName("Fail-Job"); + + FileInputFormat.setInputPaths(theJob, indirs); + theJob.setMapperClass(FailMapper.class); + theJob.setReducerClass(Reducer.class); + theJob.setNumReduceTasks(0); + FileOutputFormat.setOutputPath(theJob, outdir); + theJob.setOutputKeyClass(Text.class); + theJob.setOutputValueClass(Text.class); + return theJob; + } + + /** + * Creates a simple fail job. + * + * @param conf Configuration object + * @param outdir Output directory. + * @param indirs Comma separated input directories. + * @return Job initialized for a simple kill job. + * @throws Exception If an error occurs creating job configuration. + */ + public static Job createKillJob(Configuration conf, Path outdir, + Path... indirs) throws Exception { + + Job theJob = new Job(conf); + theJob.setJobName("Kill-Job"); + + FileInputFormat.setInputPaths(theJob, indirs); + theJob.setMapperClass(KillMapper.class); + theJob.setReducerClass(Reducer.class); + theJob.setNumReduceTasks(0); + FileOutputFormat.setOutputPath(theJob, outdir); + theJob.setOutputKeyClass(Text.class); + theJob.setOutputValueClass(Text.class); + return theJob; + } + + /** + * Simple Mapper and Reducer implementation which copies data it reads in. 
+ */ + public static class DataCopyMapper extends + Mapper { + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + context.write(new Text(key.toString()), value); + } + } + + public static class DataCopyReducer extends Reducer { + public void reduce(Text key, Iterator values, Context context) + throws IOException, InterruptedException { + Text dumbKey = new Text(""); + while (values.hasNext()) { + Text data = (Text) values.next(); + context.write(dumbKey, data); + } + } + } + + // Mapper that fails + public static class FailMapper extends + Mapper, Writable, WritableComparable, Writable> { + + public void map(WritableComparable key, Writable value, Context context) + throws IOException { + throw new RuntimeException("failing map"); + } + } + + // Mapper that sleeps for a long time. + // Used for running a job that will be killed + public static class KillMapper extends + Mapper, Writable, WritableComparable, Writable> { + + public void map(WritableComparable key, Writable value, Context context) + throws IOException { + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + // Do nothing + } + } + } + + public static Job createJob(Configuration conf, Path inDir, Path outDir, + int numInputFiles, int numReds) throws IOException { + String input = "The quick brown fox\n" + "has many silly\n" + + "red fox sox\n"; + return createJob(conf, inDir, outDir, numInputFiles, numReds, input); + } + + public static Job createJob(Configuration conf, Path inDir, Path outDir, + int numInputFiles, int numReds, String input) throws IOException { + Job job = new Job(conf); + FileSystem fs = FileSystem.get(conf); + if (fs.exists(outDir)) { + fs.delete(outDir, true); + } + if (fs.exists(inDir)) { + fs.delete(inDir, true); + } + fs.mkdirs(inDir); + for (int i = 0; i < numInputFiles; ++i) { + DataOutputStream file = fs.create(new Path(inDir, "part-" + i)); + file.writeBytes(input); + file.close(); + } + + 
FileInputFormat.setInputPaths(job, inDir); + FileOutputFormat.setOutputPath(job, outDir); + job.setNumReduceTasks(numReds); + return job; + } + + public static TaskAttemptContext createDummyMapTaskAttemptContext( + Configuration conf) { + TaskAttemptID tid = new TaskAttemptID("jt", 1, true, 0, 0); + conf.set("mapred.task.id", tid.toString()); + return new TaskAttemptContext(conf, tid); + } + + public static StatusReporter createDummyReporter() { + return new StatusReporter() { + public void setStatus(String s) { + } + public void progress() { + } + public Counter getCounter(Enum name) { + return new Counters().findCounter(name); + } + public Counter getCounter(String group, String name) { + return new Counters().findCounter(group, name); + } + }; + } + + public static String readOutput(Path outDir, Configuration conf) + throws IOException { + FileSystem fs = outDir.getFileSystem(conf); + StringBuffer result = new StringBuffer(); + + Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir, + new OutputLogFilter())); + for (Path outputFile : fileList) { + LOG.info("Path" + ": "+ outputFile); + BufferedReader file = + new BufferedReader(new InputStreamReader(fs.open(outputFile))); + String line = file.readLine(); + while (line != null) { + result.append(line); + result.append("\n"); + line = file.readLine(); + } + file.close(); + } + return result.toString(); + } + +} Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestChild.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestChild.java?rev=1076978&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestChild.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestChild.java Fri Mar 4 03:27:42 2011 @@ -0,0 
+1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.HadoopTestCase; +import org.apache.hadoop.mapred.JobConf; + +public class TestChild extends HadoopTestCase { + private static String TEST_ROOT_DIR = + new File(System.getProperty("test.build.data","/tmp")) + .toURI().toString().replace(' ', '+'); + private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input"); + private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output"); + + private final static String OLD_CONFIGS = "test.old.configs"; + private final static String TASK_OPTS_VAL = "-Xmx200m"; + private final static String MAP_OPTS_VAL = "-Xmx200m"; + private final static String REDUCE_OPTS_VAL = "-Xmx300m"; + + public TestChild() throws IOException { + super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2); + } + + static class 
MyMapper extends Mapper { + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + Configuration conf = context.getConfiguration(); + boolean oldConfigs = conf.getBoolean(OLD_CONFIGS, false); + if (oldConfigs) { + String javaOpts = conf.get(JobConf.MAPRED_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", + javaOpts); + assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + + javaOpts, + javaOpts, TASK_OPTS_VAL); + } else { + String mapJavaOpts = conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " is null!", + mapJavaOpts); + assertEquals(JobConf.MAPRED_MAP_TASK_JAVA_OPTS + " has value of: " + + mapJavaOpts, + mapJavaOpts, MAP_OPTS_VAL); + } + } + } + + static class MyReducer + extends Reducer { + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + boolean oldConfigs = conf.getBoolean(OLD_CONFIGS, false); + if (oldConfigs) { + String javaOpts = conf.get(JobConf.MAPRED_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_TASK_JAVA_OPTS + " is null!", + javaOpts); + assertEquals(JobConf.MAPRED_TASK_JAVA_OPTS + " has value of: " + + javaOpts, + javaOpts, TASK_OPTS_VAL); + } else { + String reduceJavaOpts = conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS); + assertNotNull(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " is null!", + reduceJavaOpts); + assertEquals(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + " has value of: " + + reduceJavaOpts, + reduceJavaOpts, REDUCE_OPTS_VAL); + } + } + } + + private Job submitAndValidateJob(JobConf conf, int numMaps, int numReds, + boolean oldConfigs) + throws IOException, InterruptedException, ClassNotFoundException { + conf.setBoolean(OLD_CONFIGS, oldConfigs); + if (oldConfigs) { + conf.set(JobConf.MAPRED_TASK_JAVA_OPTS, TASK_OPTS_VAL); + } else { + conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, MAP_OPTS_VAL); + 
conf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, REDUCE_OPTS_VAL); + } + + Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, + numMaps, numReds); + job.setMapperClass(MyMapper.class); + job.setReducerClass(MyReducer.class); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + + // Check output directory + FileSystem fs = FileSystem.get(conf); + assertTrue("Job output directory doesn't exit!", fs.exists(outDir)); + FileStatus[] list = fs.listStatus(outDir, new OutputFilter()); + int numPartFiles = numReds == 0 ? numMaps : numReds; + assertTrue("Number of part-files is " + list.length + " and not " + + numPartFiles, list.length == numPartFiles); + return job; + } + + public void testChild() throws Exception { + try { + submitAndValidateJob(createJobConf(), 1, 1, true); + submitAndValidateJob(createJobConf(), 1, 1, false); + } finally { + tearDown(); + } + } + + private static class OutputFilter implements PathFilter { + public boolean accept(Path path) { + return !(path.getName().startsWith("_")); + } + } +}