hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r652179 - in /hadoop/core/trunk: ./ conf/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/mapred/ s...
Date Tue, 29 Apr 2008 22:44:49 GMT
Author: omalley
Date: Tue Apr 29 15:44:48 2008
New Revision: 652179

URL: http://svn.apache.org/viewvc?rev=652179&view=rev
Log:
HADOOP-3280. Separate the configuration of the virtual memory size
(mapred.child.ulimit) from the JVM heap size, so that 64-bit
streaming applications are supported even when running with 32-bit
JVMs. Contributed by acmurthy.

Added:
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 29 15:44:48 2008
@@ -205,6 +205,11 @@
     HADOOP-3266. Removed HOD changes from CHANGES.txt, as they are now inside 
     src/contrib/hod  (Hemanth Yamijala via ddas)
 
+    HADOOP-3280. Separate the configuration of the virtual memory size
+    (mapred.child.ulimit) from the JVM heap size, so that 64-bit
+    streaming applications are supported even when running with 32-bit
+    JVMs. (acmurthy via omalley)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Tue Apr 29 15:44:48 2008
@@ -753,8 +753,23 @@
   For example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
-  The value of -Xmx will also directly influence the amount of virtual memory
-  that a streaming/pipes task gets during execution.
+  
+  The configuration variable mapred.child.ulimit can be used to control the
+  maximum virtual memory of the child processes. 
+  </description>
+</property>
+
+<property>
+  <name>mapred.child.ulimit</name>
+  <value></value>
+  <description>The maximum virtual memory, in KB, of a process launched by the 
+  Map-Reduce framework. This can be used to control both the Mapper/Reducer 
+  tasks and applications using Hadoop Pipes, Hadoop Streaming, etc. 
+  By default it is left unspecified to let cluster admins control it via 
+  limits.conf and other such relevant mechanisms.
+  
+  Note: mapred.child.ulimit must be greater than or equal to the -Xmx passed to
+  the JVM, or the VM may fail to start. 
   </description>
 </property>
 

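For concreteness, here is a minimal sketch of how a job submitter might set the new property programmatically; the class name and values are illustrative, not part of this commit. It keeps mapred.child.ulimit at or above the child's -Xmx, as the description requires:

    import org.apache.hadoop.mapred.JobConf;

    public class UlimitConfigSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Child JVM heap of 512 MB ...
        conf.set("mapred.child.java.opts", "-Xmx512m");
        // ... under a 768 MB (786432 KB) virtual memory ceiling, keeping
        // mapred.child.ulimit comfortably above -Xmx per the note above.
        conf.set("mapred.child.ulimit", "786432");
      }
    }
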
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Apr 29 15:44:48 2008
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -165,23 +166,11 @@
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
       // add TMPDIR environment variable with the value of java.io.tmpdir
       envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
-      if (StreamUtil.isCygwin()) {
-        sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-      } else {
-        List<String> cmd = new ArrayList<String>();
-        for (String arg : argvSplit) {
-          cmd.add(arg);
-        }
-        // set memory limit using ulimit.
-        ProcessBuilder builder;
-        List<String> setup = new ArrayList<String>();
-        setup.add("ulimit");
-        setup.add("-v"); 
-        setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-        builder = new ProcessBuilder(wrapCommand(setup, cmd));
-        builder.environment().putAll(childEnv.toMap());
-        sim = builder.start();
-      }
+
+      // Start the process
+      ProcessBuilder builder = new ProcessBuilder(argvSplit);
+      builder.environment().putAll(childEnv.toMap());
+      sim = builder.start();
 
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
       clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
@@ -196,29 +185,6 @@
       throw new RuntimeException("configuration exception", e);
     }
   }
-
-  /**
-   * Wrap command with bash -c with setup commands.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @return the modified command that should be run
-   */
-  private List<String> wrapCommand( List<String> setup,
-                                    List<String> cmd 
-                                   ) throws IOException {
-    List<String> result = new ArrayList<String>();
-    result.add("bash");
-    result.add("-c");
-    StringBuffer mergedCmd = new StringBuffer();
-    mergedCmd.append(TaskLog.addCommand(setup, false));
-    mergedCmd.append(";");
-    mergedCmd.append("exec ");
-    mergedCmd.append(TaskLog.addCommand(cmd, true));
-    result.add(mergedCmd.toString());
-    return result;
-  }
   
   void setStreamJobDetails(JobConf job) {
     jobLog_ = job.get("stream.jobLog_");

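The replacement code above drops the bash/ulimit wrapper entirely: streaming now launches its child with a bare ProcessBuilder, and any memory limit is applied by the framework via mapred.child.ulimit instead. A self-contained sketch of that launch pattern, where the command is an arbitrary stand-in for argvSplit:

    import java.io.IOException;

    public class ProcessLaunchSketch {
      public static void main(String[] args)
          throws IOException, InterruptedException {
        // Launch the child directly from its argument vector -- no more
        // "bash -c 'ulimit -v ...; exec ...'" wrapping around the command.
        ProcessBuilder builder = new ProcessBuilder("echo", "hello");
        // The environment can still be augmented, much as PipeMapRed
        // applies childEnv.toMap() above.
        builder.environment().put("TMPDIR",
            System.getProperty("java.io.tmpdir"));
        Process child = builder.start();
        child.waitFor();
      }
    }
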
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java Tue Apr 29 15:44:48 2008
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-
-/** A minimal Java implementation of /bin/cat
- *  The class also tries to allocate a huge array( 10MB) to test ulimits.
- *  Look at {@link TestUlimit}
- */
-public class CatApp {
-  public static void main(String args[]) throws IOException{
-    char s[] = null;
-    try {
-      s = new char[10*1024*1024];
-      BufferedReader in = new BufferedReader(
-                              new InputStreamReader(System.in));
-      String line;
-      while ((line = in.readLine()) != null) {
-        System.out.println(line);
-      }
-    } finally {
-      if (s == null) {
-        System.exit(-1);
-      }
-    }
-  }
-}

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Tue Apr 29 15:44:48 2008
@@ -41,9 +41,6 @@
  * is expected to be a failure.  
  */
 public class TestUlimit extends TestCase {
-  private static final Log LOG =
-         LogFactory.getLog(TestUlimit.class.getName());
-  enum RESULT { FAILURE, SUCCESS };
   String input = "the dummy input";
   Path inputPath = new Path("/testing/in");
   Path outputPath = new Path("/testing/out");
@@ -51,6 +48,7 @@
   MiniDFSCluster dfs = null;
   MiniMRCluster mr = null;
   FileSystem fs = null;
+  private static String SET_MEMORY_LIMIT = "786432"; // 768MB
 
   String[] genArgs(String memLimit) {
     return new String[] {
@@ -59,10 +57,11 @@
       "-mapper", map,
       "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
       "-numReduceTasks", "0",
-      "-jobconf", "mapred.child.java.opts=" + memLimit,
+      "-jobconf", "mapred.child.ulimit=" + memLimit,
       "-jobconf", "mapred.job.tracker=" + "localhost:" +
                                            mr.getJobTrackerPort(),
-      "-jobconf", "fs.default.name=" + "localhost:" + dfs.getNameNodePort(),
+      "-jobconf", "fs.default.name=" + "hdfs://localhost:" 
+                   + dfs.getNameNodePort(),
       "-jobconf", "stream.tmpdir=" + 
                    System.getProperty("test.build.data","/tmp")
     };
@@ -87,12 +86,10 @@
       
       mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
       writeInputFile(fs, inputPath);
-      map = StreamUtil.makeJavaCommand(CatApp.class, new String[]{});  
-      runProgram("-Xmx2048m", RESULT.SUCCESS);
+      map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});  
+      runProgram(SET_MEMORY_LIMIT);
       FileUtil.fullyDelete(fs, outputPath);
       assertFalse("output not cleaned up", fs.exists(outputPath));
-      // 100MB is not sufficient for launching jvm. This launch should fail.
-      runProgram("-Xmx0.5m", RESULT.FAILURE);
       mr.waitUntilIdle();
     } catch(IOException e) {
       fail(e.toString());
@@ -114,24 +111,14 @@
    * @param result Expected result
    * @throws IOException
    */
-  private void runProgram(String memLimit, RESULT result
-                          ) throws IOException {
+  private void runProgram(String memLimit) throws IOException {
     boolean mayExit = false;
-    int ret = 1;
-    try {
-      StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
-      ret = job.go();
-    } catch (IOException ioe) {
-      LOG.warn("Job Failed! " + StringUtils.stringifyException(ioe));
-      ioe.printStackTrace();
-    }
+    StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
+    job.go();
     String output = TestMiniMRWithDFS.readOutput(outputPath,
                                         mr.createJobConf());
-    if (RESULT.SUCCESS.name().equals(result.name())){
-      assertEquals("output is wrong", input, output.trim());
-    } else {
-      assertTrue("output is correct", !input.equals(output.trim()));
-    }
+    assertEquals("output is wrong", SET_MEMORY_LIMIT,
+                                    output.trim());
   }
   
   public static void main(String[]args) throws Exception

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java?rev=652179&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java Tue Apr 29 15:44:48 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/** 
+ *  UlimitApp discards its standard input, then execs
+ *  <code>ulimit -v</code> to query the virtual memory limit
+ *  and writes the result to standard out.
+ *  @see TestUlimit
+ */
+public class UlimitApp {
+  public static void main(String args[]) throws IOException{
+    BufferedReader in = new BufferedReader(
+                            new InputStreamReader(System.in));
+    String line = null;
+    while ((line = in.readLine()) != null) {}
+
+    Process process = Runtime.getRuntime().exec(new String[]{
+                                 "bash", "-c", "ulimit -v"});
+    InputStream is = process.getInputStream();
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+    while ((line = br.readLine()) != null) {
+      System.out.println(line);
+    }
+  }
+}

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Tue Apr 29 15:44:48 2008
@@ -272,8 +272,7 @@
                     <td>mapred.child.java.opts</td>
                     <td>-Xmx512M</td>
                     <td>
-                      Larger heap-size for child jvms of maps/reduces. Also controls the amount 
-                      of virtual memory that a streaming/pipes task gets.
+                      Larger heap-size for child jvms of maps/reduces. 
                     </td>
                   </tr>
                   <tr>

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Apr 29 15:44:48 2008
@@ -1065,6 +1065,9 @@
           <code>&lt;/property&gt;</code>
         </p>
         
+        <p>Users/admins can also specify the maximum virtual memory 
+        of the launched child-task using <code>mapred.child.ulimit</code>.</p>
+        
         <p>When the job starts, the localized job directory
         <code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
         has the following directories: </p>

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Apr 29 15:44:48 2008
@@ -24,8 +24,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
+
 import java.io.*;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
 import java.net.URI;
@@ -368,12 +370,22 @@
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid);                      // pass task identifier
 
+      // set memory limit using ulimit if feasible and necessary ...
+      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+      List<String> setup = null;
+      if (ulimitCmd != null) {
+        setup = new ArrayList<String>();
+        for (String arg : ulimitCmd) {
+          setup.add(arg);
+        }
+      }
+
       // Set up the redirection of the task's stdout and stderr streams
       File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
       List<String> wrappedCommand = 
-        TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
       
       // Run the task as child of the parent TaskTracker process
       runChild(wrappedCommand, workDir, taskid);

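To make the new control flow concrete, a hedged sketch of the setup-list assembly above, using the Shell helper added in this commit (java.util.Arrays.asList stands in for the explicit copy loop, and the shell wrapping is only approximated in the comment, since the exact string comes from TaskLog.captureOutAndError):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Shell;

    public class TaskSetupSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("mapred.child.ulimit", "786432"); // illustrative limit, in KB

        // null on Windows or when the property is unset.
        String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
        List<String> setup = null;
        if (ulimitCmd != null) {
          setup = new ArrayList<String>(Arrays.asList(ulimitCmd));
        }
        // TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize)
        // then runs the setup commands before the child, conceptually:
        //   bash -c 'ulimit -v 786432; <child jvm command> 1>stdout 2>stderr'
        System.out.println(setup);
      }
    }
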
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Tue Apr 29 15:44:48 2008
@@ -84,16 +84,8 @@
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    // set memory limit using ulimit.
-    if (!WINDOWS) {
-      List<String> setup = new ArrayList<String>();
-      setup.add("ulimit");
-      setup.add("-v"); 
-      setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-      cmd = TaskLog.captureOutAndError(setup, cmd, stdout, stderr, logLength);
-    } else {
-      cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
-    }
+    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
     handler = new OutputHandler<K2, V2>(output, reporter);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java Tue Apr 29 15:44:48 2008
@@ -25,6 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
 
 /** 
  * A base class for running a Unix command.
@@ -54,6 +55,39 @@
     return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
   }
 
+  /** 
+   * Get the Unix command for setting the maximum virtual memory available
+   * to a given child process. This is only relevant when we are forking a
+   * process from within the {@link org.apache.hadoop.mapred.Mapper} or the 
+   * {@link org.apache.hadoop.mapred.Reducer} implementations 
+   * e.g. <a href="{@docRoot}/org/apache/hadoop/mapred/pipes/package-summary.html">Hadoop Pipes</a> 
+   * or <a href="{@docRoot}/org/apache/hadoop/streaming/package-summary.html">Hadoop Streaming</a>.
+   * 
+   * It also checks to ensure that we are running on a *nix platform else 
+   * (e.g. in Cygwin/Windows) it returns <code>null</code>.
+   * @param job job configuration
+   * @return a <code>String[]</code> with the ulimit command arguments or 
+   *         <code>null</code> if we are running on a non *nix platform or
+   *         if the limit is unspecified.
+   */
+  public static String[] getUlimitMemoryCommand(JobConf job) {
+    // ulimit isn't supported on Windows
+    if (WINDOWS) {
+      return null;
+    }
+    
+    // get the memory limit from the JobConf
+    String ulimit = job.get("mapred.child.ulimit");
+    if (ulimit == null) {
+      return null;
+    }
+    
+    // Parse it to ensure it is legal/sane
+    int memoryLimit = Integer.valueOf(ulimit);
+
+    return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)};
+  }
+  
   /** Set to true on Windows platforms */
   public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
                 = System.getProperty("os.name").startsWith("Windows");

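A small usage sketch of the new helper's contract, as implied by the code above (the class name and value are illustrative): an unset property or a Windows platform yields null, while a set value comes back as a ready-to-use argument vector. Note that Integer.valueOf throws NumberFormatException on a malformed value, which is the "legal/sane" check mentioned in the comment.

    import java.util.Arrays;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Shell;

    public class UlimitCommandDemo {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Property unset: no ulimit command is generated.
        System.out.println(Shell.getUlimitMemoryCommand(conf)); // null

        conf.set("mapred.child.ulimit", "786432");
        // On a *nix platform: [ulimit, -v, 786432]; still null on Windows.
        String[] cmd = Shell.getUlimitMemoryCommand(conf);
        if (cmd != null) {
          System.out.println(Arrays.toString(cmd));
        }
      }
    }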

