Subject: svn commit: r511075 - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/metrics/ src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hado...
Date: Fri, 23 Feb 2007 20:13:02 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org

Author: cutting
Date: Fri Feb 23 12:13:00 2007
New Revision: 511075

URL: http://svn.apache.org/viewvc?view=rev&rev=511075
Log:
HADOOP-492. Add counters. Contributed by David Bowen.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
    lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
    lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
    lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 23 12:13:00 2007
@@ -122,6 +122,13 @@
 36. HADOOP-1029. Fix streaming's input format to correctly seek to
     the start of splits. (Arun C Murthy via cutting)

+37. HADOOP-492. Add per-job and per-task counters. These are
+    incremented via the Reporter interface and available through the
+    web ui and the JobClient API. The mapreduce framework maintains a
+    few basic counters, and applications may add their own. Counters
+    are also passed to the metrics system.
+    (David Bowen via cutting)
+
 Release 0.11.2 - 2007-02-16

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Fri Feb 23 12:13:00 2007
@@ -47,6 +47,9 @@
  */
 public class WordCount {

+  // These are just for testing counters
+  private enum Counter { WORDS, VALUES }
+
   /**
    * Counts the words in each line.
    * For each line of input, break the line into words and emit them as
@@ -65,6 +68,7 @@
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
         output.collect(word, one);
+        reporter.incrCounter(Counter.WORDS, 1);
       }
     }
   }
@@ -80,6 +84,7 @@
       int sum = 0;
       while (values.hasNext()) {
         sum += ((IntWritable) values.next()).get();
+        reporter.incrCounter(Counter.VALUES, 1);
       }
       output.collect(key, new IntWritable(sum));
     }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?view=auto&rev=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Fri Feb 23 12:13:00 2007
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A set of named counters.
+ */
+public class Counters implements Writable {
+
+  private Map<String,Long> counters = new TreeMap<String,Long>();
+
+  /**
+   * Returns the names of all counters.
+   * @return Set of counter names.
+   */
+  public synchronized Set<String> getCounterNames() {
+    return counters.keySet();
+  }
+
+  /**
+   * Returns the value of the named counter, or 0 if counter doesn't exist.
+   * @param name name of a counter
+   * @return value of the counter
+   */
+  public synchronized long getCounter(String name) {
+    Long result = counters.get(name);
+    return (result == null ? 0L : result);
+  }
+
+  /**
+   * Increments the named counter by the specified amount, creating it if
+   * it didn't already exist.
+   * @param name of a counter
+   * @param amount amount by which counter is to be incremented
+   */
+  public synchronized void incrCounter(String name, long amount) {
+    counters.put(name, amount + getCounter(name));
+  }
+
+  /**
+   * Increments multiple counters by their amounts in another Counters
+ * @param other the other Counters instance + */ + public synchronized void incrAllCounters(Counters other) { + for (String name : other.getCounterNames()) { + incrCounter(name, other.getCounter(name)); + } + } + + /** + * Returns the number of counters. + */ + public synchronized int size() { + return counters.size(); + } + + // Writable + + public synchronized void write(DataOutput out) throws IOException { + out.writeInt(counters.size()); + for (String name : counters.keySet()) { + UTF8.writeString(out, name); + out.writeLong(counters.get(name)); + } + } + + public synchronized void readFields(DataInput in) throws IOException { + int n = in.readInt(); + while (n-- > 0) { + String name = UTF8.readString(in); + long value = in.readLong(); + counters.put(name, value); + } + } + + /** + * Logs the current counter values. + * @param log The log to use. + */ + public void log(Log log) { + log.info("Counters: " + getCounterNames().size()); + for (String counterName : getCounterNames()) { + log.info(" " + counterName + "=" + getCounter(counterName)); + } + } + +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Fri Feb 23 12:13:00 2007 @@ -25,8 +25,10 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.SequenceFile; import 
org.apache.hadoop.io.Text; @@ -58,7 +60,9 @@ } public void progress(String taskid, float progress, String state, - TaskStatus.Phase phase) throws IOException { + TaskStatus.Phase phase, Counters counters) + throws IOException + { StringBuffer buf = new StringBuffer("Task "); buf.append(taskid); buf.append(" making progress to "); @@ -69,6 +73,7 @@ } LOG.info(buf.toString()); // ignore phase + // ignore counters } public void reportDiagnosticInfo(String taskid, String trace) throws IOException { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Feb 23 12:13:00 2007 @@ -175,6 +175,10 @@ "map() completion: " + status.mapProgress() + "\n" + "reduce() completion: " + status.reduceProgress(); } + + public Counters getCounters() { + return status.getCounters(); + } } JobSubmissionProtocol jobSubmitClient; @@ -597,6 +601,7 @@ throw new IOException("Job failed!"); } LOG.info("Job complete: " + jobId); + running.getCounters().log(LOG); error = false; } finally { if (error && (running != null)) { @@ -604,6 +609,7 @@ } jc.close(); } + } private static void displayTaskLogs(String taskId, String baseUrl) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri 
Feb 23 12:13:00 2007 @@ -17,18 +17,27 @@ */ package org.apache.hadoop.mapred; -import org.apache.commons.logging.*; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableUtils; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobHistory.Values; import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics; -import org.apache.hadoop.mapred.JobHistory.Values ; -import java.io.*; -import java.net.*; -import java.util.*; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; /////////////////////////////////////////////////////// // JobInProgress maintains all the info for keeping @@ -74,6 +83,9 @@ private LocalFileSystem localFs; private String uniqueString; + + private Counters counters = new Counters(); + private MetricsRecord jobMetrics; /** * Create a JobInProgress with the given job file, plus a handle @@ -113,6 +125,10 @@ JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), System.currentTimeMillis(), jobFile); + MetricsContext metricsContext = MetricsUtil.getContext("mapred"); + this.jobMetrics = metricsContext.createRecord("job"); + this.jobMetrics.setTag("user", conf.getUser()); + this.jobMetrics.setTag("jobName", conf.getJobName()); } /** @@ -338,6 +354,28 @@ (progressDelta / reduces.length))); } } + + // + // Update counters by summing over 
all tasks in progress + // + Counters newCounters = new Counters(); + for (TaskInProgress mapTask : maps) { + newCounters.incrAllCounters(mapTask.getCounters()); + } + for (TaskInProgress reduceTask : reduces) { + newCounters.incrAllCounters(reduceTask.getCounters()); + } + this.status.setCounters(newCounters); + + // + // Send counter data to the metrics package. + // + for (String counter : newCounters.getCounterNames()) { + long value = newCounters.getCounter(counter); + jobMetrics.setTag("counter", counter); + jobMetrics.setMetric("value", (float) value); + jobMetrics.update(); + } } ///////////////////////////////////////////////////// @@ -787,7 +825,8 @@ TaskStatus.State.FAILED, reason, reason, - trackerName, phase); + trackerName, phase, + tip.getCounters()); updateTaskStatus(tip, status, metrics); JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), @@ -831,8 +870,8 @@ } /** - * Return the TaskInProgress that matches the tipid. - */ + * Return the TaskInProgress that matches the tipid. 
+   */
     public TaskInProgress getTaskInProgress(String tipid){
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Fri Feb 23 12:13:00 2007
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.mapred;

-import org.apache.hadoop.io.*;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;

 /**************************************************
  * Describes the current status of a job. This is
@@ -49,6 +54,7 @@
     private int runState;
     private long startTime;
     private String user;
+    private Counters counters = new Counters();
     /**
      */
     public JobStatus() {
@@ -133,6 +139,16 @@
      * @return the username of the job
      */
     public String getUsername() { return this.user;};
+
+    /**
+     * @param counters Counters for the job.
+     */
+    void setCounters(Counters counters) { this.counters = counters; }
+    /**
+     * @return the counters for the job
+     */
+    public Counters getCounters() { return counters; }
+
     ///////////////////////////////////////
     // Writable
     ///////////////////////////////////////
@@ -143,6 +159,7 @@
         out.writeInt(runState);
         out.writeLong(startTime);
         UTF8.writeString(out, user);
+        counters.write(out);
     }
     public void readFields(DataInput in) throws IOException {
         this.jobid = UTF8.readString(in);
@@ -151,5 +168,6 @@
         this.runState = in.readInt();
         this.startTime = in.readLong();
         this.user = UTF8.readString(in);
+        counters.readFields(in);
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Feb 23 12:13:00 2007
@@ -18,22 +18,36 @@

 package org.apache.hadoop.mapred;

-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
-import java.net.*;
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.text.NumberFormat;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.StringUtils;

 /*******************************************************
  * JobTracker is the central location for submitting and
@@ -1485,18 +1499,36 @@
     }

     /** Get all the TaskStatuses from the tipid. */
-    TaskStatus[] getTaskStatuses(String jobid, String tipid){
-        JobInProgress job = (JobInProgress) jobs.get(jobid);
-        if (job == null){
-            return new TaskStatus[0];
-        }
-        TaskInProgress tip = (TaskInProgress) job.getTaskInProgress(tipid);
-        if (tip == null){
-            return new TaskStatus[0];
-        }
-        return tip.getTaskStatuses();
+    TaskStatus[] getTaskStatuses(String jobid, String tipid) {
+        TaskInProgress tip = getTip(jobid, tipid);
+        return (tip == null ? new TaskStatus[0]
+                : tip.getTaskStatuses());
     }

+    /** Returns the TaskStatus for a particular taskid. */
+    TaskStatus getTaskStatus(String jobid, String tipid, String taskid) {
+        TaskInProgress tip = getTip(jobid, tipid);
+        return (tip == null ? null
+                : tip.getTaskStatus(taskid));
+    }
+
+    /**
+     * Returns the counters for the specified task in progress.
+     */
+    Counters getTipCounters(String jobid, String tipid) {
+        TaskInProgress tip = getTip(jobid, tipid);
+        return (tip == null ? null : tip.getCounters());
+    }
+
+    /**
+     * Returns specified TaskInProgress, or null.
+     */
+    private TaskInProgress getTip(String jobid, String tipid) {
+        JobInProgress job = (JobInProgress) jobs.get(jobid);
+        return (job == null ? null
+                : (TaskInProgress) job.getTaskInProgress(tipid));
+    }
+
     /**
      * Get tracker name for a given task id.
      * @param taskId the name of the task

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Feb 23 12:13:00 2007
@@ -18,13 +18,16 @@

 package org.apache.hadoop.mapred;

-import java.io.*;
-import java.util.*;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;

@@ -58,6 +61,10 @@
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
+
+    // Contains the counters summed over all the tasks which
+    // have successfully completed
+    private Counters counters = new Counters();

     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
@@ -116,6 +123,7 @@
             map.run(localConf, this);
             myMetrics.completeMap();
             map_tasks -= 1;
+            updateCounters(map);
           }

           // move map output to reduce input
@@ -144,6 +152,7 @@
             reduce.run(localConf, this);
             myMetrics.completeReduce();
             reduce_tasks -= 1;
+            updateCounters(reduce);
           }
           this.mapoutputFile.removeAll(reduceId);

@@ -162,7 +171,7 @@
         }
       }
     }
-
+
     private String newId() {
       return Integer.toString(Math.abs(random.nextInt()),36);
     }
@@ -172,7 +181,7 @@
     public Task getTask(String taskid) { return null; }

     public void progress(String taskId, float progress, String state,
-                         TaskStatus.Phase phase) {
+                         TaskStatus.Phase phase, Counters taskStats) {
       LOG.info(state);
       float taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) { // mapping
@@ -183,6 +192,30 @@
       }

       // ignore phase
+      updateStatusCounters(taskStats);
+    }
+
+    /**
+     * Updates counters corresponding to completed tasks.
+     * @param task A map or reduce task which has just been
+     * successfully completed
+     */
+    private void updateCounters(Task task) {
+      counters.incrAllCounters(task.getCounters());
+      status.setCounters(counters);
+    }
+
+    /**
+     * Sets status counters to the sum of (1) the counters from
+     * all completed tasks, and (2) the counters from a particular
+     * task in progress.
+     * @param taskCounters Counters from a task that is in progress
+     */
+    private void updateStatusCounters(Counters taskCounters) {
+      Counters newStats = new Counters();
+      newStats.incrAllCounters(counters);
+      newStats.incrAllCounters(taskCounters);
+      status.setCounters(newStats);
     }

     public void reportDiagnosticInfo(String taskid, String trace) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Feb 23 12:13:00 2007
@@ -18,34 +18,41 @@

 package org.apache.hadoop.mapred;

-import java.io.*;
-import java.util.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;

-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.SequenceFile.Sorter;
-import org.apache.hadoop.io.SequenceFile.Sorter.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics.MetricsRecord;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.Updater;

 /** A Map task. */
 class MapTask extends Task {
-
-  private MapTaskMetrics myMetrics = null;
   private BytesWritable split = new BytesWritable();
   private String splitClass;
@@ -58,28 +65,11 @@
     setPhase(TaskStatus.Phase.MAP);
   }

-  private class MapTaskMetrics {
-    private MetricsRecord mapInputMetrics = null;
-    private MetricsRecord mapOutputMetrics = null;
-
-    MapTaskMetrics(String user) {
-      MetricsContext context = MetricsUtil.getContext("mapred");
-      mapInputMetrics = MetricsUtil.createRecord(context, "mapInput", "user", user);
-      mapOutputMetrics = MetricsUtil.createRecord(context, "mapOutput", "user", user);
-    }
-
-    synchronized void mapInput(int numBytes) {
-      mapInputMetrics.incrMetric("map_input_records", 1);
-      mapInputMetrics.incrMetric("map_input_bytes", numBytes);
-      mapInputMetrics.update();
-    }
-
-    synchronized void mapOutput(int numBytes) {
-      mapOutputMetrics.incrMetric("map_output_records", 1);
-      mapOutputMetrics.incrMetric("map_output_bytes", numBytes);
-      mapOutputMetrics.update();
-    }
-
+  private enum Counter {
+    INPUT_RECORDS,
+    INPUT_BYTES,
+    OUTPUT_RECORDS,
+    OUTPUT_BYTES
   }

   public MapTask() {}
@@ -121,16 +111,12 @@
     super.readFields(in);
     splitClass = Text.readString(in);
     split.readFields(in);
-    if (myMetrics == null) {
-        myMetrics = new MapTaskMetrics("unknown");
-    }
   }

   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    myMetrics = new MapTaskMetrics(job.getUser());
-    Reporter reporter = getReporter(umbilical, getProgress());
+    final Reporter reporter = getReporter(umbilical);
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
@@ -175,7 +161,8 @@
         setProgress(getProgress());
         long beforePos = getPos();
         boolean ret = rawIn.next(key, value);
-        myMetrics.mapInput((int)(getPos() - beforePos));
+        reporter.incrCounter(Counter.INPUT_RECORDS, 1);
+        reporter.incrCounter(Counter.INPUT_BYTES, (getPos() - beforePos));
         return ret;
       }
       public long getPos() throws IOException { return rawIn.getPos(); }
@@ -337,7 +324,9 @@
       int partNumber = partitioner.getPartition(key, value, partitions);
       sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
-      myMetrics.mapOutput((int)(keyValBuffer.getLength() - keyOffset));
+      reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
+      reporter.incrCounter(Counter.OUTPUT_BYTES,
+                           (keyValBuffer.getLength() - keyOffset));
       //now check whether we need to spill to disk
       long totalMem = 0;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Feb 23 12:13:00 2007
@@ -18,19 +18,31 @@

 package org.apache.hadoop.mapred;

-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
-import java.util.*;
-import java.text.*;
-
-import org.apache.hadoop.metrics.ContextFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;

 /** A Reduce task. */
 class ReduceTask extends Task {
@@ -43,27 +55,7 @@
       });
   }

-  private class ReduceTaskMetrics {
-    private final MetricsRecord inputMetrics, outputMetrics;
-
-    ReduceTaskMetrics(String user) {
-      MetricsContext context = MetricsUtil.getContext("mapred");
-      inputMetrics = MetricsUtil.createRecord(context, "reduceInput", "user", user);
-      outputMetrics = MetricsUtil.createRecord(context, "reduceOutput", "user", user);
-    }
-
-    synchronized void reduceInput() {
-      inputMetrics.incrMetric("reduce_input_records", 1);
-      inputMetrics.update();
-    }
-
-    synchronized void reduceOutput() {
-      outputMetrics.incrMetric("reduce_output_records", 1);
-      outputMetrics.update();
-    }
-  }
-
-  private ReduceTaskMetrics myMetrics = null;
+  private enum Counter { INPUT_RECORDS, OUTPUT_RECORDS }
   private int numMaps;
   private boolean sortComplete;
@@ -115,9 +107,6 @@
     super.readFields(in);

     numMaps = in.readInt();
-    if (myMetrics == null) {
-        myMetrics = new ReduceTaskMetrics("unknown");
-    }
   }

   /** Iterates values while keys match in sorted input. */
@@ -224,7 +213,6 @@

   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    myMetrics = new ReduceTaskMetrics(job.getUser());
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);
@@ -289,7 +277,7 @@
     sortPhase.complete(); // sort is complete
     setPhase(TaskStatus.Phase.REDUCE);

-    Reporter reporter = getReporter(umbilical, getProgress());
+    final Reporter reporter = getReporter(umbilical);

     // make output collector
     String finalName = getOutputName(getPartition());
@@ -308,7 +296,7 @@
       public void collect(WritableComparable key, Writable value)
         throws IOException {
         out.write(key, value);
-        myMetrics.reduceOutput();
+        reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
         reportProgress(umbilical);
       }
     };
@@ -321,7 +309,7 @@
                                 keyClass, valClass, umbilical, job);
       values.informReduceProgress();
       while (values.more()) {
-        myMetrics.reduceInput();
+        reporter.incrCounter(Counter.INPUT_RECORDS, 1);
         reducer.reduce(values.getKey(), values, collector, reporter);
         values.nextKey();
         values.informReduceProgress();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Feb 23 12:13:00 2007
@@ -405,8 +405,9 @@
     this.lastPollTime = 0;

     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.shuffleMetrics = MetricsUtil.createRecord(
-        metricsContext, "shuffleInput", "user", conf.getUser());
+    this.shuffleMetrics =
+      MetricsUtil.createRecord(metricsContext, "shuffleInput");
+    this.shuffleMetrics.setTag("user", conf.getUser());
   }

   /** Assemble all of the map output files */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Fri Feb 23 12:13:00 2007
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;

 import java.io.IOException;
+
 import org.apache.hadoop.util.Progressable;

 /** Passed to application code to permit alteration of status. */
@@ -30,9 +31,10 @@
   public static final Reporter NULL = new Reporter() {
       public void setStatus(String s) {
       }
-
       public void progress() throws IOException { }
+      public void incrCounter(Enum key, long amount) {
+      }
     };

   /**
@@ -41,5 +43,14 @@
    * @param status
    *          a brief description of the current status
    */
-  void setStatus(String status) throws IOException;
+  public abstract void setStatus(String status) throws IOException;
+
+  /**
+   * Increments the counter identified by the key, which can be of
+   * any enum type, by the specified amount.
+ * @param key A value of any enum type + * @param amount A non-negative amount by which the counter is to + * be incremented + */ + public abstract void incrCounter(Enum key, long amount); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Fri Feb 23 12:13:00 2007 @@ -80,4 +80,5 @@ public TaskCompletionEvent[] getTaskCompletionEvents( int startFrom) throws IOException; + public Counters getCounters(); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Feb 23 12:13:00 2007 @@ -18,13 +18,17 @@ package org.apache.hadoop.mapred; -import org.apache.commons.logging.*; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.util.*; - -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.StringUtils; /** Base class for tasks. 
*/ abstract class Task implements Writable, Configurable { @@ -40,7 +44,8 @@ private String jobId; // unique jobid private String tipId ; private int partition; // id within job - private TaskStatus.Phase phase ; // current phase of the task + private TaskStatus.Phase phase ; // current phase of the task + //////////////////////////////////////////// // Constructors @@ -64,6 +69,7 @@ public String getJobFile() { return jobFile; } public String getTaskId() { return taskId; } public String getTipId(){ return tipId ; } + public Counters getCounters() { return counters; } /** * Get the job name for this task. @@ -142,26 +148,36 @@ /** The number of milliseconds between progress reports. */ public static final int PROGRESS_INTERVAL = 1000; - private volatile Progress taskProgress = new Progress(); + private transient Progress taskProgress = new Progress(); private transient long nextProgressTime = System.currentTimeMillis() + PROGRESS_INTERVAL; + private transient Counters counters = new Counters(); + public abstract boolean isMapTask(); public Progress getProgress() { return taskProgress; } - protected Reporter getReporter(final TaskUmbilicalProtocol umbilical, - final Progress progress) throws IOException { + protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) + throws IOException + { return new Reporter() { public void setStatus(String status) throws IOException { synchronized (this) { - progress.setStatus(status); + taskProgress.setStatus(status); progress(); } } public void progress() throws IOException { reportProgress(umbilical); } + public void incrCounter(Enum key, long amount) { + Counters counters = getCounters(); + if (counters != null) { + String name = key.getDeclaringClass().getName()+"#"+key.toString(); + counters.incrCounter(name, amount); + } + } }; } @@ -169,21 +185,15 @@ taskProgress.set(progress); } - public void reportProgress(TaskUmbilicalProtocol umbilical, float progress) - throws IOException { - setProgress(progress); - 
reportProgress(umbilical); - } - public void reportProgress(TaskUmbilicalProtocol umbilical) { long now = System.currentTimeMillis(); - if (now > nextProgressTime) { - synchronized (this) { + synchronized (this) { + if (now > nextProgressTime) { nextProgressTime = now + PROGRESS_INTERVAL; float progress = taskProgress.get(); String status = taskProgress.toString(); try { - umbilical.progress(getTaskId(), progress, status, phase); + umbilical.progress(getTaskId(), progress, status, phase, counters); } catch (IOException ie) { LOG.warn(StringUtils.stringifyException(ie)); } @@ -199,7 +209,7 @@ if (needProgress) { // send a final status report umbilical.progress(getTaskId(), taskProgress.get(), - taskProgress.toString(), phase); + taskProgress.toString(), phase, counters); needProgress = false; } umbilical.done(getTaskId()); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Feb 23 12:13:00 2007 @@ -17,12 +17,19 @@ */ package org.apache.hadoop.mapred; -import org.apache.commons.logging.*; -import org.apache.hadoop.io.BytesWritable; -import java.text.NumberFormat; import java.io.IOException; -import java.util.*; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.BytesWritable; //////////////////////////////////////////////////////// @@ -89,10 +96,13 @@ /** * Map from taskId -> TaskStatus */ - 
private TreeMap taskStatuses = new TreeMap(); + private TreeMap taskStatuses = + new TreeMap(); private TreeSet machinesWhereFailed = new TreeSet(); private TreeSet tasksReportedClosed = new TreeSet(); + + private Counters counters = new Counters(); /** * Constructor for MapTask @@ -240,6 +250,13 @@ public double getProgress() { return progress; } + + /** + * Get the task's counters + */ + public Counters getCounters() { + return counters; + } /** * Returns whether a component task-thread should be * closed because the containing JobInProgress has completed. @@ -278,7 +295,7 @@ TaskReport report = new TaskReport (getTIPId(), (float)progress, state, (String[])diagnostics.toArray(new String[diagnostics.size()]), - execStartTime, execFinishTime); + execStartTime, execFinishTime, counters); return report ; } @@ -361,7 +378,7 @@ // Note the failure and its location // LOG.info("Task '" + taskid + "' has been lost."); - TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + TaskStatus status = taskStatuses.get(taskid); if (status != null) { status.setRunState(TaskStatus.State.FAILED); // tasktracker went down and failed time was not reported. @@ -392,7 +409,7 @@ */ void completedTask(String taskid) { LOG.info("Task '" + taskid + "' has completed."); - TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + TaskStatus status = taskStatuses.get(taskid); status.setRunState(TaskStatus.State.SUCCEEDED); activeTasks.remove(taskid); } @@ -421,9 +438,17 @@ * Get the Status of the tasks managed by this TIP */ public TaskStatus[] getTaskStatuses() { - return (TaskStatus[])taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]); + return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]); } + /** + * Get the status of the specified task + * @param taskid + * @return + */ + public TaskStatus getTaskStatus(String taskid) { + return taskStatuses.get(taskid); + } /** * The TIP's been ordered kill()ed. 
*/ @@ -460,22 +485,26 @@ } else { double bestProgress = 0; String bestState = ""; + Counters bestCounters = new Counters(); for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) { String taskid = (String) it.next(); - TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + TaskStatus status = taskStatuses.get(taskid); if (status.getRunState() == TaskStatus.State.SUCCEEDED) { bestProgress = 1; bestState = status.getStateString(); + bestCounters = status.getCounters(); break; } else if (status.getRunState() == TaskStatus.State.RUNNING) { if (status.getProgress() >= bestProgress) { bestProgress = status.getProgress(); bestState = status.getStateString(); + bestCounters = status.getCounters(); } } } this.progress = bestProgress; this.state = bestState; + this.counters = bestCounters; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java Fri Feb 23 12:13:00 2007 @@ -17,9 +17,13 @@ */ package org.apache.hadoop.mapred; -import org.apache.hadoop.io.*; - -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** A report on the state of a task. 
*/ public class TaskReport implements Writable { @@ -29,17 +33,20 @@ private String[] diagnostics; private long startTime ; private long finishTime; + private Counters counters; public TaskReport() {} TaskReport(String taskid, float progress, String state, - String[] diagnostics, long startTime, long finishTime) { + String[] diagnostics, long startTime, long finishTime, + Counters counters) { this.taskid = taskid; this.progress = progress; this.state = state; this.diagnostics = diagnostics; this.startTime = startTime ; this.finishTime = finishTime ; + this.counters = counters; } /** The id of the task. */ @@ -50,6 +57,9 @@ public String getState() { return state; } /** A list of error messages. */ public String[] getDiagnostics() { return diagnostics; } + /** A table of counters. */ + public Counters getCounters() { return counters; } + /** * Get finish time of task. * @return 0, if finish time was not set else returns finish time. @@ -90,6 +100,7 @@ out.writeLong(startTime); out.writeLong(finishTime); WritableUtils.writeStringArray(out, diagnostics); + counters.write(out); } public void readFields(DataInput in) throws IOException { @@ -100,5 +111,7 @@ this.finishTime = in.readLong() ; diagnostics = WritableUtils.readStringArray(in); + counters = new Counters(); + counters.readFields(in); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Fri Feb 23 12:13:00 2007 @@ -17,9 +17,13 @@ */ package org.apache.hadoop.mapred; -import org.apache.hadoop.io.*; - -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + 
+import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /************************************************** * Describes the current status of a task. This is * not intended to be a comprehensive piece of data. @@ -49,13 +53,14 @@ private long sortFinishTime ; private Phase phase = Phase.STARTING; + private Counters counters; public TaskStatus() {} public TaskStatus(String taskid, boolean isMap, float progress, State runState, String diagnosticInfo, String stateString, String taskTracker, - Phase phase) { + Phase phase, Counters counters) { this.taskid = taskid; this.isMap = isMap; this.progress = progress; @@ -64,6 +69,7 @@ this.stateString = stateString; this.taskTracker = taskTracker; this.phase = phase ; + this.counters = counters; } public String getTaskId() { return taskid; } @@ -176,6 +182,20 @@ void setPhase(Phase p){ this.phase = p ; } + /** + * Get task's counters. + */ + public Counters getCounters() { + return counters; + } + /** + * Set the task's counters. 
+ * @param counters + */ + public void setCounters(Counters counters) { + this.counters = counters; + } + ////////////////////////////////////////////// // Writable ////////////////////////////////////////////// @@ -193,6 +213,7 @@ out.writeLong(shuffleFinishTime); out.writeLong(sortFinishTime); } + counters.write(out); } public void readFields(DataInput in) throws IOException { @@ -209,6 +230,8 @@ shuffleFinishTime = in.readLong(); sortFinishTime = in.readLong(); } + counters = new Counters(); + counters.readFields(in); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Feb 23 12:13:00 2007 @@ -17,18 +17,22 @@ */ package org.apache.hadoop.mapred; -import org.apache.commons.logging.*; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.ipc.*; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.MetricsException; -import org.apache.hadoop.util.*; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; - -import java.io.*; -import java.net.*; -import java.util.*; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import 
java.util.regex.Pattern; @@ -38,10 +42,27 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.hadoop.metrics.MetricsContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSError; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsException; import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.RunJar; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; /******************************************************* * TaskTracker is a process that starts and tracks MR Tasks @@ -939,7 +960,8 @@ diagnosticInfo.toString(), "initializing", getName(), task.isMapTask()? 
TaskStatus.Phase.MAP: - TaskStatus.Phase.SHUFFLE); + TaskStatus.Phase.SHUFFLE, + task.getCounters()); keepJobFiles = false; taskTimeout = (10 * 60 * 1000); } @@ -1022,7 +1044,9 @@ * The task is reporting its progress */ public synchronized void reportProgress(float p, String state, - TaskStatus.Phase newPhase) { + TaskStatus.Phase newPhase, + Counters counters) + { LOG.info(task.getTaskId()+" "+p+"% "+state); this.progress = p; this.runstate = TaskStatus.State.RUNNING; @@ -1038,6 +1062,7 @@ this.taskStatus.setPhase(newPhase); } this.taskStatus.setStateString(state); + this.taskStatus.setCounters(counters); } /** @@ -1272,11 +1297,12 @@ */ public synchronized void progress(String taskid, float progress, String state, - TaskStatus.Phase phase + TaskStatus.Phase phase, + Counters counters ) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { - tip.reportProgress(progress, state, phase); + tip.reportProgress(progress, state, phase, counters); } else { LOG.warn("Progress from unknown child task: "+taskid+". Ignored."); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Feb 23 12:13:00 2007 @@ -38,9 +38,10 @@ * @param progress value between zero and one * @param state description of task's current state * @param phase current phase of the task. + * @param counters the counters for this task. 
*/ void progress(String taskid, float progress, String state, - TaskStatus.Phase phase) + TaskStatus.Phase phase, Counters counters) throws IOException; /** Report error messages back to parent. Calls should be sparing, since all Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java Fri Feb 23 12:13:00 2007 @@ -25,8 +25,6 @@ /** * Utility class to simplify creation and reporting of hadoop metrics. - * This class makes the simplifying assumption that each metrics record has - * exactly one tag, which defaults to being the hostName. * * For examples of usage, see {@link org.apache.hadoop.dfs.DataNode}. * @see org.apache.hadoop.metrics.MetricsRecord @@ -62,26 +60,6 @@ } return metricsContext; } - - /** - * Utility method to create and return a new metrics record instance - * within the given name and the specified tag. 
- * - * @param context the context - * @param recordName the name of the record - * @param tagName name of the tag field of metrics record - * @param tagValue value of the tag field - * @return newly created metrics record - */ - public static MetricsRecord createRecord(MetricsContext context, - String recordName, - String tagName, - String tagValue) - { - MetricsRecord metricsRecord = context.createRecord(recordName); - metricsRecord.setTag(tagName, tagValue); - return metricsRecord; - } /** * Utility method to create and return new metrics record instance within the @@ -92,9 +70,11 @@ * @return newly created metrics record */ public static MetricsRecord createRecord(MetricsContext context, - String recordName) + String recordName) { - return createRecord(context, recordName, "hostName", getHostName()); + MetricsRecord metricsRecord = context.createRecord(recordName); + metricsRecord.setTag("hostName", getHostName()); + return metricsRecord; } /** Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Feb 23 12:13:00 2007 @@ -383,10 +383,7 @@ else return; for( int i=0; i < nrFiles; i++) - ioer.doIO(new Reporter() { - public void setStatus(String status) throws IOException {} - public void progress() throws IOException {} - }, + ioer.doIO(Reporter.NULL, BASE_FILE_NAME+Integer.toString(i), MEGA*fileSize ); } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?view=diff&rev=511075&r1=511074&r2=511075 
============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Feb 23 12:13:00 2007 @@ -290,10 +290,7 @@ else return; for( int i=0; i < nrFiles; i++) - ioer.doIO(new Reporter() { - public void setStatus(String status) throws IOException {} - public void progress() throws IOException {} - }, + ioer.doIO(Reporter.NULL, BASE_FILE_NAME+Integer.toString(i), MEGA*fileSize ); } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java Fri Feb 23 12:13:00 2007 @@ -38,10 +38,7 @@ private static final Path inDir = new Path(System.getProperty("test.build.data",".") + "/mapred"); private static final Path inFile = new Path(inDir, "test.seq"); private static final Random random = new Random(1); - private static final Reporter reporter = new Reporter() { - public void setStatus(String status) throws IOException {} - public void progress() throws IOException {} - }; + private static final Reporter reporter = Reporter.NULL; static { job.setInputPath(inDir); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Fri Feb 23 12:13:00 2007 @@ -40,10 +40,7 @@ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred"); Path file = new Path(dir, "test.seq"); - Reporter reporter = new Reporter() { - public void setStatus(String status) throws IOException {} - public void progress() throws IOException {} - }; + Reporter reporter = Reporter.NULL; int seed = new Random().nextInt(); //LOG.info("seed = "+seed); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Feb 23 12:13:00 2007 @@ -51,10 +51,8 @@ JobConf job = new JobConf(); Path file = new Path(workDir, "test.txt"); - Reporter reporter = new Reporter() { - public void setStatus(String status) throws IOException {} - public void progress() throws IOException {} - }; + // A reporter that does nothing + Reporter reporter = Reporter.NULL; int seed = new Random().nextInt(); LOG.info("seed = "+seed); @@ -178,11 +176,7 @@ stm.close(); } - private static class VoidReporter implements Reporter { - public void progress() {} - public void setStatus(String msg) {} - } - private static final Reporter voidReporter = new VoidReporter(); + private static final Reporter voidReporter = Reporter.NULL; private static List readSplit(InputFormat format, InputSplit split, Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java Fri Feb 23 12:13:00 2007 @@ -46,10 +46,8 @@ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred"); Path file = new Path(dir, "test.seq"); - Reporter reporter = new Reporter() { - public void setStatus(String status) throws IOException {} - public void progress() throws IOException {} - }; + // A reporter that does nothing + Reporter reporter = Reporter.NULL; int seed = new Random().nextInt(); //LOG.info("seed = "+seed); Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Fri Feb 23 12:13:00 2007 @@ -98,6 +98,16 @@ printTaskSummary(out, jobId, "reduce", status.reduceProgress(), job.getReduceTasks()); out.print("\n"); + + Counters counters = status.getCounters(); + out.println("

"); + out.println(""); + out.println(""); + for (String counter : counters.getCounterNames()) { + out.print("\n"); + } + out.print("
CounterValue
" + counter + "" + counters.getCounter(counter) + + "
\n"); } %> Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Fri Feb 23 12:13:00 2007 @@ -55,14 +55,15 @@ out.print("

Tasks

"); out.print("
"); out.print(""); - out.print(""); + out.print("" + + ""); if (end_index > report_len){ end_index = report_len; } for (int i = start_index ; i < end_index; i++) { TaskReport report = reports[i]; out.print(""); out.print(""); @@ -71,10 +72,16 @@ out.println(""); String[] diagnostics = report.getDiagnostics(); + out.print(""); + out.println(diagnostics[j]); } - out.print("\n"); + out.println("
"); + out.println(""); } out.print("
TaskCompleteStatusStart TimeFinish TimeErrors
TaskCompleteStatusStart TimeFinish TimeErrorsCounters
" + + "&tipid=" + report.getTaskId() + "\">" + report.getTaskId() + "" + StringUtils.formatPercent(report.getProgress(),2) + "" + StringUtils.getFormattedTimeWithDiff(dateFormat, report.getFinishTime(), report.getStartTime()) + "
");
          for (int j = 0; j < diagnostics.length ; j++) {
-                out.print("
" + diagnostics[j] + "
" + + "" + report.getCounters().size() + + "
"); out.print("
"); Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?view=diff&rev=511075&r1=511074&r2=511075 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Fri Feb 23 12:13:00 2007 @@ -15,7 +15,7 @@ String jobid = request.getParameter("jobid"); JobTracker tracker = JobTracker.getTracker(); JobInProgress job = (JobInProgress) tracker.getJob(jobid); - String tipid = request.getParameter("taskid"); + String tipid = request.getParameter("tipid"); TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(jobid, tipid): null; %> @@ -44,7 +44,7 @@ <% } %> -Finish TimeErrorsTask Logs +Finish TimeErrorsTask LogsCounters <% for (int i = 0; i < ts.length; i++) { TaskStatus status = ts[i]; @@ -101,7 +101,11 @@ out.print("Last 8KB
"); out.print("All
"); } - out.println("\n"); + out.println("" + + "" + status.getCounters().size() + + ""); } } %> Added: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?view=auto&rev=511075 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (added) +++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Fri Feb 23 12:13:00 2007 @@ -0,0 +1,64 @@ +<%@ page + contentType="text/html; charset=UTF-8" + import="javax.servlet.*" + import="javax.servlet.http.*" + import="java.io.*" + import="java.lang.String" + import="java.util.*" + import="org.apache.hadoop.mapred.*" + import="org.apache.hadoop.util.*" + import="java.text.SimpleDateFormat" + import="org.apache.hadoop.util.*" +%> +<% + String jobid = request.getParameter("jobid"); + JobTracker tracker = JobTracker.getTracker(); + JobInProgress job = (JobInProgress) tracker.getJob(jobid); + String tipid = request.getParameter("tipid"); + String taskid = request.getParameter("taskid"); + Counters counters; + if (taskid == null) { + counters = tracker.getTipCounters(jobid, tipid); + taskid = tipid; // for page title etc + } + else { + TaskStatus taskStatus = tracker.getTaskStatus(jobid, tipid, taskid); + counters = taskStatus.getCounters(); + } +%> + + +Counters for <%=taskid%> + +

Counters for <%=taskid%>

+ +
+ +<% + if( counters == null ) { +%> +

No counter information found for this task

+<% + }else{ +%> + + + <% + for (String counter : counters.getCounterNames()) { + long value = counters.getCounter(counter); + %> + + <% + } + %> +
CounterValue
<%=counter%><%=value%>
+<% + } +%> + +
+Go back to the job
+Go back to JobTracker
+Hadoop, 2006.
+ +
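In Task.getReporter() above, incrCounter(Enum key, long amount) turns the enum constant into a string name of the form key.getDeclaringClass().getName() + "#" + key.toString() before adding it to the task's Counters. As a result, MapTask.Counter.OUTPUT_RECORDS and ReduceTask.Counter.OUTPUT_RECORDS never collide, because each name carries its declaring enum type.

The following is a minimal sketch of that naming and accumulation scheme. EnumCounterTable, MapSideCounter and ReduceSideCounter are hypothetical names for illustration, not the Counters class added by this commit, whose source is not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified counter table (NOT Hadoop's Counters class).
class EnumCounterTable {
    private final Map<String, Long> counters = new TreeMap<>();

    // Same naming scheme as the diff: declaring enum class name + "#" + constant.
    static String counterName(Enum<?> key) {
        return key.getDeclaringClass().getName() + "#" + key.toString();
    }

    // Adds amount to the named counter, creating it at zero if absent.
    synchronized void incrCounter(Enum<?> key, long amount) {
        counters.merge(counterName(key), amount, Long::sum);
    }

    synchronized long getCounter(String name) {
        return counters.getOrDefault(name, 0L);
    }

    synchronized int size() {
        return counters.size();
    }
}

// Two unrelated enums sharing a constant name, like MapTask.Counter and
// ReduceTask.Counter in this commit (hypothetical stand-ins).
enum MapSideCounter { OUTPUT_RECORDS }
enum ReduceSideCounter { INPUT_RECORDS, OUTPUT_RECORDS }
```

getDeclaringClass() is used rather than getClass(): a constant with its own class body compiles to an anonymous subclass, and getDeclaringClass() still returns the enum type itself.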
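Adding incrCounter to Reporter would have required a new stub in every anonymous no-op Reporter in the tests. The commit instead replaces those anonymous classes (DFSCIOTest, TestDFSIO, TestSequenceFileInputFilter, TestSequenceFileInputFormat, TestTextInputFormat, TestWritable) with the shared Reporter.NULL.

The following is a minimal sketch of that null-object pattern, assuming an interface with the same three methods the anonymous Reporter in Task.getReporter() implements. StatusReporter, LineCounter and countLines are hypothetical names, not Hadoop types.

```java
import java.io.IOException;

// Hypothetical interface shaped like the commit's Reporter; not the Hadoop type.
interface StatusReporter {
    void setStatus(String status) throws IOException;
    void progress() throws IOException;
    void incrCounter(Enum<?> key, long amount);

    // One shared no-op instance. When a method is added to the interface,
    // only this object needs a new stub, not every anonymous class in the tests.
    StatusReporter NULL = new StatusReporter() {
        public void setStatus(String status) { }
        public void progress() { }
        public void incrCounter(Enum<?> key, long amount) { }
    };
}

// Hypothetical code under test that accepts any reporter.
class LineCounter {
    enum Counter { LINES }

    static int countLines(String text, StatusReporter reporter) throws IOException {
        String[] lines = text.split("\n");
        for (int i = 0; i < lines.length; i++) {
            reporter.incrCounter(Counter.LINES, 1);   // one count per line
        }
        reporter.setStatus("counted " + lines.length + " lines");
        return lines.length;
    }
}
```

Tests then pass StatusReporter.NULL wherever a reporter is required, just as the updated tests pass Reporter.NULL to doIO() and the record readers.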
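Task.reportProgress() used to test now > nextProgressTime outside synchronized (this). Two threads that both saw an expired deadline could therefore each enter the block in turn and each send a progress RPC. The commit moves the test inside the lock, so the check and the deadline update happen atomically.

The following is a minimal sketch of the corrected throttle. ThrottledReporter, maybeReport and the injected LongSupplier clock are hypothetical names added to make the logic testable. The reportsSent counter stands in for the umbilical.progress(...) call.

```java
import java.util.function.LongSupplier;

// Hypothetical throttle modeled on Task.reportProgress after this commit.
class ThrottledReporter {
    private final long intervalMillis;
    private final LongSupplier clock;   // injected so the sketch is testable
    private long nextReportTime;
    private int reportsSent;

    ThrottledReporter(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.nextReportTime = clock.getAsLong() + intervalMillis;
    }

    // Returns true if a report was sent on this call.
    boolean maybeReport() {
        long now = clock.getAsLong();
        synchronized (this) {
            // Check and update under the same lock: only one caller per
            // interval can observe an expired deadline.
            if (now > nextReportTime) {
                nextReportTime = now + intervalMillis;
                reportsSent++;              // stand-in for the progress RPC
                return true;
            }
            return false;
        }
    }

    synchronized int reportsSent() {
        return reportsSent;
    }
}
```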
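TaskStatus and TaskReport now append counters.write(out) after their existing fields. readFields() allocates a fresh Counters and reads it back in the same position. The writer and reader must agree on field order, and write() needs a non-null counters object.

The following is a minimal sketch of that pattern. CounterSnapshot, StatusRecord and roundTrip are hypothetical. The wire format shown (an int count followed by name/value pairs) is an assumption, since Counters.java itself is not part of this excerpt.

```java
import java.io.*;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical counter snapshot; the wire format here is an assumption.
class CounterSnapshot {
    final Map<String, Long> values = new TreeMap<>();

    void write(DataOutput out) throws IOException {
        out.writeInt(values.size());
        for (Map.Entry<String, Long> e : values.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
    }

    void readFields(DataInput in) throws IOException {
        values.clear();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            values.put(in.readUTF(), in.readLong());   // name, then value
        }
    }
}

// Hypothetical status record: counters are the trailing field, as in the diff.
class StatusRecord {
    String taskId = "";
    float progress;
    CounterSnapshot counters = new CounterSnapshot();  // never null on write

    void write(DataOutput out) throws IOException {
        out.writeUTF(taskId);
        out.writeFloat(progress);
        counters.write(out);
    }

    void readFields(DataInput in) throws IOException {
        taskId = in.readUTF();
        progress = in.readFloat();
        counters = new CounterSnapshot();              // fresh instance, as in the diff
        counters.readFields(in);
    }

    // Serializes to a byte array and reads it back into a new record.
    static StatusRecord roundTrip(StatusRecord r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        r.write(new DataOutputStream(bytes));
        StatusRecord copy = new StatusRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }
}
```

The sketch initializes counters at declaration so write() never sees null. In the diff, the no-arg TaskStatus() constructor leaves counters unset. It therefore relies on readFields() or the full constructor to supply one before write() is called.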
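In TaskInProgress.recomputeProgress(), the TIP's counters are taken from the attempt it reports progress for. That is the first SUCCEEDED attempt (the loop breaks there), otherwise the RUNNING attempt with the highest progress. Because the comparison is >= and taskStatuses is a TreeMap keyed by task id, ties go to the attempt with the later id.

The following is a minimal sketch of that selection rule. AttemptView and TipCounterSelector are hypothetical names, and attempts are assumed to be supplied in task-id order.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical view of one task attempt; field names are illustrative.
class AttemptView {
    final String state;                 // e.g. "SUCCEEDED", "RUNNING", "FAILED"
    final double progress;
    final Map<String, Long> counters;

    AttemptView(String state, double progress, Map<String, Long> counters) {
        this.state = state;
        this.progress = progress;
        this.counters = counters;
    }
}

class TipCounterSelector {
    // Attempts must be in task-id order, matching the TreeMap iteration in the diff.
    static Map<String, Long> select(List<AttemptView> attempts) {
        double bestProgress = 0;
        Map<String, Long> best = Collections.emptyMap();  // like "new Counters()"
        for (AttemptView a : attempts) {
            if (a.state.equals("SUCCEEDED")) {
                return a.counters;                        // success wins outright
            } else if (a.state.equals("RUNNING") && a.progress >= bestProgress) {
                bestProgress = a.progress;                // ties go to the later attempt
                best = a.counters;
            }
        }
        return best;
    }
}
```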