Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 90218 invoked from network); 4 Mar 2011 03:26:38 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:26:38 -0000 Received: (qmail 97940 invoked by uid 500); 4 Mar 2011 03:26:38 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 97898 invoked by uid 500); 4 Mar 2011 03:26:38 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 97891 invoked by uid 99); 4 Mar 2011 03:26:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:26:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:26:37 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1AB652388C29; Fri, 4 Mar 2011 03:26:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1076962 - in /hadoop/common/branches/branch-0.20-security-patches: conf/ src/mapred/org/apache/hadoop/mapred/ Date: Fri, 04 Mar 2011 03:26:16 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304032617.1AB652388C29@eris.apache.org> Author: omalley Date: Fri Mar 4 03:26:16 2011 New Revision: 1076962 URL: http://svn.apache.org/viewvc?rev=1076962&view=rev Log: commit 2a423584ebf75afbb08b49c9f8267be6973a9e26 Author: Lee Tucker Date: Thu Jul 30 17:40:48 2009 -0700 Applying patch 2899836.mr740.patch Modified: hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Modified: hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties?rev=1076962&r1=1076961&r2=1076962&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties (original) +++ hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties Fri Mar 4 03:26:16 2011 @@ -3,6 +3,16 @@ hadoop.root.logger=INFO,console hadoop.log.dir=. hadoop.log.file=hadoop.log +# +# Job Summary Appender +# +# Use following logger to send summary to separate file defined by +# hadoop.mapreduce.jobsummary.log.file rolled daily: +# hadoop.mapreduce.jobsummary.logger=INFO,JSA +# +hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger} +hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log + # Define the root logger to the system property "hadoop.root.logger". log4j.rootLogger=${hadoop.root.logger}, EventCounter @@ -92,3 +102,14 @@ log4j.logger.org.jets3t.service.impl.res # Sends counts of logging messages at different severity levels to Hadoop Metrics. # log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter + +# +# Job Summary Appender +# +log4j.appender.JSA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file} +log4j.appender.JSA.layout=org.apache.log4j.PatternLayout +log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n +log4j.appender.JSA.DatePattern=.yyyy-MM-dd +log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger} +log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076962&r1=1076961&r2=1076962&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:26:16 2011 @@ -203,6 +203,8 @@ class JobInProgress { OTHER_LOCAL_MAPS, DATA_LOCAL_MAPS, RACK_LOCAL_MAPS, + SLOTS_MILLIS_MAPS, + SLOTS_MILLIS_REDUCES, FALLOW_SLOTS_MILLIS_MAPS, FALLOW_SLOTS_MILLIS_REDUCES } @@ -2129,8 +2131,25 @@ class JobInProgress { } return true; } + /** + * Metering: Occupied Slots * (Finish - Start) + * @param tip {@link TaskInProgress} to be metered which just completed, + * cannot be null + * @param status {@link TaskStatus} of the completed task, cannot be + * null + */ + private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) { + Counter slotCounter = + (tip.isMapTask()) ? Counter.SLOTS_MILLIS_MAPS : + Counter.SLOTS_MILLIS_REDUCES; + jobCounters.incrCounter(slotCounter, + tip.getNumSlotsRequired() * + (status.getFinishTime() - status.getStartTime())); + } + + /** * A taskid assigned to this JobInProgress has reported in successfully. */ public synchronized boolean completedTask(TaskInProgress tip, @@ -2140,6 +2159,9 @@ class JobInProgress { int oldNumAttempts = tip.getActiveTasks().size(); final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation(); + // Metering + meterTaskAttempt(tip, status); + // Sanity check: is the TIP already complete? // It _is_ safe to not decrement running{Map|Reduce}Tasks and // finished{Map|Reduce}Tasks variables here because one and only @@ -2281,6 +2303,12 @@ class JobInProgress { this.finishTime = System.currentTimeMillis(); LOG.info("Job " + this.status.getJobID() + " has completed successfully."); + + // Log the job summary (this should be done prior to logging to + // job-history to ensure job-counters are in-sync + JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false)); + + // Log job-history JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, @@ -2296,6 +2324,9 @@ class JobInProgress { private synchronized void terminateJob(int jobTerminationState) { if ((status.getRunState() == JobStatus.RUNNING) || (status.getRunState() == JobStatus.PREP)) { + // Log the job summary + JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false)); + if (jobTerminationState == JobStatus.FAILED) { this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f, JobStatus.FAILED, @@ -2314,6 +2345,7 @@ class JobInProgress { this.finishedReduceTasks); } garbageCollect(); + jobtracker.getInstrumentation().terminateJob( this.conf, this.status.getJobID()); } @@ -2474,9 +2506,13 @@ class JobInProgress { failReduce(tip); } } + + // Metering + meterTaskAttempt(tip, status); } - // the case when the map was complete but the task tracker went down. + // The case when the map was complete but the task tracker went down. + // However, we don't need to do any metering here... if (wasComplete && !isComplete) { if (tip.isMapTask()) { // Put the task back in the cache. This will help locality for cases @@ -2667,8 +2703,9 @@ class JobInProgress { * from the various tables. */ synchronized void garbageCollect() { - //Cancel task tracker reservation + // Cancel task tracker reservation cancelReservedSlots(); + // Let the JobTracker know that a job is complete jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps()); jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces()); @@ -2853,4 +2890,64 @@ class JobInProgress { void setClusterSize(int clusterSize) { this.clusterSize = clusterSize; } + + static class JobSummary { + static final Log LOG = LogFactory.getLog(JobSummary.class); + + // Escape sequences + static final char EQUALS = '='; + static final char[] charsToEscape = + {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR}; + + /** + * Log a summary of the job's runtime. + * + * @param job {@link JobInProgress} whose summary is to be logged, cannot + * be null. + * @param cluster {@link ClusterStatus} of the cluster on which the job was + * run, cannot be null + */ + public static void logJobSummary(JobInProgress job, ClusterStatus cluster) { + JobStatus status = job.getStatus(); + JobProfile profile = job.getProfile(); + String user = StringUtils.escapeString(profile.getUser(), + StringUtils.ESCAPE_CHAR, + charsToEscape); + String queue = StringUtils.escapeString(profile.getQueueName(), + StringUtils.ESCAPE_CHAR, + charsToEscape); + Counters jobCounters = job.getJobCounters(); + long mapSlotSeconds = + (jobCounters.getCounter(Counter.SLOTS_MILLIS_MAPS) + + jobCounters.getCounter(Counter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000; + long reduceSlotSeconds = + (jobCounters.getCounter(Counter.SLOTS_MILLIS_REDUCES) + + jobCounters.getCounter(Counter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000; + + LOG.info("jobId=" + job.getJobID() + StringUtils.COMMA + + "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA + + "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA + + "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA + + "numMaps" + EQUALS + job.getMapTasks().length + + StringUtils.COMMA + + "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() + + StringUtils.COMMA + + "numReduces" + EQUALS + job.getReduceTasks().length + + StringUtils.COMMA + + "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() + + StringUtils.COMMA + + "user" + EQUALS + user + StringUtils.COMMA + + "queue" + EQUALS + queue + StringUtils.COMMA + + "status" + EQUALS + + JobStatus.getJobRunState(status.getRunState()) + + StringUtils.COMMA + + "mapSlotSeconds" + EQUALS + mapSlotSeconds + StringUtils.COMMA + + "reduceSlotsSeconds" + EQUALS + reduceSlotSeconds + + StringUtils.COMMA + + "clusterMapCapacity" + EQUALS + cluster.getMaxMapTasks() + + StringUtils.COMMA + + "clusterReduceCapacity" + EQUALS + cluster.getMaxReduceTasks() + ); + } + } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=1076962&r1=1076961&r2=1076962&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Mar 4 03:26:16 2011 @@ -48,6 +48,22 @@ public class JobStatus implements Writab public static final int PREP = 4; public static final int KILLED = 5; + private static final String UNKNOWN = "UNKNOWN"; + private static final String[] runStates = + {UNKNOWN, "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"}; + + /** + * Helper method to get human-readable state of the job. + * @param state job state + * @return human-readable state of the job + */ + public static String getJobRunState(int state) { + if (state < 1 || state >= runStates.length) { + return UNKNOWN; + } + return runStates[state]; + } + private JobID jobid; private float mapProgress; private float reduceProgress; Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1076962&r1=1076961&r2=1076962&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 4 03:26:16 2011 @@ -1186,4 +1186,8 @@ class TaskInProgress { TreeMap getActiveTasks() { return activeTasks; } + + int getNumSlotsRequired() { + return numSlotsRequired; + } }