Subject: svn commit: r652179 - in /hadoop/core/trunk: ./ conf/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/mapred/ s...
Date: Tue, 29 Apr 2008 22:44:49 -0000
To: core-commits@hadoop.apache.org
From: omalley@apache.org
Reply-To: core-dev@hadoop.apache.org
Message-Id: <20080429224449.CC41423889F3@eris.apache.org>

Author: omalley
Date: Tue Apr 29 15:44:48 2008
New Revision: 652179

URL: http://svn.apache.org/viewvc?rev=652179&view=rev
Log:
HADOOP-3280. Separate the configuration of the virtual memory size
(mapred.child.ulimit) from the jvm heap size, so that 64 bit streaming
applications are supported even when running with 32 bit jvms.
Contributed by acmurthy.

Added:
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 29 15:44:48 2008
@@ -205,6 +205,11 @@
     HADOOP-3266. Removed HOD changes from CHANGES.txt, as they are now inside
     src/contrib/hod (Hemanth Yamijala via ddas)
 
+    HADOOP-3280. Separate the configuration of the virtual memory size
+    (mapred.child.ulimit) from the jvm heap size, so that 64 bit
+    streaming applications are supported even when running with 32 bit
+    jvms. (acmurthy via omalley)
+
   NEW FEATURES
 
     HADOOP-1398. Add HBase in-memory block cache. (tomwhite)

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Tue Apr 29 15:44:48 2008
@@ -753,8 +753,23 @@
   For example, to enable verbose gc logging to a file named for the taskid in
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
-  The value of -Xmx will also directly influence the amount of virtual memory
-  that a streaming/pipes task gets during execution.
+
+  The configuration variable mapred.child.ulimit can be used to control the
+  maximum virtual memory of the child processes.
+  </description>
+</property>
+
+<property>
+  <name>mapred.child.ulimit</name>
+  <value></value>
+  <description>The maximum virtual memory, in KB, of a process launched by the
+  Map-Reduce framework. This can be used to control both the Mapper/Reducer
+  tasks and applications using Hadoop Pipes, Hadoop Streaming etc.
+  By default it is left unspecified to let cluster admins control it via
+  limits.conf and other such relevant mechanisms.
+
+  Note: mapred.child.ulimit must be greater than or equal to the -Xmx passed to
+  JavaVM, else the VM might not start.
+  </description>
+</property>

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Apr 29 15:44:48 2008
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -165,23 +166,11 @@
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
       // add TMPDIR environment variable with the value of java.io.tmpdir
       envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
-      if (StreamUtil.isCygwin()) {
-        sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-      } else {
-        List<String> cmd = new ArrayList<String>();
-        for (String arg : argvSplit) {
-          cmd.add(arg);
-        }
-        // set memory limit using ulimit.
-        ProcessBuilder builder;
-        List<String> setup = new ArrayList<String>();
-        setup.add("ulimit");
-        setup.add("-v");
-        setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-        builder = new ProcessBuilder(wrapCommand(setup, cmd));
-        builder.environment().putAll(childEnv.toMap());
-        sim = builder.start();
-      }
+
+      // Start the process
+      ProcessBuilder builder = new ProcessBuilder(argvSplit);
+      builder.environment().putAll(childEnv.toMap());
+      sim = builder.start();
 
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
       clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
@@ -196,29 +185,6 @@
       throw new RuntimeException("configuration exception", e);
     }
   }
-
-  /**
-   * Wrap command with bash -c with setup commands.
-   * Setup commands such as setting memory limit can be passed which
-   * will be executed before exec.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @return the modified command that should be run
-   */
-  private List<String> wrapCommand( List<String> setup,
-                                    List<String> cmd
-                                   ) throws IOException {
-    List<String> result = new ArrayList<String>();
-    result.add("bash");
-    result.add("-c");
-    StringBuffer mergedCmd = new StringBuffer();
-    mergedCmd.append(TaskLog.addCommand(setup, false));
-    mergedCmd.append(";");
-    mergedCmd.append("exec ");
-    mergedCmd.append(TaskLog.addCommand(cmd, true));
-    result.add(mergedCmd.toString());
-    return result;
-  }
 
   void setStreamJobDetails(JobConf job) {
     jobLog_ = job.get("stream.jobLog_");

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java Tue Apr 29 15:44:48 2008
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-
-/** A minimal Java implementation of /bin/cat
- * The class also tries to allocate a huge array( 10MB) to test ulimits.
- * Look at {@link TestUlimit}
- */
-public class CatApp {
-  public static void main(String args[]) throws IOException{
-    char s[] = null;
-    try {
-      s = new char[10*1024*1024];
-      BufferedReader in = new BufferedReader(
-                              new InputStreamReader(System.in));
-      String line;
-      while ((line = in.readLine()) != null) {
-        System.out.println(line);
-      }
-    } finally {
-      if (s == null) {
-        System.exit(-1);
-      }
-    }
-  }
-}

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Tue Apr 29 15:44:48 2008
@@ -41,9 +41,6 @@
  * is expected to be a failure.
  */
 public class TestUlimit extends TestCase {
-  private static final Log LOG =
-         LogFactory.getLog(TestUlimit.class.getName());
-  enum RESULT { FAILURE, SUCCESS };
   String input = "the dummy input";
   Path inputPath = new Path("/testing/in");
   Path outputPath = new Path("/testing/out");
@@ -51,6 +48,7 @@
   MiniDFSCluster dfs = null;
   MiniMRCluster mr = null;
   FileSystem fs = null;
+  private static String SET_MEMORY_LIMIT = "786432"; // 768MB
 
   String[] genArgs(String memLimit) {
     return new String[] {
       "-mapper", map,
       "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
       "-numReduceTasks", "0",
-      "-jobconf", "mapred.child.java.opts=" + memLimit,
+      "-jobconf", "mapred.child.ulimit=" + memLimit,
       "-jobconf", "mapred.job.tracker=" + "localhost:" +
                                            mr.getJobTrackerPort(),
-      "-jobconf", "fs.default.name=" + "localhost:" + dfs.getNameNodePort(),
+      "-jobconf", "fs.default.name=" + "hdfs://localhost:"
+                   + dfs.getNameNodePort(),
       "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
     };
@@ -87,12 +86,10 @@
       mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
       writeInputFile(fs, inputPath);
-      map = StreamUtil.makeJavaCommand(CatApp.class, new String[]{});
-      runProgram("-Xmx2048m", RESULT.SUCCESS);
+      map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});
+      runProgram(SET_MEMORY_LIMIT);
       FileUtil.fullyDelete(fs, outputPath);
       assertFalse("output not cleaned up", fs.exists(outputPath));
-      // 100MB is not sufficient for launching jvm. This launch should fail.
-      runProgram("-Xmx0.5m", RESULT.FAILURE);
       mr.waitUntilIdle();
     } catch(IOException e) {
       fail(e.toString());
@@ -114,24 +111,14 @@
    * @param result Expected result
    * @throws IOException
    */
-  private void runProgram(String memLimit, RESULT result
-                          ) throws IOException {
+  private void runProgram(String memLimit) throws IOException {
     boolean mayExit = false;
-    int ret = 1;
-    try {
-      StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
-      ret = job.go();
-    } catch (IOException ioe) {
-      LOG.warn("Job Failed! " + StringUtils.stringifyException(ioe));
-      ioe.printStackTrace();
-    }
+    StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
+    job.go();
     String output = TestMiniMRWithDFS.readOutput(outputPath,
                                         mr.createJobConf());
-    if (RESULT.SUCCESS.name().equals(result.name())){
-      assertEquals("output is wrong", input, output.trim());
-    } else {
-      assertTrue("output is correct", !input.equals(output.trim()));
-    }
+    assertEquals("output is wrong", SET_MEMORY_LIMIT,
+                 output.trim());
   }
 
   public static void main(String[]args) throws Exception

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java?rev=652179&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java Tue Apr 29 15:44:48 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/**
+ * The UlimitApp discards the input
+ * and exec's ulimit -v to know the ulimit value.
+ * And writes the output to the standard out.
+ * @see {@link TestUlimit}
+ */
+public class UlimitApp {
+  public static void main(String args[]) throws IOException{
+    BufferedReader in = new BufferedReader(
+                            new InputStreamReader(System.in));
+    String line = null;
+    while ((line = in.readLine()) != null) {}
+
+    Process process = Runtime.getRuntime().exec(new String[]{
+                          "bash", "-c", "ulimit -v"});
+    InputStream is = process.getInputStream();
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+    while ((line = br.readLine()) != null) {
+      System.out.println(line);
+    }
+  }
+}

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Tue Apr 29 15:44:48 2008
@@ -272,8 +272,7 @@
     mapred.child.java.opts
     -Xmx512M
-    Larger heap-size for child jvms of maps/reduces. Also controls the amount
-    of virtual memory that a streaming/pipes task gets.
+    Larger heap-size for child jvms of maps/reduces.
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Apr 29 15:44:48 2008
@@ -1065,6 +1065,9 @@
 </property>
 
+          <p>Users/admins can also specify the maximum virtual memory
+          of the launched child-task using mapred.child.ulimit.</p>
+
           <p>When the job starts, the localized job directory
           ${mapred.local.dir}/taskTracker/jobcache/$jobid/
           has the following directories:</p>
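[Mechanically, the framework applies this limit by prepending a ulimit setup command and exec'ing the real task under bash -c; see the TaskRunner and Shell changes below, and the wrapCommand method removed from PipeMapRed above. The following is a simplified, hypothetical sketch of that wrapping; unlike the real path through TaskLog.addCommand, it performs no shell-quoting of arguments.]

    import java.util.ArrayList;
    import java.util.List;

    public class UlimitWrapSketch {              // hypothetical helper
      // Prepend "ulimit -v <KB>" and exec the real command under bash -c,
      // so the virtual-memory limit applies to the forked child process.
      static List<String> wrapWithUlimit(List<String> taskCmd, long limitKB) {
        StringBuilder merged = new StringBuilder("ulimit -v ");
        merged.append(limitKB).append("; exec");
        for (String arg : taskCmd) {
          merged.append(' ').append(arg);        // NB: no quoting here
        }
        List<String> wrapped = new ArrayList<String>();
        wrapped.add("bash");
        wrapped.add("-c");
        wrapped.add(merged.toString());
        return wrapped;
      }
    }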
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Apr 29 15:44:48 2008
@@ -24,8 +24,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
+
 import java.io.*;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
 import java.net.URI;
@@ -368,12 +370,22 @@
       vargs.add(Integer.toString(address.getPort()));
       vargs.add(taskid);                      // pass task identifier
 
+      // set memory limit using ulimit if feasible and necessary ...
+      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+      List<String> setup = null;
+      if (ulimitCmd != null) {
+        setup = new ArrayList<String>();
+        for (String arg : ulimitCmd) {
+          setup.add(arg);
+        }
+      }
+
       // Set up the redirection of the task's stdout and stderr streams
       File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
       List<String> wrappedCommand =
-        TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
 
       // Run the task as child of the parent TaskTracker process
       runChild(wrappedCommand, workDir, taskid);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Tue Apr 29 15:44:48 2008
@@ -84,16 +84,8 @@
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
-    // set memory limit using ulimit.
-    if (!WINDOWS) {
-      List<String> setup = new ArrayList<String>();
-      setup.add("ulimit");
-      setup.add("-v");
-      setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
-      cmd = TaskLog.captureOutAndError(setup, cmd, stdout, stderr, logLength);
-    } else {
-      cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
-    }
+    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
     handler = new OutputHandler(output, reporter);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java?rev=652179&r1=652178&r2=652179&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/Shell.java Tue Apr 29 15:44:48 2008
@@ -25,6 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * A base class for running a Unix command.
@@ -54,6 +55,39 @@
     return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
   }
 
+  /**
+   * Get the Unix command for setting the maximum virtual memory available
+   * to a given child process. This is only relevant when we are forking a
+   * process from within the {@link org.apache.hadoop.mapred.Mapper} or the
+   * {@link org.apache.hadoop.mapred.Reducer} implementations
+   * e.g. Hadoop Pipes
+   * or Hadoop Streaming.
+   *
+   * It also checks to ensure that we are running on a *nix platform else
+   * (e.g. in Cygwin/Windows) it returns null.
+   * @param job job configuration
+   * @return a String[] with the ulimit command arguments or
+   *         null if we are running on a non *nix platform or
+   *         if the limit is unspecified.
+   */
+  public static String[] getUlimitMemoryCommand(JobConf job) {
+    // ulimit isn't supported on Windows
+    if (WINDOWS) {
+      return null;
+    }
+
+    // get the memory limit from the JobConf
+    String ulimit = job.get("mapred.child.ulimit");
+    if (ulimit == null) {
+      return null;
+    }
+
+    // Parse it to ensure it is legal/sane
+    int memoryLimit = Integer.valueOf(ulimit);
+
+    return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)};
+  }
+
   /** Set to true on Windows platforms */
   public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
                 = System.getProperty("os.name").startsWith("Windows");
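[A hypothetical caller of the new Shell.getUlimitMemoryCommand, mirroring the TaskRunner hunk above. The API, property name, and returned command shape come from this commit; the class name UlimitCommandExample and the concrete limit are illustrative.]

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Shell;

    public class UlimitCommandExample {          // hypothetical example class
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("mapred.child.ulimit", "786432"); // KB; illustrative value

        // {"ulimit", "-v", "786432"} on *nix; null on Windows or when the
        // limit is unspecified.
        String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);

        List<String> setup = null;
        if (ulimitCmd != null) {
          setup = new ArrayList<String>();
          for (String arg : ulimitCmd) {
            setup.add(arg);
          }
        }
        // 'setup' is then handed to TaskLog.captureOutAndError(setup, ...)
        // exactly as TaskRunner does above.
      }
    }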