hadoop-mapreduce-commits mailing list archives

From t...@apache.org
Subject svn commit: r1158072 [2/3] - in /hadoop/common/branches/HDFS-1623/mapreduce: ./ conf/ src/c++/ src/contrib/ src/contrib/block_forensics/ src/contrib/capacity-scheduler/ src/contrib/data_join/ src/contrib/dynamic-scheduler/ src/contrib/eclipse-plugin/ s...
Date Tue, 16 Aug 2011 00:37:31 GMT
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapOutputFile.java Tue Aug 16 00:37:15 2011
@@ -23,9 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRConfig;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
@@ -38,164 +37,132 @@ import org.apache.hadoop.mapreduce.MRCon
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MapOutputFile {
+public abstract class MapOutputFile implements Configurable {
 
-  private JobConf conf;
+  private Configuration conf;
 
+  static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
+  static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
   static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
 
   public MapOutputFile() {
   }
 
-  private LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(MRConfig.LOCAL_DIR);
-  
   /**
    * Return the path to local map output file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
-  public Path getOutputFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + "file.out", conf);
-  }
+  public abstract Path getOutputFile() throws IOException;
 
   /**
    * Create a local map output file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getOutputFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + "file.out", size, conf);
-  }
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
 
   /**
    * Return the path to a local map output index file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
-  public Path getOutputIndexFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + "file.out.index", conf);
-  }
+  public abstract Path getOutputIndexFile() throws IOException;
 
   /**
    * Create a local map output index file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getOutputIndexFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + "file.out.index", size, conf);
-  }
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
 
   /**
    * Return a local map spill file created earlier.
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
    */
-  public Path getSpillFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out", conf);
-  }
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
 
   /**
    * Create a local map spill file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out", size, conf);
-  }
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
 
   /**
    * Return a local map spill index file created earlier
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
    */
-  public Path getSpillIndexFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out.index", conf);
-  }
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
 
   /**
    * Create a local map spill index file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out.index", size, conf);
-  }
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
 
   /**
    * Return a local reduce input file created earlier
-   * 
+   *
    * @param mapId a map task id
    * @return path
-   * @throws IOException 
+   * @throws IOException
    */
-  public Path getInputFile(int mapId)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
-            .valueOf(mapId)), conf);
-  }
+  public abstract Path getInputFile(int mapId) throws IOException;
 
   /**
    * Create a local reduce input file name.
-   * 
+   *
    * @param mapId a map task id
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, 
-                                   long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
-        size, conf);
-  }
+  public abstract Path getInputFileForWrite(
+      org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException;
 
   /** Removes all of the files related to a task. */
-  public void removeAll()
-      throws IOException {
-    conf.deleteLocalFiles(TaskTracker.OUTPUT);
-  }
+  public abstract void removeAll() throws IOException;
 
+  @Override
   public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
+    this.conf = conf;
   }
-  
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
 }

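MapOutputFile is now an abstract, Configurable base class: the constants above name the default artifacts ("file.out" plus an ".index" companion), and every path lookup becomes abstract so the on-disk layout of map-side intermediate files can be supplied by a pluggable implementation. Task.java below resolves that implementation from MRConfig.TASK_LOCAL_OUTPUT_CLASS, defaulting to MROutputFiles. As a rough illustration, and not the committed MROutputFiles behavior, a minimal subclass might look like the sketch below; the class name, the configuration key, and the single-directory layout are all invented:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    // Hedged sketch: keep all map output under one fixed directory.
    // Declared abstract so only representative methods need bodies;
    // the remaining abstract methods would follow the same pattern.
    public abstract class SingleDirMapOutputFile extends MapOutputFile {
      private Path base() {
        // "example.local.dir" is an invented key for this sketch.
        return new Path(getConf().get("example.local.dir", "/tmp/mapout"));
      }
      @Override
      public Path getOutputFile() throws IOException {
        return new Path(base(), "file.out");
      }
      @Override
      public Path getOutputFileForWrite(long size) throws IOException {
        return getOutputFile(); // a real implementation would reserve 'size'
      }
      @Override
      public Path getOutputFileForWriteInVolume(Path existing) {
        // stay on the volume of 'existing' by reusing its parent
        return new Path(existing.getParent(), "file.out");
      }
      @Override
      public void removeAll() throws IOException {
        // a real implementation would delete the task's working area
      }
    }

Once its remaining methods were filled in, such a class would be selected with conf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, SingleDirMapOutputFile.class, MapOutputFile.class), matching the ReflectionUtils.newInstance() lookup added to Task.java below.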
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java Tue Aug 16 00:37:15 2011
@@ -1735,13 +1735,13 @@ class MapTask extends Task {
       }
       if (numSpills == 1) { //the spill is the final output
         rfs.rename(filename[0],
-            new Path(filename[0].getParent(), "file.out"));
+            mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
         if (indexCacheList.size() == 0) {
           rfs.rename(mapOutputFile.getSpillIndexFile(0),
-              new Path(filename[0].getParent(),"file.out.index"));
+            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
         } else {
           indexCacheList.get(0).writeToFile(
-                new Path(filename[0].getParent(),"file.out.index"), job);
+            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
         }
         return;
       }

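The single-spill fast path above no longer hardcodes "file.out" next to the spill; it asks the MapOutputFile for the final output and index names, which keeps the rename on the spill's volume under any configured implementation. Assuming the default layout implied by the constants added to MapOutputFile, the resolution works out roughly like this (paths are illustrative):

    import org.apache.hadoop.fs.Path;

    // Illustrative only: a lone spill on one local disk resolves to
    // final names on that same disk.
    Path spill = new Path("/disk3/mapred/local/output/spill0.out");
    Path out = new Path(spill.getParent(), "file.out");
    // roughly what getOutputFileForWriteInVolume(spill) returns
    Path idx = new Path(spill.getParent(), "file.out" + ".index");
    // roughly what getOutputIndexFileForWriteInVolume(spill) returns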
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Aug 16 00:37:15 2011
@@ -362,7 +362,8 @@ public class ReduceTask extends Task {
                     shuffledMapsCounter,
                     reduceShuffleBytes, failedShuffleCounter,
                     mergedMapOutputsCounter,
-                    taskStatus, copyPhase, sortPhase, this);
+                    taskStatus, copyPhase, sortPhase, this,
+                    mapOutputFile);
       rIter = shuffle.run();
     } else {
       // local job runner doesn't have a copy phase

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Tue Aug 16 00:37:15 2011
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
@@ -54,6 +53,7 @@ import org.apache.hadoop.io.serializer.D
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
@@ -146,7 +146,7 @@ abstract public class Task implements Wr
   private long initCpuCumulativeTime = 0;
 
   protected JobConf conf;
-  protected MapOutputFile mapOutputFile = new MapOutputFile();
+  protected MapOutputFile mapOutputFile;
   protected LocalDirAllocator lDirAlloc;
   private final static int MAX_RETRIES = 10;
   protected JobContext jobContext;
@@ -819,37 +819,41 @@ abstract public class Task implements Wr
    * system and only creates the counters when they are needed.
    */
   class FileSystemStatisticUpdater {
-    private long prevReadBytes = 0;
-    private long prevWriteBytes = 0;
     private FileSystem.Statistics stats;
-    private Counters.Counter readCounter = null;
-    private Counters.Counter writeCounter = null;
-    private String[] counterNames;
+    private Counters.Counter readBytesCounter, writeBytesCounter,
+        readOpsCounter, largeReadOpsCounter, writeOpsCounter;
     
-    FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
+    FileSystemStatisticUpdater(FileSystem.Statistics stats) {
       this.stats = stats;
-      this.counterNames = getFileSystemCounterNames(uriScheme);
     }
 
     void updateCounters() {
-      long newReadBytes = stats.getBytesRead();
-      long newWriteBytes = stats.getBytesWritten();
-      if (prevReadBytes != newReadBytes) {
-        if (readCounter == null) {
-          readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
-              counterNames[0]);
-        }
-        readCounter.increment(newReadBytes - prevReadBytes);
-        prevReadBytes = newReadBytes;
-      }
-      if (prevWriteBytes != newWriteBytes) {
-        if (writeCounter == null) {
-          writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
-              counterNames[1]);
-        }
-        writeCounter.increment(newWriteBytes - prevWriteBytes);
-        prevWriteBytes = newWriteBytes;
+      String scheme = stats.getScheme();
+      if (readBytesCounter == null) {
+        readBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_READ);
+      }
+      readBytesCounter.setValue(stats.getBytesRead());
+      if (writeBytesCounter == null) {
+        writeBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_WRITTEN);
+      }
+      writeBytesCounter.setValue(stats.getBytesWritten());
+      if (readOpsCounter == null) {
+        readOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.READ_OPS);
+      }
+      readOpsCounter.setValue(stats.getReadOps());
+      if (largeReadOpsCounter == null) {
+        largeReadOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.LARGE_READ_OPS);
+      }
+      largeReadOpsCounter.setValue(stats.getLargeReadOps());
+      if (writeOpsCounter == null) {
+        writeOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.WRITE_OPS);
       }
+      writeOpsCounter.setValue(stats.getWriteOps());
     }
   }
   
@@ -864,7 +868,7 @@ abstract public class Task implements Wr
       String uriScheme = stat.getScheme();
       FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
       if(updater==null) {//new FileSystem has been found in the cache
-        updater = new FileSystemStatisticUpdater(uriScheme, stat);
+        updater = new FileSystemStatisticUpdater(stat);
         statisticUpdaters.put(uriScheme, updater);
       }
       updater.updateCounters();      
@@ -1150,7 +1154,9 @@ abstract public class Task implements Wr
     } else {
       this.conf = new JobConf(conf);
     }
-    this.mapOutputFile.setConf(this.conf);
+    this.mapOutputFile = ReflectionUtils.newInstance(
+        conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+          MROutputFiles.class, MapOutputFile.class), conf);
     this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
     // add the static resolutions (this is required for the junit to
     // work on testcases that simulate multiple nodes on a single physical

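FileSystemStatisticUpdater now resolves its counters from the filesystem scheme and the new FileSystemCounter enum rather than from precomputed counter-name strings, and it publishes absolute values with setValue() instead of tracking per-call deltas. A minimal sketch of the same pattern, assuming org.apache.hadoop.mapred.Counters as in Task.java (the helper method and its name are ours):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapreduce.FileSystemCounter;

    // Sketch: mirror one FileSystem statistic into a per-scheme counter.
    // setValue() makes repeated updates idempotent; the old delta-based
    // increment() had to remember the previous reading.
    static void mirrorBytesRead(Counters counters,
                                FileSystem.Statistics stats) {
      counters.findCounter(stats.getScheme(), FileSystemCounter.BYTES_READ)
              .setValue(stats.getBytesRead());
    }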
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Aug 16 00:37:15 2011
@@ -31,25 +31,32 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
 import org.apache.hadoop.net.Node;
 
 
+
 /*************************************************************
  * TaskInProgress maintains all the info needed for a
  * Task in the lifetime of its owning Job.  A given Task
  * might be speculatively executed or reexecuted, so we
  * need a level of indirection above the running-id itself.
  * <br>
- * A given TaskInProgress contains multiple taskids,
+ * A given TaskInProgress contains multiple task attempt ids,
  * 0 or more of which might be executing at any one time.
- * (That's what allows speculative execution.)  A taskid
- * is now *never* recycled.  A TIP allocates enough taskids
+ * (That's what allows speculative execution.)  A task attempt id
+ * is now *never* recycled.  A TIP allocates enough task attempt ids
  * to account for all the speculation and failures it will
  * ever have to handle.  Once those are up, the TIP is dead.
  * **************************************************************
@@ -60,6 +67,10 @@ class TaskInProgress {
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
 
+  private static final long MEMORY_SPLITS_RESOLUTION = 1024;
+
+  static final int DEFAULT_STATISTICS_INTERVALS = 12;
+
   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
 
   // Defines the TIP
@@ -91,6 +102,10 @@ class TaskInProgress {
   private volatile boolean skipping = false;
   private boolean jobCleanup = false; 
   private boolean jobSetup = false;
+
+  static final Enum<?> CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+  static final Enum<?> VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+  static final Enum<?> PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -109,12 +124,20 @@ class TaskInProgress {
   private JobConf conf;
   private Map<TaskAttemptID,List<String>> taskDiagnosticData =
     new TreeMap<TaskAttemptID,List<String>>();
+
   /**
-   * Map from taskId -> TaskStatus
+   * Map from task attempt Id -> TaskStatus
    */
   TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
+  
+  /**
+   * Map from task attempt Id -> splits block
+   */
+  private Map<TaskAttemptID, ProgressSplitsBlock> splitsBlocks
+    = new TreeMap<TaskAttemptID, ProgressSplitsBlock>();
+
   // Map from taskId -> TaskTracker Id, 
   // contains cleanup attempts and where they ran, if any
   private TreeMap<TaskAttemptID, String> cleanupTasks =
@@ -183,6 +206,62 @@ class TaskInProgress {
     }
     this.user = job.getUser();
   }
+
+  synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
+    ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);
+
+    if (result == null) {
+      result
+        = new ProgressSplitsBlock
+            (conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
+                         ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
+      splitsBlocks.put(statusAttemptID, result);
+    }
+
+    return result;
+  }
+
+  private void updateProgressSplits(TaskStatus taskStatus) {
+    double newProgress = taskStatus.getProgress();
+
+    Counters counters = taskStatus.getCounters();
+    if (counters == null) return;
+
+    TaskAttemptID statusAttemptID = taskStatus.getTaskID();
+    ProgressSplitsBlock splitsBlock = getSplits(statusAttemptID);
+
+    if (splitsBlock != null) {
+
+      long now = JobTracker.getClock().getTime();
+      Long start = getDispatchTime(statusAttemptID);
+
+      if (start != null && now - start <= Integer.MAX_VALUE) {
+        splitsBlock.progressWallclockTime.extend
+          (newProgress, (int)(now - start));
+      }
+
+      Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
+      if (cpuCounter != null
+          && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
+        splitsBlock.progressCPUTime.extend
+          (newProgress, (int)(cpuCounter.getCounter()));
+      }
+
+      Counters.Counter virtualBytes = counters.findCounter(VM_BYTES_KEY);
+      if (virtualBytes != null) {
+        splitsBlock.progressVirtualMemoryKbytes.extend
+          (newProgress,
+           (int)(virtualBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+      }
+
+      Counters.Counter physicalBytes = counters.findCounter(PHYSICAL_BYTES_KEY);
+      if (physicalBytes != null) {
+        splitsBlock.progressPhysicalMemoryKbytes.extend
+          (newProgress,
+           (int)(physicalBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+      }
+    }
+  }
   
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
@@ -294,6 +373,7 @@ class TaskInProgress {
     return execFinishTime;
   }
 
+
   /**
    * Set the exec finish time
    */
@@ -582,23 +662,24 @@ class TaskInProgress {
    * @return has the task changed its state noticeably?
    */
   synchronized boolean updateStatus(TaskStatus status) {
-    TaskAttemptID taskid = status.getTaskID();
-    String tracker = status.getTaskTracker();
-    String diagInfo = status.getDiagnosticInfo();
-    TaskStatus oldStatus = taskStatuses.get(taskid);
-    boolean changed = true;
-    if (diagInfo != null && diagInfo.length() > 0) {
-      LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
-      addDiagnosticInfo(taskid, diagInfo);
-    }
-    
-    if(skipping) {
-      failedRanges.updateState(status);
-    }
-    
-    if (oldStatus != null) {
-      TaskStatus.State oldState = oldStatus.getRunState();
-      TaskStatus.State newState = status.getRunState();
+    try {
+      TaskAttemptID taskid = status.getTaskID();
+      String tracker = status.getTaskTracker();
+      String diagInfo = status.getDiagnosticInfo();
+      TaskStatus oldStatus = taskStatuses.get(taskid);
+      boolean changed = true;
+      if (diagInfo != null && diagInfo.length() > 0) {
+        LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
+        addDiagnosticInfo(taskid, diagInfo);
+      }
+    
+      if(skipping) {
+        failedRanges.updateState(status);
+      }
+    
+      if (oldStatus != null) {
+        TaskStatus.State oldState = oldStatus.getRunState();
+        TaskStatus.State newState = status.getRunState();
           
       // We should never receive a duplicate success/failure/killed
       // status update for the same taskid! This is a safety check, 
@@ -617,60 +698,63 @@ class TaskInProgress {
         return false;
       }
 
-      // The task is not allowed to move from completed back to running.
-      // We have seen out of order status messagesmoving tasks from complete
-      // to running. This is a spot fix, but it should be addressed more
-      // globally.
-      if ((newState == TaskStatus.State.RUNNING || 
-          newState == TaskStatus.State.UNASSIGNED) &&
-          (oldState == TaskStatus.State.FAILED || 
-           oldState == TaskStatus.State.KILLED || 
-           oldState == TaskStatus.State.FAILED_UNCLEAN || 
-           oldState == TaskStatus.State.KILLED_UNCLEAN || 
-           oldState == TaskStatus.State.SUCCEEDED ||
-           oldState == TaskStatus.State.COMMIT_PENDING)) {
-        return false;
-      }
+        // The task is not allowed to move from completed back to running.
+        // We have seen out-of-order status messages moving tasks from complete
+        // to running. This is a spot fix, but it should be addressed more
+        // globally.
+        if ((newState == TaskStatus.State.RUNNING || 
+             newState == TaskStatus.State.UNASSIGNED) &&
+            (oldState == TaskStatus.State.FAILED || 
+             oldState == TaskStatus.State.KILLED || 
+             oldState == TaskStatus.State.FAILED_UNCLEAN || 
+             oldState == TaskStatus.State.KILLED_UNCLEAN || 
+             oldState == TaskStatus.State.SUCCEEDED ||
+             oldState == TaskStatus.State.COMMIT_PENDING)) {
+          return false;
+        }
       
-      //Do not accept any status once the task is marked FAILED/KILLED
-      //This is to handle the case of the JobTracker timing out a task
-      //due to launch delay, but the TT comes back with any state or 
-      //TT got expired
-      if (oldState == TaskStatus.State.FAILED ||
-          oldState == TaskStatus.State.KILLED) {
-        tasksToKill.put(taskid, true);
-        return false;	  
-      }
+        //Do not accept any status once the task is marked FAILED/KILLED
+        //This is to handle the case of the JobTracker timing out a task
+        //due to launch delay, but the TT comes back with any state or 
+        //TT got expired
+        if (oldState == TaskStatus.State.FAILED ||
+            oldState == TaskStatus.State.KILLED) {
+          tasksToKill.put(taskid, true);
+          return false;	  
+        }
           
-      changed = oldState != newState;
-    }
-    // if task is a cleanup attempt, do not replace the complete status,
-    // update only specific fields.
-    // For example, startTime should not be updated, 
-    // but finishTime has to be updated.
-    if (!isCleanupAttempt(taskid)) {
-      taskStatuses.put(taskid, status);
-      //we don't want to include setup tasks in the task execution stats
-      if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
-          (!isMapTask() && job.hasSpeculativeReduces()))) {
-        long now = JobTracker.getClock().getTime();
-        double oldProgRate = getOldProgressRate();
-        double currProgRate = getCurrentProgressRate(now);
-        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
-        //we need to store the current progress rate, so that we can
-        //update statistics accurately the next time we invoke
-        //updateStatistics
-        setProgressRate(currProgRate);
+        changed = oldState != newState;
+      }
+      // if task is a cleanup attempt, do not replace the complete status,
+      // update only specific fields.
+      // For example, startTime should not be updated, 
+      // but finishTime has to be updated.
+      if (!isCleanupAttempt(taskid)) {
+        taskStatuses.put(taskid, status);
+        //we don't want to include setup tasks in the task execution stats
+        if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
+                                  (!isMapTask() && job.hasSpeculativeReduces()))) {
+          long now = JobTracker.getClock().getTime();
+          double oldProgRate = getOldProgressRate();
+          double currProgRate = getCurrentProgressRate(now);
+          job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+          //we need to store the current progress rate, so that we can
+          //update statistics accurately the next time we invoke
+          //updateStatistics
+          setProgressRate(currProgRate);
+        }
+      } else {
+        taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+                                              status.getProgress(), status.getStateString(), status.getPhase(),
+                                              status.getFinishTime());
       }
-    } else {
-      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
-        status.getProgress(), status.getStateString(), status.getPhase(),
-        status.getFinishTime());
-    }
 
-    // Recompute progress
-    recomputeProgress();
-    return changed;
+      // Recompute progress
+      recomputeProgress();
+      return changed;
+    } finally {
+      updateProgressSplits(status);
+    }
   }
 
   /**
@@ -953,7 +1037,7 @@ class TaskInProgress {
           if (status.getProgress() >= bestProgress) {
             bestProgress = status.getProgress();
             bestState = status.getStateString();
-            if (status.getIncludeCounters()) {
+            if (status.getIncludeAllCounters()) {
               bestCounters = status.getCounters();
             } else {
               bestCounters = this.counters;

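The new updateProgressSplits() above folds each status report into four ProgressSplitsBlock series (wallclock time, CPU time, virtual and physical memory), scaling the memory readings by MEMORY_SPLITS_RESOLUTION = 1024 so byte counts fit the int-sized split slots. A quick worked check of that scaling:

    // Worked example: 4 GiB of virtual memory overflows an int as a
    // byte count, but divided by MEMORY_SPLITS_RESOLUTION (1024) it
    // fits comfortably as a kilobyte count.
    long virtualBytes = 4L * 1024 * 1024 * 1024;   // 4294967296 bytes
    int kbytes = (int) (virtualBytes / 1024);      // 4194304 KB
    assert virtualBytes > Integer.MAX_VALUE && kbytes < Integer.MAX_VALUE;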
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Aug 16 00:37:15 2011
@@ -227,8 +227,12 @@ class TaskMemoryManagerThread extends Th
             continue; // processTree cannot be tracked
           }
 
-          if (taskTracker.runningTasks.get(tid).wasKilled()) {
-            continue; // this task has been killed already
+          TaskInProgress tip = taskTracker.getRunningTask(tid);
+          if (tip == null) {
+            continue;
+          }
+          if (tip.wasKilled()) {
+            continue;
           }
 
           LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
@@ -514,6 +518,12 @@ class TaskMemoryManagerThread extends Th
    * @param msg diagnostics message
    */
   private void killTask(TaskAttemptID tid, String msg) {
+    TaskInProgress tip = taskTracker.getRunningTask(tid);
+    if (tip != null) {
+      // update the diagnostic info for the task identified to be killed
+      TaskStatus taskStatus = tip.getStatus();
+      taskStatus.setDiagnosticInfo(msg);
+    }
     // Kill the task and mark it as killed.
     taskTracker.cleanUpOverMemoryTask(tid, false, msg);
     // Now destroy the ProcessTree, remove it from monitoring map.
@@ -530,7 +540,7 @@ class TaskMemoryManagerThread extends Th
    * @return true if the task can be killed
    */
   private boolean isKillable(TaskAttemptID tid) {
-      TaskInProgress tip = taskTracker.runningTasks.get(tid);
+      TaskInProgress tip = taskTracker.getRunningTask(tid);
       return tip != null && !tip.wasKilled() &&
              (tip.getRunState() == TaskStatus.State.RUNNING ||
               tip.getRunState() == TaskStatus.State.COMMIT_PENDING);

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Aug 16 00:37:15 2011
@@ -66,7 +66,7 @@ public abstract class TaskStatus impleme
     
   private volatile Phase phase = Phase.STARTING; 
   private Counters counters;
-  private boolean includeCounters;
+  private boolean includeAllCounters;
   private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
   
   // max task-status string size
@@ -100,7 +100,7 @@ public abstract class TaskStatus impleme
     this.taskTracker = taskTracker;
     this.phase = phase;
     this.counters = counters;
-    this.includeCounters = true;
+    this.includeAllCounters = true;
   }
   
   public TaskAttemptID getTaskID() { return taskid; }
@@ -311,12 +311,13 @@ public abstract class TaskStatus impleme
       this.runState == TaskStatus.State.KILLED_UNCLEAN));
   }
   
-  public boolean getIncludeCounters() {
-    return includeCounters; 
+  public boolean getIncludeAllCounters() {
+    return includeAllCounters;
   }
   
-  public void setIncludeCounters(boolean send) {
-    includeCounters = send;
+  public void setIncludeAllCounters(boolean send) {
+    includeAllCounters = send;
+    counters.setWriteAllCounters(send);
   }
   
   /**
@@ -465,11 +466,9 @@ public abstract class TaskStatus impleme
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
-    out.writeBoolean(includeCounters);
+    out.writeBoolean(includeAllCounters);
     out.writeLong(outputSize);
-    if (includeCounters) {
-      counters.write(out);
-    }
+    counters.write(out);
     nextRecordRange.write(out);
   }
 
@@ -484,11 +483,9 @@ public abstract class TaskStatus impleme
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 
     counters = new Counters();
-    this.includeCounters = in.readBoolean();
+    this.includeAllCounters = in.readBoolean();
     this.outputSize = in.readLong();
-    if (includeCounters) {
-      counters.readFields(in);
-    }
+    counters.readFields(in);
     nextRecordRange.readFields(in);
   }
   

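The rename from includeCounters to includeAllCounters marks a wire-format change: a Counters block is now always serialized, and the flag only tells the Counters object, through setWriteAllCounters(), whether to write the full set or just the framework-maintained subset. Making write() and readFields() unconditional keeps writer and reader in lock-step; a generic sketch of that Writable round-trip discipline (the helper and its name are ours):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // Sketch: a Writable round trip. Because the counters block above
    // is written unconditionally, the reader needs no flag-dependent
    // branching to stay aligned with the writer.
    static void roundTrip(Writable src, Writable dst) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      src.write(new DataOutputStream(buf));
      dst.readFields(new DataInputStream(
          new ByteArrayInputStream(buf.toByteArray())));
    }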
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 16 00:37:15 2011
@@ -1617,13 +1617,13 @@ public class TaskTracker 
    */
   HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
-    boolean sendCounters;
+    boolean sendAllCounters;
     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
-      sendCounters = true;
+      sendAllCounters = true;
       previousUpdate = now;
     }
     else {
-      sendCounters = false;
+      sendAllCounters = false;
     }
 
     // 
@@ -1636,7 +1636,7 @@ public class TaskTracker 
         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
                                        httpPort, 
                                        cloneAndResetRunningTaskStatuses(
-                                         sendCounters), 
+                                         sendAllCounters),
                                        failures, 
                                        maxMapSlots,
                                        maxReduceSlots); 
@@ -3521,10 +3521,10 @@ public class TaskTracker 
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
     for(TaskInProgress tip: runningTasks.values()) {
       TaskStatus status = tip.getStatus();
-      status.setIncludeCounters(sendCounters);
+      status.setIncludeAllCounters(sendCounters);
       // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
-        status.setIncludeCounters(true);
+        status.setIncludeAllCounters(true);
       }
       result.add((TaskStatus)status.clone());
       status.clearStatus();
@@ -4218,4 +4218,8 @@ public class TaskTracker 
     ACLsManager getACLsManager() {
       return aclsManager;
     }
+
+    synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
+      return runningTasks.get(tid);
+    }
 }

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java Tue Aug 16 00:37:15 2011
@@ -18,137 +18,58 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A named counter that tracks the progress of a map/reduce job.
- * 
- * <p><code>Counters</code> represent global counters, defined either by the 
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
  * Map-Reduce framework or applications. Each <code>Counter</code> is named by
  * an {@link Enum} and has a long for the value.</p>
- * 
+ *
  * <p><code>Counters</code> are bunched into Groups, each comprising of
- * counters from a particular <code>Enum</code> class. 
+ * counters from a particular <code>Enum</code> class.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counter implements Writable {
+public interface Counter extends Writable {
 
-  private String name;
-  private String displayName;
-  private long value = 0;
-    
-  protected Counter() { 
-  }
-
-  protected Counter(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
-  }
-  
-  /** Create a counter.
-   * @param name the name within the group's enum.
-   * @param displayName a name to be displayed.
-   * @param value the counter value.
-   */
-  public Counter(String name, String displayName, long value) {
-    this.name = name;
-    this.displayName = displayName;
-    this.value = value;
-  }
-  
+  /**
+   * Set the display name of the counter
+   * @param displayName of the counter
+   * @deprecated (and no-op by default)
+   */
   @Deprecated
-  protected synchronized void setDisplayName(String displayName) {
-    this.displayName = displayName;
-  }
-    
-  /**
-   * Read the binary representation of the counter
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    name = Text.readString(in);
-    if (in.readBoolean()) {
-      displayName = Text.readString(in);
-    } else {
-      displayName = name;
-    }
-    value = WritableUtils.readVLong(in);
-  }
-    
-  /**
-   * Write the binary representation of the counter
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, name);
-    boolean distinctDisplayName = ! name.equals(displayName);
-    out.writeBoolean(distinctDisplayName);
-    if (distinctDisplayName) {
-      Text.writeString(out, displayName);
-    }
-    WritableUtils.writeVLong(out, value);
-  }
-
-  public synchronized String getName() {
-    return name;
-  }
+  void setDisplayName(String displayName);
 
   /**
-   * Get the name of the counter.
+   * @return the name of the counter
+   */
+  String getName();
+
+  /**
+   * Get the display name of the counter.
    * @return the user facing name of the counter
    */
-  public synchronized String getDisplayName() {
-    return displayName;
-  }
-    
+  String getDisplayName();
+
   /**
    * What is the current value of this counter?
    * @return the current value
    */
-  public synchronized long getValue() {
-    return value;
-  }
+  long getValue();
 
   /**
    * Set this counter by the given value
    * @param value the value to set
    */
-  public synchronized void setValue(long value) {
-    this.value = value;
-  }
+  void setValue(long value);
 
   /**
    * Increment this counter by the given value
    * @param incr the value to increase this counter by
    */
-  public synchronized void increment(long incr) {
-    value += incr;
-  }
-
-  @Override
-  public synchronized boolean equals(Object genericRight) {
-    if (genericRight instanceof Counter) {
-      synchronized (genericRight) {
-        Counter right = (Counter) genericRight;
-        return name.equals(right.name) && 
-               displayName.equals(right.displayName) &&
-               value == right.value;
-      }
-    }
-    return false;
-  }
-  
-  @Override
-  public synchronized int hashCode() {
-    return name.hashCode() + displayName.hashCode();
-  }
+  void increment(long incr);
 }

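Counter is now a pure interface; the state, synchronization, and Writable plumbing that previously lived here move into concrete classes such as GenericCounter (see the Counters rewrite below). Application code is unaffected, since counters are still looked up by enum and driven through the interface. A typical usage sketch, with a hypothetical application enum (context is the usual Mapper/Reducer task context):

    // Hypothetical application-defined counter enum for this sketch.
    enum WordStats { GOOD_LINES, BAD_LINES }

    // Look up by enum, then talk to the Counter interface; the
    // concrete implementation behind it is now framework business.
    context.getCounter(WordStats.GOOD_LINES).increment(1);
    long good = context.getCounter(WordStats.GOOD_LINES).getValue();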
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java Tue Aug 16 00:37:15 2011
@@ -18,19 +18,9 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.MissingResourceException;
-import java.util.ResourceBundle;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
 
 /**
  * A group of {@link Counter}s that logically belong together. Typically,
@@ -38,156 +28,6 @@ import org.apache.hadoop.io.WritableUtil
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class CounterGroup implements Writable, Iterable<Counter> {
-  private String name;
-  private String displayName;
-  private TreeMap<String, Counter> counters = new TreeMap<String, Counter>();
-  // Optional ResourceBundle for localization of group and counter names.
-  private ResourceBundle bundle = null;    
-  
-  /**
-   * Returns the specified resource bundle, or throws an exception.
-   * @throws MissingResourceException if the bundle isn't found
-   */
-  private static ResourceBundle getResourceBundle(String enumClassName) {
-    String bundleName = enumClassName.replace('$','_');
-    return ResourceBundle.getBundle(bundleName);
-  }
-
-  protected CounterGroup(String name) {
-    this.name = name;
-    try {
-      bundle = getResourceBundle(name);
-    }
-    catch (MissingResourceException neverMind) {
-    }
-    displayName = localize("CounterGroupName", name);
-  }
-  
-  /** Create a CounterGroup.
-   * @param name the name of the group's enum.
-   * @param displayName a name to be displayed for the group.
-   */
-  public CounterGroup(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
-  }
- 
-  /**
-   * Get the internal name of the group
-   * @return the internal name
-   */
-  public synchronized String getName() {
-    return name;
-  }
-  
-  /**
-   * Get the display name of the group.
-   * @return the human readable name
-   */
-  public synchronized String getDisplayName() {
-    return displayName;
-  }
-
-  /** Add a counter to this group. */
-  public synchronized void addCounter(Counter counter) {
-    counters.put(counter.getName(), counter);
-  }
-
-  /**
-   * Find a counter in a group.
-   * @param counterName the name of the counter
-   * @param displayName the display name of the counter
-   * @return the counter that was found or added
-   */
-  public Counter findCounter(String counterName, String displayName) {
-    Counter result = counters.get(counterName);
-    if (result == null) {
-      result = new Counter(counterName, displayName);
-      counters.put(counterName, result);
-    }
-    return result;
-  }
-
-  public synchronized Counter findCounter(String counterName) {
-    Counter result = counters.get(counterName);
-    if (result == null) {
-      String displayName = localize(counterName, counterName);
-      result = new Counter(counterName, displayName);
-      counters.put(counterName, result);
-    }
-    return result;
-  }
-  
-  public synchronized Iterator<Counter> iterator() {
-    return counters.values().iterator();
-  }
-
-  public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, displayName);
-    WritableUtils.writeVInt(out, counters.size());
-    for(Counter counter: counters.values()) {
-      counter.write(out);
-    }
-  }
-  
-  public synchronized void readFields(DataInput in) throws IOException {
-    displayName = Text.readString(in);
-    counters.clear();
-    int size = WritableUtils.readVInt(in);
-    for(int i=0; i < size; i++) {
-      Counter counter = new Counter();
-      counter.readFields(in);
-      counters.put(counter.getName(), counter);
-    }
-  }
-
-  /**
-   * Looks up key in the ResourceBundle and returns the corresponding value.
-   * If the bundle or the key doesn't exist, returns the default value.
-   */
-  private String localize(String key, String defaultValue) {
-    String result = defaultValue;
-    if (bundle != null) {
-      try {
-        result = bundle.getString(key);
-      }
-      catch (MissingResourceException mre) {
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Returns the number of counters in this group.
-   */
-  public synchronized int size() {
-    return counters.size();
-  }
-
-  public synchronized boolean equals(Object genericRight) {
-    if (genericRight instanceof CounterGroup) {
-      Iterator<Counter> right = ((CounterGroup) genericRight).counters.
-                                       values().iterator();
-      Iterator<Counter> left = counters.values().iterator();
-      while (left.hasNext()) {
-        if (!right.hasNext() || !left.next().equals(right.next())) {
-          return false;
-        }
-      }
-      return !right.hasNext();
-    }
-    return false;
-  }
-
-  public synchronized int hashCode() {
-    return counters.hashCode();
-  }
-  
-  public synchronized void incrAllCounters(CounterGroup rightGroup) {
-    for(Counter right: rightGroup.counters.values()) {
-      Counter left = findCounter(right.getName(), right.getDisplayName());
-      left.increment(right.getValue());
-    }
-  }
+public interface CounterGroup extends CounterGroupBase<Counter> {
+  // essentially a typedef so user doesn't have to use generic syntax
 }

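The replacement interface is deliberately empty: extending CounterGroupBase<Counter> fixes the type parameter once, so callers write CounterGroup instead of CounterGroupBase<Counter> everywhere. The same "interface as typedef" idiom in miniature:

    import java.util.List;

    // Sketch of the idiom: an empty subinterface that pins a generic
    // parameter. Implementations opt in by naming the subinterface,
    // exactly as the group classes in the Counters rewrite below
    // implement CounterGroup.
    interface StringList extends List<String> {
      // no members; purely shorthand for List<String>
    }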
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java Tue Aug 16 00:37:15 2011
@@ -17,200 +17,121 @@
  */
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+
+/**
+ * <p><code>Counters</code> holds per job/task counters, defined either by the
+ * Map-Reduce framework or applications. Each <code>Counter</code> can be of
+ * any {@link Enum} type.</p>
+ *
+ * <p><code>Counters</code> are bunched into {@link CounterGroup}s, each
+ * comprising counters from a particular <code>Enum</code> class.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counters implements Writable,Iterable<CounterGroup> {
-  /**
-   * A cache from enum values to the associated counter. Dramatically speeds up
-   * typical usage.
-   */
-  private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
+public class Counters extends AbstractCounters<Counter, CounterGroup> {
 
-  private TreeMap<String, CounterGroup> groups = 
-      new TreeMap<String, CounterGroup>();
-  
-  public Counters() {
-  }
-  
-  /**
-   * Utility method to  create a Counters object from the 
-   * org.apache.hadoop.mapred counters
-   * @param counters
-   */
-  public Counters(org.apache.hadoop.mapred.Counters counters) {
-    for(org.apache.hadoop.mapred.Counters.Group group: counters) {
-      String name = group.getName();
-      CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
-      groups.put(name, newGroup);
-      for(Counter counter: group) {
-        newGroup.addCounter(counter);
-      }
+  // Mix framework group implementation into CounterGroup interface
+  private static class FrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, Counter> implements CounterGroup {
+
+    FrameworkGroupImpl(Class<T> cls) {
+      super(cls);
     }
-  }
 
-  /** Add a group. */
-  public void addGroup(CounterGroup group) {
-    groups.put(group.getName(), group);
+    @Override
+    protected FrameworkCounter newCounter(T key) {
+      return new FrameworkCounter(key);
+    }
   }
 
-  public Counter findCounter(String groupName, String counterName) {
-    CounterGroup grp = getGroup(groupName);
-    return grp.findCounter(counterName);
-  }
+  // Mix generic group implementation into CounterGroup interface
+  // and provide some mandatory group factory methods.
+  private static class GenericGroup extends AbstractCounterGroup<Counter>
+      implements CounterGroup {
 
-  /**
-   * Find the counter for the given enum. The same enum will always return the
-   * same counter.
-   * @param key the counter key
-   * @return the matching counter object
-   */
-  public synchronized Counter findCounter(Enum<?> key) {
-    Counter counter = cache.get(key);
-    if (counter == null) {
-      counter = findCounter(key.getDeclaringClass().getName(), key.toString());
-      cache.put(key, counter);
+    GenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
     }
-    return counter;    
-  }
 
-  /**
-   * Returns the names of all counter classes.
-   * @return Set of counter names.
-   */
-  public synchronized Collection<String> getGroupNames() {
-    return groups.keySet();
-  }
+    @Override
+    protected Counter newCounter(String name, String displayName, long value) {
+      return new GenericCounter(name, displayName, value);
+    }
 
-  @Override
-  public Iterator<CounterGroup> iterator() {
-    return groups.values().iterator();
+    @Override
+    protected Counter newCounter() {
+      return new GenericCounter();
+    }
   }
 
-  /**
-   * Returns the named counter group, or an empty group if there is none
-   * with the specified name.
-   */
-  public synchronized CounterGroup getGroup(String groupName) {
-    CounterGroup grp = groups.get(groupName);
-    if (grp == null) {
-      grp = new CounterGroup(groupName);
-      groups.put(groupName, grp);
+  // Mix file system group implementation into the CounterGroup interface
+  private static class FileSystemGroup extends FileSystemCounterGroup<Counter>
+      implements CounterGroup {
+
+    @Override
+    protected Counter newCounter(String scheme, FileSystemCounter key) {
+      return new FSCounter(scheme, key);
     }
-    return grp;
   }
 
   /**
-   * Returns the total number of counters, by summing the number of counters
-   * in each group.
+   * Provide factory methods for counter group factory implementation.
+   * See also the GroupFactory in
+   *  {@link org.apache.hadoop.mapred.Counters mapred.Counters}
    */
-  public synchronized  int countCounters() {
-    int result = 0;
-    for (CounterGroup group : this) {
-      result += group.size();
+  private static class GroupFactory
+      extends CounterGroupFactory<Counter, CounterGroup> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<CounterGroup>
+        newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<CounterGroup>() {
+        @Override public CounterGroup newGroup(String name) {
+          return new FrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
     }
-    return result;
-  }
 
-  /**
-   * Write the set of groups.
-   * The external format is:
-   *     #groups (groupName group)*
-   *
-   * i.e. the number of groups followed by 0 or more groups, where each 
-   * group is of the form:
-   *
-   *     groupDisplayName #counters (false | true counter)*
-   *
-   * where each counter is of the form:
-   *
-   *     name (false | true displayName) value
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    out.writeInt(groups.size());
-    for (org.apache.hadoop.mapreduce.CounterGroup group: groups.values()) {
-      Text.writeString(out, group.getName());
-      group.write(out);
+    @Override
+    protected CounterGroup newGenericGroup(String name, String displayName,
+                                           Limits limits) {
+      return new GenericGroup(name, displayName, limits);
     }
-  }
-  
-  /**
-   * Read a set of groups.
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    int numClasses = in.readInt();
-    groups.clear();
-    while (numClasses-- > 0) {
-      String groupName = Text.readString(in);
-      CounterGroup group = new CounterGroup(groupName);
-      group.readFields(in);
-      groups.put(groupName, group);
+
+    @Override
+    protected CounterGroup newFileSystemGroup() {
+      return new FileSystemGroup();
     }
   }
 
+  private static final GroupFactory groupFactory = new GroupFactory();
+
   /**
-   * Return textual representation of the counter values.
+   * Default constructor
    */
-  public synchronized String toString() {
-    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
-    for (CounterGroup group: this) {
-      sb.append("\n\t" + group.getDisplayName());
-      for (Counter counter: group) {
-        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
-                  counter.getValue());
-      }
-    }
-    return sb.toString();
+  public Counters() {
+    super(groupFactory);
   }
 
   /**
-   * Increments multiple counters by their amounts in another Counters 
-   * instance.
-   * @param other the other Counters instance
+   * Construct the Counters object from another counters object
+   * @param <C> the type of counter
+   * @param <G> the type of counter group
+   * @param counters the old counters object
    */
-  public synchronized void incrAllCounters(Counters other) {
-    for(Map.Entry<String, CounterGroup> rightEntry: other.groups.entrySet()) {
-      CounterGroup left = groups.get(rightEntry.getKey());
-      CounterGroup right = rightEntry.getValue();
-      if (left == null) {
-        left = new CounterGroup(right.getName(), right.getDisplayName());
-        groups.put(rightEntry.getKey(), left);
-      }
-      left.incrAllCounters(right);
-    }
-  }
-
-  public boolean equals(Object genericRight) {
-    if (genericRight instanceof Counters) {
-      Iterator<CounterGroup> right = ((Counters) genericRight).groups.
-                                       values().iterator();
-      Iterator<CounterGroup> left = groups.values().iterator();
-      while (left.hasNext()) {
-        if (!right.hasNext() || !left.next().equals(right.next())) {
-          return false;
-        }
-      }
-      return !right.hasNext();
-    }
-    return false;
-  }
-  
-  public int hashCode() {
-    return groups.hashCode();
+  public <C extends Counter, G extends CounterGroupBase<C>>
+  Counters(AbstractCounters<C, G> counters) {
+    super(counters, groupFactory);
   }
 }

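A minimal usage sketch of the reworked Counters class (the group and counter names are illustrative; findCounter, increment and the new generic copy constructor are the pieces exercised):

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Counters;

    public class CountersSketch {
      public static void main(String[] args) {
        Counters counters = new Counters();
        // findCounter creates the group and the counter on first use
        Counter c = counters.findCounter("MyGroup", "MyCounter");
        c.increment(42);

        // the generic constructor accepts any AbstractCounters
        // specialization, e.g. an old-API counters object
        Counters copy = new Counters(counters);
        System.out.println(copy.findCounter("MyGroup", "MyCounter").getValue());
      }
    }
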
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/Job.java Tue Aug 16 00:37:15 2011
@@ -122,7 +122,7 @@ public class Job extends JobContextImpl 
   private JobStatus status;
   private long statustime;
   private Cluster cluster;
-  
+
   @Deprecated
   public Job() throws IOException {
     this(new Configuration());
@@ -360,8 +360,11 @@ public class Job extends JobContextImpl 
   @Override
   public String toString() {
     ensureState(JobState.RUNNING);
+    String reasonForFailure = " ";
     try {
       updateStatus();
+      if (status.getState().equals(JobStatus.State.FAILED))
+        reasonForFailure = getTaskFailureEventString();
     } catch (IOException e) {
     } catch (InterruptedException ie) {
     }
@@ -378,10 +381,40 @@
     sb.append(status.getState()).append("\n");
     sb.append("history URL: ");
     sb.append(status.getHistoryFile()).append("\n");
-    sb.append("retired: ").append(status.isRetired());
+    sb.append("retired: ").append(status.isRetired()).append("\n");
+    sb.append("reason for failure: ").append(reasonforFailure);
     return sb.toString();
   }
-      
+
+  /**
+   * @return a string identifying the task that caused the job to fail,
+   *         derived from the most recent failed task-completion event
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  String getTaskFailureEventString() throws IOException,
+      InterruptedException {
+    int failCount = 0;
+    TaskCompletionEvent lastEvent = null;
+    for (TaskCompletionEvent event : cluster.getClient().getTaskCompletionEvents(
+        status.getJobID(), 0, 10)) {
+      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
+        failCount++;
+        lastEvent = event;
+      }
+    }
+    if (lastEvent == null) {
+      return " unknown; no failed task-completion events were reported";
+    }
+    // recover the task id by stripping the "attempt_" prefix and the
+    // trailing attempt number from the attempt id
+    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
+    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
+    return (" task " + taskID + " failed " +
+      failCount + " times. For details check tasktracker at: " +
+      lastEvent.getTaskTrackerHttp());
+  }
+
   /**
    * Get the information of the current state of the tasks of a job.
    * 

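A sketch of how the new failure reason surfaces to a client; the job configuration is elided and the construction follows the API at this revision:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class FailureReasonSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        // ... set mapper, reducer, input and output paths ...
        if (!job.waitForCompletion(true)) {
          // toString() now ends with the "reason for failure:" line
          // built by getTaskFailureEventString()
          System.err.println(job);
        }
      }
    }
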
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties Tue Aug 16 00:37:15 2011
@@ -21,5 +21,7 @@ TOTAL_LAUNCHED_REDUCES.name=       Launc
 OTHER_LOCAL_MAPS.name=             Other local map tasks
 DATA_LOCAL_MAPS.name=              Data-local map tasks
 RACK_LOCAL_MAPS.name=              Rack-local map tasks
+SLOTS_MILLIS_MAPS.name=            Total time spent by all maps in occupied slots (ms)
+SLOTS_MILLIS_REDUCES.name=         Total time spent by all reduces in occupied slots (ms)
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)

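These keys map to the JobCounter enum, whose display names resolve through this resource bundle at runtime. A small sketch of reading one of the new counters from a finished job's Counters:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.JobCounter;

    class JobCounterSketch {
      static void printMapSlotMillis(Counters counters) {
        Counter c = counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
        // prints "Total time spent by all maps in occupied slots (ms)=<n>"
        System.out.println(c.getDisplayName() + "=" + c.getValue());
      }
    }
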
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java Tue Aug 16 00:37:15 2011
@@ -59,4 +59,6 @@ public interface MRConfig {
     7*24*60*60*1000; // 7 days
   
   public static final String FRAMEWORK_NAME  = "mapreduce.framework.name";
+  public static final String TASK_LOCAL_OUTPUT_CLASS =
+    "mapreduce.task.local.output.class";
 }

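This key names the concrete MapOutputFile implementation a task should use for its local output layout. A sketch of the lookup a task runner might perform; MROutputFiles stands in for whatever default the framework registers and should be read as an assumption, not a confirmed name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.MapOutputFile;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.util.ReflectionUtils;

    class MapOutputFileSketch {
      static MapOutputFile newMapOutputFile(Configuration conf) {
        // MROutputFiles is an assumed default, not part of this diff
        Class<? extends MapOutputFile> clazz =
            conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
                          MROutputFiles.class, MapOutputFile.class);
        // ReflectionUtils.newInstance injects conf into Configurables
        return ReflectionUtils.newInstance(clazz, conf);
      }
    }
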
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Aug 16 00:37:15 2011
@@ -275,4 +275,16 @@ public interface MRJobConfig {
     "mapreduce.job.submithostname";
   public static final String JOB_SUBMITHOSTADDR =
     "mapreduce.job.submithostaddress";
+
+  public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
 }

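A sketch of how a limits check can consume these keys; the helper is illustrative, while the keys and defaults are exactly those defined above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class CounterLimitsSketch {
      static void checkCounterCount(Configuration conf, int count) {
        // falls back to the compiled-in default of 120
        int max = conf.getInt(MRJobConfig.COUNTERS_MAX_KEY,
                              MRJobConfig.COUNTERS_MAX_DEFAULT);
        if (count > max) {
          throw new IllegalStateException(
              "too many counters: " + count + ", max=" + max);
        }
      }
    }
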
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Tue Aug 16 00:37:15 2011
@@ -27,9 +27,13 @@ REDUCE_INPUT_RECORDS.name=     Reduce in
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
+SPLIT_RAW_BYTES.name=          Input split bytes
 SPILLED_RECORDS.name=          Spilled Records
 SHUFFLED_MAPS.name=            Shuffled Maps 
 FAILED_SHUFFLE.name=           Failed Shuffles
 MERGED_MAP_OUTPUTS.name=       Merged Map outputs
 GC_TIME_MILLIS.name=           GC time elapsed (ms)
 COMMITTED_HEAP_BYTES.name=     Total committed heap usage (bytes)
+CPU_MILLISECONDS.name=         CPU time spent (ms)
+PHYSICAL_MEMORY_BYTES.name=    Physical memory (bytes) snapshot
+VIRTUAL_MEMORY_BYTES.name=     Virtual memory (bytes) snapshot

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Aug 16 00:37:15 2011
@@ -22,18 +22,15 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.EOFException;
-import java.io.StringBufferInputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 
 import org.apache.avro.Schema;
-import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.DatumReader;
@@ -171,13 +168,10 @@ public class EventReader implements Clos
     Counters result = new Counters();
     for (JhCounterGroup g : counters.groups) {
       CounterGroup group =
-        new CounterGroup(g.name.toString(), g.displayName.toString());
+          result.addGroup(g.name.toString(), g.displayName.toString());
       for (JhCounter c : g.counts) {
-        group.addCounter(new Counter(c.name.toString(),
-                                     c.displayName.toString(),
-                                     c.value));
+        group.addCounter(c.name.toString(), c.displayName.toString(), c.value);
       }
-      result.addGroup(group);
     }
     return result;
   }

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Aug 16 00:37:15 2011
@@ -125,7 +125,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 
@@ -140,7 +144,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 
@@ -176,7 +184,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "status", "type": "string"},
-          {"name": "error", "type": "string"}
+          {"name": "error", "type": "string"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 

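Each new array appears to carry one sample per progress bucket, the "pixelated graph" described by the constructor javadocs below. A sketch of consuming one of them through the accessors added in the Java files that follow:

    import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;

    class SplitsSketch {
      static void dumpCpuProfile(MapAttemptFinishedEvent event) {
        int[] cpu = event.getCpuUsages();  // one entry per progress bucket
        for (int i = 0; i < cpu.length; i++) {
          // bucket i is read here as covering progress [i/n, (i+1)/n)
          System.out.printf("progress %.2f: cpu %d%n",
              (i + 0.5) / cpu.length, cpu[i]);
        }
      }
    }
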
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Aug 16 00:37:15 2011
@@ -262,6 +262,8 @@ public class JobHistoryParser {
     taskInfo.finishTime = event.getFinishTime();
     taskInfo.error = event.getError();
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
+    info.errorInfo = "Task " + taskInfo.taskId + " failed " +
+        taskInfo.attemptsMap.size() + " times";
   }
 
   private void handleTaskStartedEvent(TaskStartedEvent event) {
@@ -321,6 +323,7 @@ public class JobHistoryParser {
    * The class where job information is aggregated into after parsing
    */
   public static class JobInfo {
+    String errorInfo = "None";
     long submitTime;
     long finishTime;
     JobID jobid;
@@ -406,6 +409,8 @@
     public long getFinishedReduces() { return finishedReduces; }
     /** Get the job status */
     public String getJobStatus() { return jobStatus; }
+    /** Get the error info (why the job failed, if it did) */
+    public String getErrorInfo() { return errorInfo; }
     /** Get the counters for the job */
     public Counters getTotalCounters() { return totalCounters; }
     /** Get the map counters for the job */

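A sketch of surfacing the new errorInfo from a parsed history file; the path is assumed readable through the default FileSystem:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    class HistorySketch {
      static String failureSummary(Configuration conf, Path historyFile)
          throws java.io.IOException {
        JobHistoryParser parser =
            new JobHistoryParser(FileSystem.get(conf), historyFile);
        JobInfo info = parser.parse();
        // getErrorInfo() returns "None" unless a task-failed event was seen
        return info.getJobStatus() + ": " + info.getErrorInfo();
      }
    }
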
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Tue Aug 16 00:37:15 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 
 import org.apache.avro.util.Utf8;
 
@@ -48,11 +49,19 @@ public class MapAttemptFinishedEvent  im
    * @param hostname Name of the host where the map executed
    * @param state State string for the attempt
    * @param counters Counters for the attempt
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.
+   *
+   *        If you have no splits data, pass {@code null} for this
+   *        parameter.
    */
-  public MapAttemptFinishedEvent(TaskAttemptID id, 
-      TaskType taskType, String taskStatus, 
-      long mapFinishTime, long finishTime,
-      String hostname, String state, Counters counters) {
+  public MapAttemptFinishedEvent
+      (TaskAttemptID id, TaskType taskType, String taskStatus, 
+       long mapFinishTime, long finishTime, String hostname,
+       String state, Counters counters,
+       int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
@@ -62,8 +71,46 @@ public class MapAttemptFinishedEvent  im
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
+
+    datum.clockSplits
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /** 
+   * @deprecated please use the constructor that takes an additional
+   *              argument, an array of splits arrays, instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event for successful completion of map attempts
+   * @param id Task Attempt ID
+   * @param taskType Type of the task
+   * @param taskStatus Status of the task
+   * @param mapFinishTime Finish time of the map phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the map executed
+   * @param state State string for the attempt
+   * @param counters Counters for the attempt
+   */
+  @Deprecated
+  public MapAttemptFinishedEvent
+      (TaskAttemptID id, TaskType taskType, String taskStatus, 
+       long mapFinishTime, long finishTime, String hostname,
+       String state, Counters counters) {
+    this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, state, counters, null);
   }
   
+
   MapAttemptFinishedEvent() {}
 
   public Object getDatum() { return datum; }
@@ -97,5 +144,18 @@ public class MapAttemptFinishedEvent  im
    public EventType getEventType() {
     return EventType.MAP_ATTEMPT_FINISHED;
   }
+
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
   
 }

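A sketch of feeding the new allSplits parameter; ProgressSplitsBlock.burst() is assumed from the companion mapred changes in this commit, and the status strings are placeholders:

    import org.apache.hadoop.mapred.ProgressSplitsBlock;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;

    class EventSketch {
      static MapAttemptFinishedEvent mapFinished(
          TaskAttemptID id, long mapFinish, long finish, String host,
          Counters counters, ProgressSplitsBlock splits) {
        // null is legal when no splits were collected, per the javadoc above
        int[][] allSplits = (splits == null) ? null : splits.burst();
        return new MapAttemptFinishedEvent(id, TaskType.MAP, "SUCCEEDED",
            mapFinish, finish, host, "SUCCEEDED", counters, allSplits);
      }
    }
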
Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Tue Aug 16 00:37:15 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+
 import org.apache.avro.util.Utf8;
 
 /**
@@ -50,12 +52,16 @@ public class ReduceAttemptFinishedEvent 
    * @param hostname Name of the host where the attempt executed
    * @param state State of the attempt
    * @param counters Counters for the attempt
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.
    */
-  public ReduceAttemptFinishedEvent(TaskAttemptID id, 
-      TaskType taskType, String taskStatus, 
-      long shuffleFinishTime, long sortFinishTime, 
-      long finishTime,
-      String hostname, String state, Counters counters) {
+  public ReduceAttemptFinishedEvent
+    (TaskAttemptID id, TaskType taskType, String taskStatus, 
+     long shuffleFinishTime, long sortFinishTime, long finishTime,
+     String hostname, String state, Counters counters,
+     int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
@@ -66,6 +72,46 @@
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
+
+    datum.clockSplits 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /**
+   * @deprecated please use the constructor that takes an additional
+   *              argument, an array of splits arrays, instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event to record completion of a reduce attempt
+   * @param id Attempt Id
+   * @param taskType Type of task
+   * @param taskStatus Status of the task
+   * @param shuffleFinishTime Finish time of the shuffle phase
+   * @param sortFinishTime Finish time of the sort phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param state State of the attempt
+   * @param counters Counters for the attempt
+   */
+  @Deprecated
+  public ReduceAttemptFinishedEvent
+    (TaskAttemptID id, TaskType taskType, String taskStatus, 
+     long shuffleFinishTime, long sortFinishTime, long finishTime,
+     String hostname, String state, Counters counters) {
+    this(id, taskType, taskStatus,
+         shuffleFinishTime, sortFinishTime, finishTime,
+         hostname, state, counters, null);
   }
 
   ReduceAttemptFinishedEvent() {}
@@ -105,4 +150,17 @@ public class ReduceAttemptFinishedEvent 
   }
 
 
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
+
 }

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Tue Aug 16 00:37:15 2011
@@ -27,6 +27,9 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+import org.apache.hadoop.mapred.TaskStatus;
+
 import org.apache.avro.util.Utf8;
 
 /**
@@ -47,11 +50,16 @@ public class TaskAttemptUnsuccessfulComp
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the attempt executed
    * @param error Error string
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.
    */
-  public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id, 
-      TaskType taskType,
-      String status, long finishTime, 
-      String hostname, String error) {
+  public TaskAttemptUnsuccessfulCompletionEvent
+       (TaskAttemptID id, TaskType taskType,
+        String status, long finishTime, 
+        String hostname, String error,
+        int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.taskType = new Utf8(taskType.name());
     datum.attemptId = new Utf8(id.toString());
@@ -59,6 +67,41 @@
     datum.hostname = new Utf8(hostname);
     datum.error = new Utf8(error);
     datum.status = new Utf8(status);
+
+    datum.clockSplits 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /** 
+   * @deprecated please use the constructor that takes an additional
+   *              argument, an array of splits arrays, instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event to record the unsuccessful completion of attempts
+   * @param id Attempt ID
+   * @param taskType Type of the task
+   * @param status Status of the attempt
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param error Error string
+   */
+  @Deprecated
+  public TaskAttemptUnsuccessfulCompletionEvent
+       (TaskAttemptID id, TaskType taskType,
+        String status, long finishTime, 
+        String hostname, String error) {
+    this(id, taskType, status, finishTime, hostname, error, null);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
@@ -101,4 +143,18 @@
               : EventType.REDUCE_ATTEMPT_KILLED);
   }
 
+
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
+
 }

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java Tue Aug 16 00:37:15 2011
@@ -67,7 +67,7 @@ public class TaskFinishedEvent implement
   /** Get the task finish time */
   public long getFinishTime() { return datum.finishTime; }
   /** Get task counters */
-  Counters getCounters() { return EventReader.fromAvro(datum.counters); }
+  public Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get task type */
   public TaskType getTaskType() {
     return TaskType.valueOf(datum.taskType.toString());

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Tue Aug 16 00:37:15 2011
@@ -114,8 +114,10 @@ public interface ClientProtocol extends 
    *             MAPREDUCE-1664.
    * Version 36: Added the method getJobTrackerStatus() as part of
    *             MAPREDUCE-2337.
+   * Version 37: More efficient serialization format for framework counters
+   *             (MAPREDUCE-901)
    */
-  public static final long versionID = 36L;
+  public static final long versionID = 37L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1158072&r1=1158071&r2=1158072&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/HDFS-1623/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Tue Aug 16 00:37:15 2011
@@ -19,7 +19,9 @@
 package org.apache.hadoop.mapreduce.security;
 
 import java.io.IOException;
-import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,16 +30,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.KerberosName;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -92,6 +91,13 @@ public class TokenCache {
     }
   }
   
+  static String getJTPrincipal(Configuration conf) throws IOException {
+    String jtHostname = JobTracker.getAddress(conf).getHostName();
+    // get jobtracker principal for use as delegation token renewer
+    return SecurityUtil.getServerPrincipal(conf.get(JTConfig.JT_USER_NAME),
+        jtHostname);
+  }
+  
   /**
    * get delegation token for a specific FS
    * @param fs
@@ -102,12 +108,11 @@ public class TokenCache {
    */
   static void obtainTokensForNamenodesInternal(FileSystem fs, 
       Credentials credentials, Configuration conf) throws IOException {
-
-    // get jobtracker principal id (for the renewer)
-    KerberosName jtKrbName = 
-      new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
-    
-    String delegTokenRenewer = jtKrbName.getShortName();
+    String delegTokenRenewer = getJTPrincipal(conf);
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      throw new IOException(
+          "Can't get JobTracker Kerberos principal for use as renewer");
+    }
     boolean readFile = true;
 
     String fsName = fs.getCanonicalServiceName();
@@ -133,6 +138,16 @@ public class TokenCache {
           return;
         }
       }
+      List<Token<?>> tokens = fs.getDelegationTokens(delegTokenRenewer);
+      if (tokens != null) {
+        for (Token<?> token : tokens) {
+          credentials.addToken(token.getService(), token);
+          LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + 
+              ";t.service="+token.getService());
+        }
+      }
+      // Call getDelegationToken as well for now, for FS implementations
+      // which may not have implemented getDelegationTokens (hftp)
       Token<?> token = fs.getDelegationToken(delegTokenRenewer);
       if (token != null) {
         Text fsNameText = new Text(fsName);

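A sketch of the client-side call these changes serve; obtainTokensForNamenodes is the public entry point that funnels into obtainTokensForNamenodesInternal above:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.security.TokenCache;

    class TokenSketch {
      static void addInputTokens(JobConf job, Path[] inputs)
          throws java.io.IOException {
        // fetches a delegation token per distinct filesystem, with the
        // JobTracker principal from getJTPrincipal() as the renewer
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), inputs, job);
      }
    }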

