Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 467 invoked from network); 4 Mar 2011 04:27:44 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 04:27:44 -0000 Received: (qmail 10631 invoked by uid 500); 4 Mar 2011 04:27:44 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 10600 invoked by uid 500); 4 Mar 2011 04:27:44 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 10591 invoked by uid 99); 4 Mar 2011 04:27:44 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:27:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:27:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5854E23888CD; Fri, 4 Mar 2011 04:27:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077542 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ Date: Fri, 04 Mar 2011 04:27:18 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304042718.5854E23888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 04:27:17 2011 New Revision: 1077542 URL: http://svn.apache.org/viewvc?rev=1077542&view=rev Log: commit 260b70d6f19f2f97b6cd91572b0961c5544f9397 Author: Mahadev Konar Date: Tue Jul 13 
20:49:04 2010 +0000 MAPREDUCE-1521. Protection against incorrectly configured reduces. From https://issues.apache.org/jira/secure/attachment/12449129/MAPREDUCE-1521-0.20-yahoo.patch (mahadev) +++ b/YAHOO-CHANGES.txt + MAPREDUCE-1521. Protection against incorrectly configured reduces + (mahadev) + Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 04:27:17 2011 @@ -365,6 +365,14 @@ + mapreduce.reduce.input.limit + -1 + The limit on the input size of the reduce. If the estimated + input size of the reduce is greater than this value, job is failed. A + value of -1 means that there is no limit set. 
+ + + mapred.job.tracker.retiredjobs.cache.size 1000 The number of retired job status to keep in the cache. Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 04:27:17 2011 @@ -397,6 +397,15 @@ public class JobClient extends Configure public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException { return jobSubmitClient.getTaskDiagnostics(id); } + + @Override + public String getFailureInfo() throws IOException { + //assuming that this is just being called after + //we realized the job failed. 
So we try avoiding + //an RPC by not calling updateStatus + ensureFreshStatus(); + return status.getFailureInfo(); + } } private JobSubmissionProtocol jobSubmitClient; @@ -1192,6 +1201,7 @@ public class JobClient extends Configure RunningJob rj = jc.submitJob(job); try { if (!jc.monitorAndPrintJob(job, rj)) { + LOG.info("Job Failed: " + rj.getFailureInfo()); throw new IOException("Job failed!"); } } catch (InterruptedException ie) { Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:27:17 2011 @@ -112,7 +112,8 @@ public class JobInProgress { int finishedReduceTasks = 0; int failedMapTasks = 0; int failedReduceTasks = 0; - + private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L; + long reduce_input_limit = -1L; private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; int completedMapsForReduceSlowstart = 0; @@ -417,6 +418,12 @@ public class JobInProgress { this.jobMetrics.setTag("jobId", jobId.toString()); hasSpeculativeMaps = conf.getMapSpeculativeExecution(); hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); + // a limit on the input size of the reduce. + // we check to see if the estimated input size + // of each reduce is less than this value. If not + // we fail the job. A value of -1 just means there is no + // limit set.
+ reduce_input_limit = -1L; this.maxLevel = jobtracker.getNumTaskCacheLevels(); this.anyCacheLevel = this.maxLevel+1; this.nonLocalMaps = new LinkedList(); @@ -425,7 +432,8 @@ public class JobInProgress { this.nonRunningReduces = new LinkedList(); this.runningReduces = new LinkedHashSet(); this.resourceEstimator = new ResourceEstimator(this); - + this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit", + DEFAULT_REDUCE_INPUT_LIMIT); // register job's tokens for renewal DelegationTokenRenewal.registerDelegationTokensForRenewal( jobInfo.getJobID(), ts, jobtracker.getConf()); @@ -1539,6 +1547,25 @@ public class JobInProgress { return null; } + /** check to see if we have any misbehaving reducers. If the expected input + * for reducers is huge then we just fail the job and error out. The estimated + * size is divided by 2 since the resource estimator returns the amount of disk + * space that the reduce will use (which is 2 times the input, space for merge + reduce + * input).
**/ + long estimatedReduceInputSize = resourceEstimator.getEstimatedReduceInputSize()/2; + if (((estimatedReduceInputSize) > + reduce_input_limit) && (reduce_input_limit > 0L)) { + // make sure jobtracker lock is held + LOG.info("Exceeded limit for reduce input size: Estimated:" + + estimatedReduceInputSize + " Limit: " + + reduce_input_limit + " Failing Job " + jobId); + status.setFailureInfo("Job Exceeded Reduce Input limit " + + " Limit: " + reduce_input_limit + + " Estimated: " + estimatedReduceInputSize); + jobtracker.failJob(this); + return null; + } + // Ensure we have sufficient map outputs ready to shuffle before // scheduling reduces if (!scheduleReduces()) { Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Mar 4 04:27:17 2011 @@ -84,7 +84,8 @@ public class JobStatus implements Writab private String user; private JobPriority priority; private String schedulingInfo="NA"; - + private String failureInfo = "NA"; + /** */ public JobStatus() { @@ -278,8 +279,24 @@ public class JobStatus implements Writab public synchronized String getSchedulingInfo() { return schedulingInfo; } + + /** + * gets any available info on the reason of failure of the job. + * @return diagnostic information on why a job might have failed. + */ + public synchronized String getFailureInfo() { + return this.failureInfo; + } /** + * set the reason for failure of this job + * @param failureInfo the reason for failure of this job.
+ */ + public synchronized void setFailureInfo(String failureInfo) { + this.failureInfo = failureInfo; + } + + /** * Used to set the scheduling information associated to a particular Job. * * @param schedulingInfo Scheduling information of the job @@ -343,6 +360,7 @@ public class JobStatus implements Writab WritableUtils.writeEnum(out, entry.getKey()); entry.getValue().write(out); } + Text.writeString(out, failureInfo); } public synchronized void readFields(DataInput in) throws IOException { @@ -365,6 +383,7 @@ public class JobStatus implements Writab acl.readFields(in); this.jobACLs.put(aclType, acl); } + this.failureInfo = Text.readString(in); } // A utility to convert new job runstates to the old ones. Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar 4 04:27:17 2011 @@ -80,8 +80,10 @@ interface JobSubmissionProtocol extends * Version 26: Added the method getQueueAdmins(queueName) as part of * MAPREDUCE-1664. * Version 27: Added queue state to JobQueueInfo as part of HADOOP-5913. + * Version 28: Added a new field to JobStatus to provide user readable + * information on job failure. MAPREDUCE-1521. */ - public static final long versionID = 27L; + public static final long versionID = 28L; /** * Allocate a name for the job. 
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 04:27:17 2011 @@ -2049,7 +2049,6 @@ public class JobTracker implements MRCon MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100); MAX_BLACKLISTS_PER_TRACKER = conf.getInt("mapred.max.tracker.blacklists", 4); - NUM_HEARTBEATS_IN_SECOND = conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND); if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) { @@ -3947,9 +3946,11 @@ public class JobTracker implements MRCon StringUtils.stringifyException(kie)); killJob(job); } catch (Throwable t) { + String failureInfo = "Job initialization failed:\n" + + StringUtils.stringifyException(t); // If the job initialization is failed, job state will be FAILED - LOG.error("Job initialization failed:\n" + - StringUtils.stringifyException(t)); + LOG.error(failureInfo); + job.getStatus().setFailureInfo(failureInfo); failJob(job); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original) +++ 
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Mar 4 04:27:17 2011 @@ -183,6 +183,13 @@ public interface RunningJob { public Counters getCounters() throws IOException; /** + * Get failure info for the job. + * @return the failure info for the job. + * @throws IOException + */ + public String getFailureInfo() throws IOException; + + /** * Gets the diagnostic messages for a given task attempt. * @param taskid * @return the list of diagnostic messages for the task Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=1077542&r1=1077541&r2=1077542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Fri Mar 4 04:27:17 2011 @@ -63,6 +63,44 @@ public class TestTaskLimits extends Test } /** + * check with a reduce limit of 10 bytes for input to reduce. + * This should fail since input to reduce estimate is greater + * than that! 
+ * @return true on failing the job, false otherwise + * @throws IOException + */ + private boolean runReduceLimitCheck() throws IOException { + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + boolean success = false; + try { + final int taskTrackers = 2; + + Configuration conf = new Configuration(); + conf.setInt("mapred.jobtracker.maxtasks.per.job", -1); + dfs = new MiniDFSCluster(conf, 4, true, null); + fileSys = dfs.getFileSystem(); + JobConf jconf = new JobConf(conf); + mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1, + null, null, null, jconf); + + JobConf jc = mr.createJobConf(); + jc.setLong("mapreduce.reduce.input.limit", 10L); + try { + runPI(mr, jc); + success = false; + } catch (IOException e) { + success = true; + } + } finally { + if (dfs != null) { dfs.shutdown(); } + if (mr != null) { mr.shutdown(); } + } + return success; + } + + /** * Run the pi test with a specific value of * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded. */ @@ -73,7 +111,7 @@ public class TestTaskLimits extends Test boolean success = false; try { final int taskTrackers = 2; - + Configuration conf = new Configuration(); conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks); dfs = new MiniDFSCluster(conf, 4, true, null); @@ -114,5 +152,8 @@ public class TestTaskLimits extends Test status = runOneTest(-1); assertTrue(status == true); System.out.println("Job 3 succeeded as expected."); + status = runReduceLimitCheck(); + assertTrue(status == true); + System.out.println("Success: Reduce limit as expected"); } }