hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r903227 [11/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri...
Date: Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Tue Jan 26 14:02:53 2010
@@ -19,6 +19,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.Queue.QueueOperation;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
@@ -87,9 +88,30 @@
     
   }
 
-  QueueConfigurationParser(String file) {
+  QueueConfigurationParser(String confFile) {
+    File file = new File(confFile).getAbsoluteFile();
+    if (!file.exists()) {
+      throw new RuntimeException("Configuration file not found at " +
+                                 confFile);
+    }
+    InputStream in = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(file));
+      loadFrom(in);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  QueueConfigurationParser(InputStream xmlInput) {
+    loadFrom(xmlInput);
+  }
+
+  private void loadFrom(InputStream xmlInput) {
     try {
-      this.root = loadResource(file);
+      this.root = loadResource(xmlInput);
     } catch (ParserConfigurationException e) {
       throw new RuntimeException(e);
     } catch (SAXException e) {
@@ -120,13 +142,13 @@
   * Method to load the resource file and
   * generate the root queue.
    * 
-   * @param confFile
+   * @param resourceInput InputStream that provides the XML to parse
   * @return the root queue parsed from the resource
    * @throws ParserConfigurationException
    * @throws SAXException
    * @throws IOException
    */
-  protected Queue loadResource(String confFile)
+  protected Queue loadResource(InputStream resourceInput)
     throws ParserConfigurationException, SAXException, IOException {
     DocumentBuilderFactory docBuilderFactory
       = DocumentBuilderFactory.newInstance();
@@ -146,19 +168,8 @@
     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
     Document doc = null;
     Element queuesNode = null;
-    File file = new File(confFile).getAbsoluteFile();
-    if (file.exists()) {
-      InputStream in = new BufferedInputStream(new FileInputStream(file));
-      try {
-        doc = builder.parse(in);
-      } finally {
-        in.close();
-      }
-    }
 
-    if (doc == null) {
-      throw new RuntimeException(file.getAbsolutePath() + " not found");
-    }
+    doc = builder.parse(resourceInput);
     queuesNode = doc.getDocumentElement();
     return this.parseResource(queuesNode);
   }
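
The change above splits construction in two: the file-path constructor now only
resolves and opens the file, while parsing always goes through an InputStream.
A minimal standalone sketch of the stream-based parse using plain JAXP, the
same API loadResource() uses (the class name and the mapred-queues.xml resource
name here are for illustration; the real parser is package-private):

    import java.io.InputStream;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;

    public class QueueXmlDemo {
      public static void main(String[] args) throws Exception {
        // Read the queue configuration as a classpath resource rather than
        // a filesystem path -- the capability this hunk adds to the parser.
        InputStream in = Thread.currentThread().getContextClassLoader()
            .getResourceAsStream("mapred-queues.xml");
        if (in == null) {
          throw new RuntimeException("mapred-queues.xml not found on classpath");
        }
        try {
          DocumentBuilder builder =
              DocumentBuilderFactory.newInstance().newDocumentBuilder();
          Document doc = builder.parse(in);   // parse straight from the stream
          Element queuesNode = doc.getDocumentElement();
          System.out.println("root element: " + queuesNode.getTagName());
        } finally {
          in.close();
        }
      }
    }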

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java Tue Jan 26 14:02:53 2010
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
@@ -31,6 +32,8 @@
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
 
+import java.io.BufferedInputStream;
+import java.io.InputStream;
 import java.io.IOException;
 import java.io.Writer;
 import java.util.ArrayList;
@@ -87,6 +90,7 @@
   private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
   private Map<String, Queue> allQueues = new HashMap<String, Queue>();
   static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+  static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml";
 
   // Prefix in configuration for queue related keys
   static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
@@ -120,10 +124,24 @@
       }
       return new DeprecatedQueueConfigurationParser(conf);
     } else {
-      URL filePath =
+      URL xmlInUrl =
         Thread.currentThread().getContextClassLoader()
           .getResource(QUEUE_CONF_FILE_NAME);
-      return new QueueConfigurationParser(filePath.getPath());
+      if (xmlInUrl == null) {
+        xmlInUrl = Thread.currentThread().getContextClassLoader()
+          .getResource(QUEUE_CONF_DEFAULT_FILE_NAME);
+        assert xmlInUrl != null; // this should be in our jar
+      }
+      InputStream stream = null;
+      try {
+        stream = xmlInUrl.openStream();
+        return new QueueConfigurationParser(new BufferedInputStream(stream));
+      } catch (IOException ioe) {
+        throw new RuntimeException("Couldn't open queue configuration at " +
+                                   xmlInUrl, ioe);
+      } finally {
+        IOUtils.closeStream(stream);
+      }
     }
   }
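
The hunk above gives QueueManager a lookup order: the site-provided
mapred-queues.xml on the classpath first, then the mapred-queues-default.xml
bundled in the jar. A self-contained sketch of that fallback (openQueueConfig
is an illustrative name; the real logic is inlined in
getQueueConfigurationParser):

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;

    public class QueueConfigLookupDemo {
      // Prefer the site-provided resource; fall back to the default that
      // ships inside the jar.
      static InputStream openQueueConfig() throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        URL url = cl.getResource("mapred-queues.xml");
        if (url == null) {
          url = cl.getResource("mapred-queues-default.xml");
        }
        if (url == null) {
          throw new IOException("no queue configuration found on the classpath");
        }
        return new BufferedInputStream(url.openStream());
      }

      public static void main(String[] args) throws IOException {
        InputStream in = openQueueConfig();
        try {
          System.out.println("opened queue configuration; first byte = " + in.read());
        } finally {
          in.close();
        }
      }
    }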
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jan 26 14:02:53 2010
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
 import org.apache.hadoop.util.Progress;
@@ -77,19 +78,19 @@
   private Progress sortPhase;
   private Progress reducePhase;
   private Counters.Counter shuffledMapsCounter = 
-    getCounters().findCounter(Counter.SHUFFLED_MAPS);
+    getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
   private Counters.Counter reduceShuffleBytes = 
-    getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
+    getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
   private Counters.Counter reduceInputKeyCounter = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
+    getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
-    getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
   private Counters.Counter reduceCombineInputCounter =
-    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
-    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
 
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
@@ -237,9 +238,9 @@
        super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
-         reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
+         reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
        this.skipRecCounter = 
-         reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
+         reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
        this.toWriteSkipRecs = toWriteSkipRecs() &&  
          SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
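
The substitution above is mechanical: every reference to Task's private
Counter enum becomes the public org.apache.hadoop.mapreduce.TaskCounter, so
shuffle and reduce statistics live in one framework-wide namespace. A toy
analogue of the enum-keyed counter pattern (this is not Hadoop's Counters
API, just the shape of it):

    import java.util.EnumMap;
    import java.util.Map;

    public class CounterDemo {
      // Stand-in for the shared TaskCounter enum; the values mirror a few
      // of the names visible in the hunk above.
      enum TaskCounter { SHUFFLED_MAPS, REDUCE_SHUFFLE_BYTES, REDUCE_INPUT_RECORDS }

      static final Map<TaskCounter, Long> counters =
          new EnumMap<TaskCounter, Long>(TaskCounter.class);

      // Analogous to getCounters().findCounter(c).increment(by) in Hadoop.
      static void increment(TaskCounter c, long by) {
        Long old = counters.get(c);
        counters.put(c, (old == null ? 0L : old) + by);
      }

      public static void main(String[] args) {
        increment(TaskCounter.SHUFFLED_MAPS, 1);
        increment(TaskCounter.REDUCE_SHUFFLE_BYTES, 4096);
        System.out.println(counters);
      }
    }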

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java Tue Jan 26 14:02:53 2010
@@ -28,6 +28,8 @@
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -44,13 +46,11 @@
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-import org.apache.hadoop.mapreduce.security.JobTokens;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -68,27 +69,6 @@
   private static final Log LOG =
     LogFactory.getLog(Task.class);
 
-  // Counters used by Task subclasses
-  protected static enum Counter { 
-    MAP_INPUT_RECORDS, 
-    MAP_OUTPUT_RECORDS,
-    MAP_SKIPPED_RECORDS,
-    MAP_INPUT_BYTES, 
-    MAP_OUTPUT_BYTES,
-    COMBINE_INPUT_RECORDS,
-    COMBINE_OUTPUT_RECORDS,
-    REDUCE_INPUT_GROUPS,
-    REDUCE_SHUFFLE_BYTES,
-    REDUCE_INPUT_RECORDS,
-    REDUCE_OUTPUT_RECORDS,
-    REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS,
-    SPILLED_RECORDS,
-    FAILED_SHUFFLE,
-    SHUFFLED_MAPS,
-    MERGED_MAP_OUTPUTS,
-  }
-  
   public static String MERGED_OUTPUT_PREFIX = ".merged";
   
 
@@ -160,7 +140,7 @@
   protected final Counters.Counter mergedMapOutputsCounter;
   private int numSlotsRequired;
   protected TaskUmbilicalProtocol umbilical;
-  protected JobTokens jobTokens=null; // storage of the secret keys
+  protected SecretKey tokenSecret;
 
   ////////////////////////////////////////////
   // Constructors
@@ -169,9 +149,12 @@
   public Task() {
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
-    spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
-    failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
-    mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
+    spilledRecordsCounter = 
+      counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    failedShuffleCounter = 
+      counters.findCounter(TaskCounter.FAILED_SHUFFLE);
+    mergedMapOutputsCounter = 
+      counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition, 
@@ -190,8 +173,9 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
-    failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
-    mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
+    failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
+    mergedMapOutputsCounter = 
+      counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
   }
 
   ////////////////////////////////////////////
@@ -215,19 +199,19 @@
   }
 
   /**
-   * set JobToken storage 
-   * @param jt
+   * Set the job token secret 
+   * @param tokenSecret the secret
    */
-  public void setJobTokens(JobTokens jt) {
-    this.jobTokens = jt;
+  public void setJobTokenSecret(SecretKey tokenSecret) {
+    this.tokenSecret = tokenSecret;
   }
 
   /**
-   * get JobToken storage
-   * @return storage object
+   * Get the job token secret
+   * @return the token secret
    */
-  public JobTokens getJobTokens() {
-    return this.jobTokens;
+  public SecretKey getJobTokenSecret() {
+    return this.tokenSecret;
   }
 
   
@@ -938,8 +922,22 @@
                             + JobStatus.State.FAILED + " or "
                             + JobStatus.State.KILLED);
     }
+    
+    // delete the staging area for the job
+    JobConf conf = new JobConf(jobContext.getConfiguration());
+    if (!supportIsolationRunner(conf)) {
+      String jobTempDir = conf.get("mapreduce.job.dir");
+      Path jobTempDirPath = new Path(jobTempDir);
+      FileSystem fs = jobTempDirPath.getFileSystem(conf);
+      fs.delete(jobTempDirPath, true);
+    }
     done(umbilical, reporter);
   }
+  
+  protected boolean supportIsolationRunner(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null ||
+            conf.getKeepFailedTaskFiles());
+  }
 
   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                              TaskReporter reporter
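
Two themes run through the Task.java hunks above: the JobTokens bag is
replaced by a single javax.crypto.SecretKey per task, and job commit now
deletes the job's staging directory unless task files must be kept for the
IsolationRunner. The secret exists so shuffle traffic can be HMAC-signed; a
minimal sketch of signing with such a key (the HmacSHA1 algorithm, key bytes,
and message below are demo assumptions, not taken from SecureShuffleUtils):

    import javax.crypto.Mac;
    import javax.crypto.SecretKey;
    import javax.crypto.spec.SecretKeySpec;

    public class JobTokenSignDemo {
      public static void main(String[] args) throws Exception {
        // A per-job shared secret like the one Task now carries in tokenSecret.
        SecretKey tokenSecret =
            new SecretKeySpec("demo-job-token-bytes".getBytes("UTF-8"), "HmacSHA1");

        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(tokenSecret);
        byte[] signature = mac.doFinal(
            "GET /mapOutput?job=job_0001&map=attempt_0001_m_000000_0".getBytes("UTF-8"));

        System.out.println("signature length = " + signature.length + " bytes");
      }
    }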

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java Tue Jan 26 14:02:53 2010
@@ -25,6 +25,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -69,12 +72,10 @@
    * disks:
    * <ul>
    * <li>mapreduce.cluster.local.directories</li>
-   * <li>Job cache directories</li>
-   * <li>Archive directories</li>
    * <li>Hadoop log directories</li>
    * </ul>
    */
-  void setup() {
+  public void setup() {
     for (String localDir : this.mapredLocalDirs) {
       // Set up the mapreduce.cluster.local.directories.
       File mapredlocalDir = new File(localDir);
@@ -108,13 +109,13 @@
 
   /**
    * Take task-controller specific actions to initialize the distributed cache
-   * files. This involves setting appropriate permissions for these files so as
+   * file. This involves setting appropriate permissions for the file so
    * that it is accessible only to its owner.
    * 
    * @param context
    * @throws IOException
    */
-  public abstract void initializeDistributedCache(InitializationContext context)
+  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException;
 
   /**
@@ -134,26 +135,26 @@
 
   /**
    * Top level cleanup a task JVM method.
-   *
-   * The current implementation does the following.
    * <ol>
-   * <li>Sends a graceful terminate signal to task JVM allowing its sub-process
+   * <li>Sends a graceful terminate signal to the task JVM to allow subprocesses
    * to cleanup.</li>
-   * <li>Waits for stipulated period</li>
    * <li>Sends a forceful kill signal to task JVM, terminating all its
-   * sub-process forcefully.</li>
+   * sub-processes forcefully.</li>
    * </ol>
-   * 
+   *
    * @param context the task for which kill signal has to be sent.
    */
   final void destroyTaskJVM(TaskControllerContext context) {
+    // Send SIGTERM to request a polite exit.
     terminateTask(context);
+
     try {
       Thread.sleep(context.sleeptimeBeforeSigkill);
     } catch (InterruptedException e) {
-      LOG.warn("Sleep interrupted : " + 
+      LOG.warn("Sleep interrupted : " +
           StringUtils.stringifyException(e));
     }
+
     killTask(context);
   }
 
@@ -191,12 +192,104 @@
   }
 
   /**
+   * Contains info related to the path of the file/dir to be deleted. This info
+   * is needed by the task-controller to build the full path of the file/dir.
+   */
+  static class TaskControllerPathDeletionContext extends PathDeletionContext {
+    Task task;
+    boolean isWorkDir;
+    TaskController taskController;
+
+    /**
+     * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
+     * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
+     * is built using mapredLocalDir, jobId, taskId, etc.
+     */
+    Path mapredLocalDir;
+
+    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
+        Task task, boolean isWorkDir, TaskController taskController) {
+      super(fs, null);
+      this.task = task;
+      this.isWorkDir = isWorkDir;
+      this.taskController = taskController;
+      this.mapredLocalDir = mapredLocalDir;
+    }
+
+    @Override
+    protected String getPathForCleanup() {
+      if (fullPath == null) {
+        fullPath = buildPathForDeletion();
+      }
+      return fullPath;
+    }
+
+    /**
+     * Builds the path of taskAttemptDir OR taskWorkDir based on
+     * mapredLocalDir, jobId, taskId, etc
+     */
+    String buildPathForDeletion() {
+      String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask())
+        : TaskTracker.getLocalTaskDir(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask());
+
+      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+    }
+
+    /**
+     * Makes the path (and its subdirectories, recursively) fully deletable by
+     * having the task-controller set proper permissions (770).
+     */
+    @Override
+    protected void enablePathForCleanup() throws IOException {
+      getPathForCleanup(); // initialize fullPath, if not initialized already
+      if (fs.exists(new Path(fullPath))) {
+        taskController.enableTaskForCleanup(this);
+      }
+    }
+  }
+
+  /**
   * NOTE: This is an internal-only class and is not intended for users!
    * 
    */
   public static class InitializationContext {
     public File workDir;
     public String user;
+    
+    public InitializationContext() {
+    }
+    
+    public InitializationContext(String user, File workDir) {
+      this.user = user;
+      this.workDir = workDir;
+    }
+  }
+  
+  /**
+   * This is used for initializing the private localized files in the
+   * distributed cache: changing permissions, ownership, etc.
+   */
+  public static class DistributedCacheFileContext extends InitializationContext {
+    // base directory under which the file has been localized
+    Path localizedBaseDir;
+    // the unique string used to construct the localized path
+    String uniqueString;
+
+    public DistributedCacheFileContext(String user, File workDir,
+        Path localizedBaseDir, String uniqueString) {
+      super(user, workDir);
+      this.localizedBaseDir = localizedBaseDir;
+      this.uniqueString = uniqueString;
+    }
+
+    public Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(TaskTracker
+          .getPrivateDistributedCacheDir(user), uniqueString));
+    }
   }
 
   static class JobInitializationContext extends InitializationContext {
@@ -224,6 +317,15 @@
    */
   abstract void killTask(TaskControllerContext context);
 
+
+  /**
+   * Sends a QUIT signal to direct the task JVM (and its sub-processes) to
+   * dump their stacks to stdout.
+   *
+   * @param context task context.
+   */
+  abstract void dumpTaskStack(TaskControllerContext context);
+
   /**
   * Initialize user on this TaskTracker in a TaskController-specific manner.
    * 
@@ -242,4 +344,11 @@
   abstract void runDebugScript(DebugScriptContext context) 
       throws IOException;
   
+  /**
+   * Enable the task for cleanup by changing permissions of the path.
+   * @param context   path deletion context
+   * @throws IOException
+   */
+  abstract void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException;
 }
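
destroyTaskJVM above is a fixed template: graceful terminate, grace period,
forceful kill, with the new dumpTaskStack hook available to request a stack
dump before the kill. A standalone rendering of that sequence against a plain
java.lang.Process (Process.destroyForcibly needs Java 8+, and the real
controllers send POSIX signals rather than using the Process API):

    public class JvmDestroyDemo {
      // Mirrors destroyTaskJVM: polite terminate, wait, then forceful kill.
      static void destroyJvm(Process child, long sleeptimeBeforeSigkill) {
        child.destroy();                         // graceful terminate (SIGTERM-like)
        try {
          Thread.sleep(sleeptimeBeforeSigkill);  // the stipulated grace period
        } catch (InterruptedException e) {
          System.err.println("Sleep interrupted: " + e);
        }
        child.destroyForcibly();                 // forceful kill (SIGKILL-like)
      }

      public static void main(String[] args) throws Exception {
        // Assumes a POSIX 'sleep' binary on the PATH.
        Process p = new ProcessBuilder("sleep", "60").start();
        destroyJvm(p, 1000);
        System.out.println("child exited with code " + p.waitFor());
      }
    }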

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jan 26 14:02:53 2010
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,13 +32,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.Node;
 
 
@@ -65,7 +65,7 @@
 
   // Defines the TIP
   private String jobFile = null;
-  private Job.RawSplit rawSplit;
+  private TaskSplitMetaInfo splitInfo;
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
@@ -140,12 +140,12 @@
    * Constructor for MapTask
    */
   public TaskInProgress(JobID jobid, String jobFile, 
-                        Job.RawSplit rawSplit, 
+                        TaskSplitMetaInfo split, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition,
                         int numSlotsRequired) {
     this.jobFile = jobFile;
-    this.rawSplit = rawSplit;
+    this.splitInfo = split;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -316,10 +316,36 @@
    * Whether this is a map task
    */
   public boolean isMapTask() {
-    return rawSplit != null;
+    return splitInfo != null;
   }
     
   /**
+   * Returns the {@link TaskType} of the {@link TaskAttemptID} passed. 
+   * The type of an attempt is determined by the nature of the task and not its 
+   * id. 
+   * For example,
+   * - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it 
+   *   has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP 
+   *   instead of MAP.
+   * - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is
+   *   now supposed to do the task-level cleanup. In such a case this attempt 
+   *   will be of type TASK_CLEANUP instead of REDUCE.
+   */
+  TaskType getAttemptType(TaskAttemptID id) {
+    if (isCleanupAttempt(id)) {
+      return TaskType.TASK_CLEANUP;
+    } else if (isJobSetupTask()) {
+      return TaskType.JOB_SETUP;
+    } else if (isJobCleanupTask()) {
+      return TaskType.JOB_CLEANUP;
+    } else if (isMapTask()) {
+      return TaskType.MAP;
+    } else {
+      return TaskType.REDUCE;
+    }
+  }
+  
+  /**
   * Is the Task associated with taskid the first attempt of the tip?
    * @param taskId
    * @return Returns true if the Task is the first attempt of the tip
@@ -335,6 +361,15 @@
   public boolean isRunning() {
     return !activeTasks.isEmpty();
   }
+
+  /**
+   * Is the given TaskAttemptID attempt running?
+   * @param taskId
+   * @return true if taskId attempt is running.
+   */
+  boolean isAttemptRunning(TaskAttemptID taskId) {
+    return activeTasks.containsKey(taskId);
+  }
     
   TaskAttemptID getSuccessfulTaskid() {
     return successfulTaskId;
@@ -534,15 +569,16 @@
    * A status message from a client has arrived.
    * It updates the status of a single component-thread-task,
    * which might result in an overall TaskInProgress status update.
-   * @return has the task changed its state noticably?
+   * @return has the task changed its state noticeably?
    */
   synchronized boolean updateStatus(TaskStatus status) {
     TaskAttemptID taskid = status.getTaskID();
+    String tracker = status.getTaskTracker();
     String diagInfo = status.getDiagnosticInfo();
     TaskStatus oldStatus = taskStatuses.get(taskid);
     boolean changed = true;
     if (diagInfo != null && diagInfo.length() > 0) {
-      LOG.info("Error from "+taskid+": "+diagInfo);
+      LOG.info("Error from " + taskid + " on " + tracker + ": " + diagInfo);
       addDiagnosticInfo(taskid, diagInfo);
     }
     
@@ -697,6 +733,12 @@
     if (tasks.contains(taskid)) {
       if (taskState == TaskStatus.State.FAILED) {
         numTaskFailures++;
+        if (isMapTask()) {
+          jobtracker.getInstrumentation().failedMap(taskid);
+        } else {
+          jobtracker.getInstrumentation().failedReduce(taskid);
+        }
+        
         machinesWhereFailed.add(trackerHostName);
         if(maxSkipRecords>0) {
           //skipping feature enabled
@@ -707,6 +749,11 @@
 
       } else if (taskState == TaskStatus.State.KILLED) {
         numKilledTasks++;
+        if (isMapTask()) {
+          jobtracker.getInstrumentation().killedMap(taskid);
+        } else {
+          jobtracker.getInstrumentation().killedReduce(taskid);
+        }
       }
     }
 
@@ -787,7 +834,7 @@
    */
   public String[] getSplitLocations() {
     if (isMapTask() && !jobSetup && !jobCleanup) {
-      return rawSplit.getLocations();
+      return splitInfo.getLocations();
     }
     return new String[0];
   }
@@ -800,6 +847,13 @@
   }
 
   /**
+   * Get all the {@link TaskAttemptID}s in this {@link TaskInProgress}
+   */
+  TaskAttemptID[] getAllTaskAttemptIDs() {
+    return tasks.toArray(new TaskAttemptID[tasks.size()]);
+  }
+  
+  /**
    * Get the status of the specified task
    * @param taskid
    * @return
@@ -992,16 +1046,8 @@
     if (isMapTask()) {
       LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
           + failedRanges.getIndicesCount());
-      String splitClass = null;
-      BytesWritable split;
-      if (!jobSetup && !jobCleanup) {
-        splitClass = rawSplit.getClassName();
-        split = rawSplit.getBytes();
-      } else {
-        split = new BytesWritable();
-      }
-      t = new MapTask(jobFile, taskid, partition, splitClass, split,
-                      numSlotsNeeded);
+      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
+          numSlotsNeeded);
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
     }
@@ -1114,7 +1160,7 @@
     if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
-    String[] splits = rawSplit.getLocations();
+    String[] splits = splitInfo.getLocations();
     Node[] nodes = new Node[splits.length];
     for (int i = 0; i < splits.length; i++) {
       nodes[i] = jobtracker.getNode(splits[i]);
@@ -1144,16 +1190,12 @@
 
   public long getMapInputSize() {
     if(isMapTask() && !jobSetup && !jobCleanup) {
-      return rawSplit.getDataLength();
+      return splitInfo.getInputDataLength();
     } else {
       return 0;
     }
   }
   
-  public void clearSplit() {
-    rawSplit.clearBytes();
-  }
-  
   /**
    * Compare most recent task attempts dispatch time to current system time so
    * that task progress rate will slow down as time proceeds even if no progress
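
The new getAttemptType above encodes a precedence: setup and cleanup roles
override whatever the _m_/_r_ in the attempt id suggests. A compact
restatement of that decision order (the boolean parameters stand in for the
TIP's isCleanupAttempt/isJobSetupTask/isJobCleanupTask/isMapTask predicates):

    public class AttemptTypeDemo {
      enum TaskType { MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP }

      // Same precedence as TaskInProgress.getAttemptType: role first, id last.
      static TaskType attemptType(boolean isCleanupAttempt, boolean isJobSetup,
                                  boolean isJobCleanup, boolean isMapTask) {
        if (isCleanupAttempt) {
          return TaskType.TASK_CLEANUP;
        } else if (isJobSetup) {
          return TaskType.JOB_SETUP;
        } else if (isJobCleanup) {
          return TaskType.JOB_CLEANUP;
        } else {
          return isMapTask ? TaskType.MAP : TaskType.REDUCE;
        }
      }

      public static void main(String[] args) {
        // A map-slot attempt that is really running job setup:
        System.out.println(attemptType(false, true, false, true)); // JOB_SETUP
      }
    }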

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jan 26 14:02:53 2010
@@ -43,7 +43,6 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
@@ -93,6 +92,8 @@
   public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
   public TaskTracker getTracker() { return tracker; }
 
+  public JvmManager getJvmManager() { return jvmManager; }
+
   /** Called to assemble this task's input.  This method is run in the parent
    * process before the child is spawned.  It should not execute user code,
    * only system code. */
@@ -172,18 +173,14 @@
       taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
           .newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
-          .getDistributedCacheDir(conf.getUser()));
+          .getPrivateDistributedCacheDir(conf.getUser()), 
+          TaskTracker.getPublicDistributedCacheDir());
 
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
       // the conf object after this will NOT be reflected to the child.
       setupChildTaskConfiguration(lDirAlloc);
 
-      InitializationContext context = new InitializationContext();
-      context.user = conf.getUser();
-      context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
-      tracker.getTaskController().initializeDistributedCache(context);
-
       if (!prepare()) {
         return;
       }
@@ -521,7 +518,7 @@
     }
     hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
                        + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
-    env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+    env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
 
     // add the env variables passed by the user
     String mapredChildEnv = getChildEnv(conf);
@@ -647,7 +644,39 @@
       }
     }
     classPaths.add(new File(jobCacheDir, "classes").toString());
-    classPaths.add(jobCacheDir.toString());
+    classPaths.add(new File(jobCacheDir, "job.jar").toString());
+  }
+  
+  /**
+   * Sets permissions recursively and then deletes the contents of dir.
+   * Leaves dir as an empty directory (does not delete dir itself).
+   */
+  static void deleteDirContents(JobConf conf, File dir) throws IOException {
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(new Path(dir.getAbsolutePath()))) {
+      File contents[] = dir.listFiles();
+      if (contents != null) {
+        for (int i = 0; i < contents.length; i++) {
+          try {
+            int ret = 0;
+            if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
+                                      "ug+rwx", true)) != 0) {
+              LOG.warn("Unable to chmod for " + contents[i] + 
+                  "; chmod exit status = " + ret);
+            }
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while setting permissions for contents of " +
+                "workDir. Not deleting the remaining contents of workDir.");
+            return;
+          }
+          if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
+            LOG.warn("Unable to delete "+ contents[i]);
+          }
+        }
+      }
+    } else {
+      LOG.warn(dir + " does not exist.");
+    }
   }
   
   /**
@@ -660,11 +690,14 @@
   * @param workDir Working directory, whose contents are completely deleted.
    */
   public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
-    LOG.debug("Fully deleting and re-creating" + workDir);
-    FileUtil.fullyDelete(workDir);
-    if (!workDir.mkdir()) {
-      LOG.debug("Did not recreate " + workDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fully deleting contents of " + workDir);
     }
+
+    /* Delete only the contents of workDir, leaving the directory empty. We
+     * can't delete workDir itself, as it is the current working directory.
+     */
+    deleteDirContents(conf, workDir);
     
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
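
setupWorkDir above stops deleting and recreating the work directory and
instead empties it in place, because the directory is the child JVM's current
working directory and cannot be removed out from under it. A sketch of the
contents-only deletion (the chmod fix-ups that deleteDirContents performs via
FileUtil.chmod are omitted here):

    import java.io.File;

    public class WorkDirCleanupDemo {
      // Empties dir but never deletes dir itself, matching the contract of
      // TaskRunner.deleteDirContents.
      static void deleteContents(File dir) {
        File[] contents = dir.listFiles();
        if (contents == null) {
          System.err.println(dir + " does not exist or is not a directory.");
          return;
        }
        for (File f : contents) {
          if (f.isDirectory()) {
            deleteContents(f);                 // empty the subtree first
          }
          if (!f.delete()) {
            System.err.println("Unable to delete " + f);
          }
        }
      }

      public static void main(String[] args) {
        File work = new File("demo-workdir");
        new File(work, "sub").mkdirs();
        deleteContents(work);
        System.out.println(work + " still exists: " + work.exists());
      }
    }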

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jan 26 14:02:53 2010
@@ -38,11 +38,11 @@
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
-import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
+import javax.crypto.SecretKey;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -67,14 +67,19 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -86,13 +91,16 @@
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.Service;
@@ -100,7 +108,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -123,6 +131,7 @@
   static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
     "mapred.tasktracker.pmem.reserved";
 
+
   static final long WAIT_FOR_DONE = 3 * 1000;
   int httpPort;
 
@@ -194,6 +203,8 @@
    */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+  private final JobTokenSecretManager jobTokenSecretManager 
+    = new JobTokenSecretManager();
 
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
@@ -221,13 +232,14 @@
   static final String OUTPUT = "output";
   private static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.dta";
+  static final String LOCAL_SPLIT_META_FILE = "split.info";
   static final String JOBFILE = "job.xml";
   static final String JOB_TOKEN_FILE="jobToken"; //localized file
 
   static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;
 
   private JobConf fConf;
-  FileSystem localFs;
+  private FileSystem localFs;
 
   private Localizer localizer;
 
@@ -254,9 +266,7 @@
   private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
-
-  static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
-      TT_MEMORY_CALCULATOR_PLUGIN;
+  private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
   /**
    * the minimum interval between jobtracker polls
@@ -269,6 +279,8 @@
 
   private IndexCache indexCache;
 
+  private MRAsyncDiskService asyncDiskService;
+  
   /**
   * Handle to the specific instance of the {@link TaskController} class
   */
@@ -365,6 +377,11 @@
     return taskController;
   }
   
+  // Currently this is used only by tests
+  void setTaskController(TaskController t) {
+    taskController = t;
+  }
+  
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -398,6 +415,10 @@
     }
   }
 
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
+
   Localizer getLocalizer() {
     return localizer;
   }
@@ -410,9 +431,13 @@
     return TaskTracker.SUBDIR + Path.SEPARATOR + user;
   }
 
-  public static String getDistributedCacheDir(String user) {
+  public static String getPrivateDistributedCacheDir(String user) {
     return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
   }
+  
+  public static String getPublicDistributedCacheDir() {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  }
 
   public static String getJobCacheSubdir(String user) {
     return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
@@ -449,11 +474,16 @@
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
-  static String getLocalSplitFile(String user, String jobid, String taskid) {
+  static String getLocalSplitMetaFile(String user, String jobid, String taskid) {
     return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
-        + TaskTracker.LOCAL_SPLIT_FILE;
+        + TaskTracker.LOCAL_SPLIT_META_FILE;
   }
 
+  static String getLocalSplitFile(String user, String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+           + TaskTracker.LOCAL_SPLIT_FILE;
+  }
+  
   static String getIntermediateOutputDir(String user, String jobid,
       String taskid) {
     return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
@@ -475,10 +505,7 @@
 
   static String getTaskWorkDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
-    if (isCleanupAttempt) {
-      dir = dir + TASK_CLEANUP_SUFFIX;
-    }
+    String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
     return dir + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
@@ -529,9 +556,11 @@
        fConf.get(TT_DNS_NAMESERVER,"default"));
     }
  
-    //check local disk
+    // Check local disk, start async disk service, and clean up all 
+    // local directories.
     checkLocalDirs(this.fConf.getLocalDirs());
-    fConf.deleteLocalFiles(SUBDIR);
+    asyncDiskService = new MRAsyncDiskService(fConf);
+    asyncDiskService.cleanupAllVolumes();
 
     // Clear out state tables
     this.tasks.clear();
@@ -597,12 +626,19 @@
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Initialize DistributedCache and
-    // clear out temporary files that might be lying around
+    Class<? extends TaskController> taskControllerClass = fConf.getClass(
+        TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, fConf);
+
+
+    // setup and create jobcache directory with appropriate permissions
+    taskController.setup();
+
+    // Initialize DistributedCache
     this.distributedCacheManager = 
-        new TrackerDistributedCacheManager(this.fConf);
-    this.distributedCacheManager.purgeCache();
-    cleanupStorage();
+        new TrackerDistributedCacheManager(this.fConf, taskController,
+        asyncDiskService);
 
     //mark as just started; this is used in heartbeats
     this.justStarted = true;
@@ -625,6 +661,12 @@
                              taskTrackerName);
     mapEventsFetcher.start();
 
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN,
+            null, ResourceCalculatorPlugin.class);
+    resourceCalculatorPlugin = ResourceCalculatorPlugin
+            .getResourceCalculatorPlugin(clazz, fConf);
+    LOG.info("Using ResourceCalculatorPlugin: " + resourceCalculatorPlugin);
     initializeMemoryManagement();
 
     this.indexCache = new IndexCache(this.fConf);
@@ -633,15 +675,6 @@
     reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
     reduceLauncher.start();
-    Class<? extends TaskController> taskControllerClass 
-                          = fConf.getClass(TT_TASK_CONTROLLER,
-                                            DefaultTaskController.class, 
-                                            TaskController.class); 
-    taskController = (TaskController)ReflectionUtils.newInstance(
-                                                      taskControllerClass, fConf);
-    
-    //setup and create jobcache directory with appropriate permissions
-    taskController.setup();
 
     // create a localizer instance
     setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
@@ -667,10 +700,14 @@
         t, TaskTrackerInstrumentation.class);
   }
   
-  /** 
-   * Removes all contents of temporary storage.  Called upon 
+  /**
+   * Removes all contents of temporary storage.  Called upon
    * startup, to remove any leftovers from previous run.
+   * 
+   * Use MRAsyncDiskService.cleanupAllVolumes instead.
+   * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
    */
+  @Deprecated
   public void cleanupStorage() throws IOException {
     if (fConf != null) {
       fConf.deleteLocalFiles();
@@ -880,18 +917,22 @@
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-        FSDataInputStream in = localFs.open(new Path(
-            rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
-        JobTokens jt = new JobTokens();
-        jt.readFields(in); 
-        rjob.jobTokens = jt; // store JobToken object per job
-        
+        TokenStorage ts = TokenCache.loadTokens(rjob.jobConf);
+        Token<JobTokenIdentifier> jt = (Token<JobTokenIdentifier>)ts.getJobToken(); 
+        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
         rjob.localized = true;
       }
     }
     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
   }
 
+  private void setUgi(String user, Configuration conf) {
+    //The dummy-group used here will not be required once we have UGI
+    //object creation with just the user name.
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, 
+        user+","+UnixUserGroupInformation.DEFAULT_GROUP);
+  }
+  
   /**
    * Localize the job on this tasktracker. Specifically
    * <ul>
@@ -934,6 +975,7 @@
     }
     System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
     localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+    setUgi(userName, localJobConf);
 
     // Download the job.jar for this job from the system FS
     localizeJobJarFile(userName, jobId, localFs, localJobConf);
@@ -952,12 +994,17 @@
    */
   private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
       throws IOException {
-    // Get sizes of JobFile and JarFile
+    JobConf conf = new JobConf(getJobConf());
+    setUgi(user, conf);
+    
+    FileSystem userFs = jobFile.getFileSystem(conf);
+    // Get the size of jobFile;
     // size is -1 if the file is not present.
     FileStatus status = null;
     long jobFileSize = -1;
     try {
-      status = systemFS.getFileStatus(jobFile);
+      
+      status = userFs.getFileStatus(jobFile);
       jobFileSize = status.getLen();
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
@@ -968,7 +1015,7 @@
             jobFileSize, fConf);
 
     // Download job.xml
-    systemFS.copyToLocalFile(jobFile, localJobFile);
+    userFs.copyToLocalFile(jobFile, localJobFile);
     return localJobFile;
   }
 
@@ -990,8 +1037,9 @@
     long jarFileSize = -1;
     if (jarFile != null) {
       Path jarFilePath = new Path(jarFile);
+      FileSystem fs = jarFilePath.getFileSystem(localJobConf);
       try {
-        status = systemFS.getFileStatus(jarFilePath);
+        status = fs.getFileStatus(jarFilePath);
         jarFileSize = status.getLen();
       } catch (FileNotFoundException fe) {
         jarFileSize = -1;
@@ -1003,14 +1051,15 @@
               getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
 
       // Download job.jar
-      systemFS.copyToLocalFile(jarFilePath, localJarFile);
+      fs.copyToLocalFile(jarFilePath, localJarFile);
 
       localJobConf.setJar(localJarFile.toString());
 
-      // Also un-jar the job.jar files. We un-jar it so that classes inside
-      // sub-directories, for e.g., lib/, classes/ are available on class-path
-      RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
-          .getParent().toString()));
+      // Un-jar the parts of the job.jar that need to be added to the classpath
+      RunJar.unJar(
+        new File(localJarFile.toString()),
+        new File(localJarFile.getParent().toString()),
+        localJobConf.getJarUnpackPattern());
     }
   }
 
@@ -1081,9 +1130,23 @@
 
     this.running = false;
 
-    // Clear local storage
-    cleanupStorage();
-        
+    if (asyncDiskService != null) {
+      // Clear local storage
+      asyncDiskService.cleanupAllVolumes();
+      
+      // Shutdown all async deletion threads with up to 10 seconds of delay
+      asyncDiskService.shutdown();
+      try {
+        if (!asyncDiskService.awaitTermination(10000)) {
+          asyncDiskService.shutdownNow();
+          asyncDiskService = null;
+        }
+      } catch (InterruptedException e) {
+        asyncDiskService.shutdownNow();
+        asyncDiskService = null;
+      }
+    }
+    
     // Shutdown the fetcher thread
     if (mapEventsFetcher != null) {
       mapEventsFetcher.interrupt();
@@ -1229,6 +1292,16 @@
     directoryCleanupThread = new CleanupQueue();
   }
 
+
+  // only used by tests
+  void setCleanupThread(CleanupQueue c) {
+    directoryCleanupThread = c;
+  }
+  
+  CleanupQueue getCleanupThread() {
+    return directoryCleanupThread;
+  }
+
   /**
    * Tell the cleanup threads that they should end themselves   
    */
@@ -1429,7 +1502,7 @@
    * @return false if the tracker was unknown
    * @throws IOException
    */
-  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+  HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
     boolean sendCounters;
     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
@@ -1478,6 +1551,12 @@
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
+      long availableVmem = getAvailableVirtualMemoryOnTT();
+      long availablePmem = getAvailablePhysicalMemoryOnTT();
+      long cumuCpuTime = getCumulativeCpuTimeOnTT();
+      long cpuFreq = getCpuFrequencyOnTT();
+      int numCpu = getNumProcessorsOnTT();
+      float cpuUsage = getCpuUsageOnTT();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
@@ -1486,6 +1565,12 @@
           mapSlotMemorySizeOnTT);
       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
           reduceSlotSizeMemoryOnTT);
+      status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
+      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
+      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
+      status.getResourceStatus().setCpuFrequency(cpuFreq);
+      status.getResourceStatus().setNumProcessors(numCpu);
+      status.getResourceStatus().setCpuUsage(cpuUsage);
     }
     //add node health information
     
@@ -1562,6 +1647,80 @@
     return totalPhysicalMemoryOnTT;
   }
 
+  /**
+   * Return the free virtual memory available on this TaskTracker.
+   * @return total size of free virtual memory in bytes
+   */
+  long getAvailableVirtualMemoryOnTT() {
+    long availableVirtualMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availableVirtualMemoryOnTT =
+              resourceCalculatorPlugin.getAvailableVirtualMemorySize();
+    }
+    return availableVirtualMemoryOnTT;
+  }
+
+  /**
+   * Return the free physical memory available on this TaskTracker.
+   * @return total size of free physical memory in bytes
+   */
+  long getAvailablePhysicalMemoryOnTT() {
+    long availablePhysicalMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availablePhysicalMemoryOnTT =
+              resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+    }
+    return availablePhysicalMemoryOnTT;
+  }
+
+  /**
+   * Return the cumulative CPU time used on this TaskTracker since the system started.
+   * @return cumulative CPU time used, in milliseconds
+   */
+  long getCumulativeCpuTimeOnTT() {
+    long cumulativeCpuTime = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cumulativeCpuTime = resourceCalculatorPlugin.getCumulativeCpuTime();
+    }
+    return cumulativeCpuTime;
+  }
+
+  /**
+   * Return the number of Processors on this TaskTracker
+   * @return number of processors
+   */
+  int getNumProcessorsOnTT() {
+    int numProcessors = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      numProcessors = resourceCalculatorPlugin.getNumProcessors();
+    }
+    return numProcessors;
+  }
+
+  /**
+   * Return the CPU frequency of this TaskTracker
+   * @return CPU frequency in kHz
+   */
+  long getCpuFrequencyOnTT() {
+    long cpuFrequency = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuFrequency = resourceCalculatorPlugin.getCpuFrequency();
+    }
+    return cpuFrequency;
+  }
+
+  /**
+   * Return the CPU usage in % of this TaskTracker
+   * @return CPU usage in %
+   */
+  float getCpuUsageOnTT() {
+    float cpuUsage = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuUsage = resourceCalculatorPlugin.getCpuUsage();
+    }
+    return cpuUsage;
+  }
+  
   long getTotalMemoryAllottedForTasksOnTT() {
     return totalMemoryAllottedForTasks;
   }
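
Each resource getter added above has the same null-guarded shape: report
TaskTrackerStatus.UNAVAILABLE unless a ResourceCalculatorPlugin was
configured, so heartbeats degrade gracefully on platforms without one. A
minimal analogue (the one-method interface below is a stand-in, not the
Hadoop plugin API):

    public class ResourceReportDemo {
      static final long UNAVAILABLE = -1;  // stand-in for TaskTrackerStatus.UNAVAILABLE

      interface ResourcePlugin {           // stand-in for ResourceCalculatorPlugin
        long getAvailablePhysicalMemorySize();
      }

      // Same shape as getAvailablePhysicalMemoryOnTT above.
      static long availablePhysicalMemory(ResourcePlugin plugin) {
        return (plugin == null)
            ? UNAVAILABLE
            : plugin.getAvailablePhysicalMemorySize();
      }

      public static void main(String[] args) {
        System.out.println(availablePhysicalMemory(null));  // -1: no plugin
        System.out.println(availablePhysicalMemory(new ResourcePlugin() {
          public long getAvailablePhysicalMemorySize() {
            return 2L * 1024 * 1024 * 1024;                 // pretend 2 GB free
          }
        }));
      }
    }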
@@ -1613,6 +1772,7 @@
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
           myInstrumentation.timedoutTask(tip.getTask().getTaskID());
+          dumpTaskStack(tip);
           purgeTask(tip, true);
         }
       }
@@ -1620,6 +1780,60 @@
   }
 
   /**
+   * Builds a list of PathDeletionContext objects for the given paths.
+   */
+  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
+      Path[] paths) {
+    int i = 0;
+    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
+    }
+    return contexts;
+  }
+
+  /**
+   * Builds a list of TaskControllerPathDeletionContext objects for a task
+   * @param fs    : FileSystem on which the dirs are to be deleted
+   * @param paths : mapred-local-dirs
+   * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
+   * @param isWorkDir : whether the dir to be deleted is the workDir or the taskDir
+   * @param taskController : the task-controller to be used for deletion of
+   *                         taskDir or taskWorkDir
+   */
+  static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
+      TaskController taskController)
+      throws IOException {
+    int i = 0;
+    PathDeletionContext[] contexts =
+                          new TaskControllerPathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+                          isWorkDir, taskController);
+    }
+    return contexts;
+  }
+
+  /**
+   * Send a signal to a stuck task commanding it to dump stack traces
+   * to stderr before we kill it with purgeTask().
+   *
+   * @param tip {@link TaskInProgress} to dump stack traces.
+   */
+  private void dumpTaskStack(TaskInProgress tip) {
+    TaskRunner runner = tip.getTaskRunner();
+    if (null == runner) {
+      return; // tip is already abandoned.
+    }
+
+    JvmManager jvmMgr = runner.getJvmManager();
+    jvmMgr.dumpStack(runner);
+  }
+
+  /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
    * @throws IOException
@@ -1657,6 +1871,7 @@
     synchronized(runningJobs) {
       runningJobs.remove(jobId);
     }
+    getJobTokenSecretManager().removeTokenForJob(jobId.toString());
   }
 
   /**
@@ -1667,8 +1882,9 @@
    */
   void removeJobFiles(String user, String jobId)
       throws IOException {
-    directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
-        getLocalJobDir(user, jobId)));
+    PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
+        getLocalFiles(fConf, getLocalJobDir(user, jobId)));
+    directoryCleanupThread.addToQueue(contexts);
   }
 
   /**
@@ -2739,29 +2955,33 @@
           runner.close();
         }
 
-        String localTaskDir =
-            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
-                .toString(), task.isTaskCleanupTask());
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
           // No jvm reuse, remove everything
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, localTaskDir));
+          PathDeletionContext[] contexts =
+            buildTaskControllerPathDeletionContexts(localFs,
+                getLocalFiles(fConf, ""), task, false/* not workDir */,
+                taskController);
+          directoryCleanupThread.addToQueue(contexts);
         } else {
           // Jvm reuse. We don't delete the workdir since some other task
           // (running in the same JVM) might be using the dir. The JVM
           // running the tasks would clean up the workdir, one task at a
           // time, in the task process itself.
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, localTaskDir + Path.SEPARATOR
-                  + TaskTracker.JOBFILE));
+          String localTaskDir =
+            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+                .toString(), task.isTaskCleanupTask());
+          PathDeletionContext[] contexts = buildPathDeletionContexts(
+              localFs, getLocalFiles(defaultJobConf, localTaskDir +
+                         Path.SEPARATOR + TaskTracker.JOBFILE));
+          directoryCleanupThread.addToQueue(contexts);
         }
       } else {
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          String taskWorkDir =
-              getTaskWorkDir(task.getUser(), task.getJobID().toString(),
-                  taskId.toString(), task.isTaskCleanupTask());
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, taskWorkDir));
+          PathDeletionContext[] contexts =
+            buildTaskControllerPathDeletionContexts(localFs,
+              getLocalFiles(fConf, ""), task, true /* workDir */,
+              taskController);
+          directoryCleanupThread.addToQueue(contexts);
         }
       }
     }
@@ -3001,7 +3221,6 @@
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
-    JobTokens jobTokens;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3379,14 +3598,8 @@
     private void verifyRequest(HttpServletRequest request, 
         HttpServletResponse response, TaskTracker tracker, String jobId) 
     throws IOException {
-      JobTokens jt = null;
-      synchronized (tracker.runningJobs) {
-        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
-        if (rjob == null) {
-          throw new IOException("Unknown job " + jobId + "!!");
-        }
-        jt = rjob.jobTokens;
-      }
+      SecretKey tokenSecret = tracker.getJobTokenSecretManager()
+          .retrieveTokenSecret(jobId);
       // string to encrypt
       String enc_str = SecureShuffleUtils.buildMsgFrom(request);
       
@@ -3400,17 +3613,16 @@
       LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
           urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
 
-      SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
       // verify - throws exception
       try {
-        ssutil.verifyReply(urlHashStr, enc_str);
+        SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
       } catch (IOException ioe) {
         response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
         throw ioe;
       }
       
       // verification passed - encode the reply
-      String reply = ssutil.generateHash(urlHashStr.getBytes());
+      String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
       response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
       
       len = reply.length();
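
The exchange above is a symmetric challenge-response keyed on the per-job
shuffle secret; condensed to its essentials with the static helpers this patch
introduces (same signatures as in the hunk):

    // Server side: recompute the message, verify the client's hash of it,
    // then hash the client's hash back so the client can verify the server.
    SecretKey secret =
        tracker.getJobTokenSecretManager().retrieveTokenSecret(jobId);
    String expected = SecureShuffleUtils.buildMsgFrom(request);
    SecureShuffleUtils.verifyReply(urlHashStr, expected, secret); // throws on mismatch
    String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), secret);
    response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
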
@@ -3419,19 +3631,29 @@
     }
   }
   
-
   // get the full paths of the directory in all the local disks.
-  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+  Path[] getLocalFiles(JobConf conf, String subdir) throws IOException {
     String[] localDirs = conf.getLocalDirs();
     Path[] paths = new Path[localDirs.length];
     FileSystem localFs = FileSystem.getLocal(conf);
+    boolean subdirNeeded = (subdir != null) && (subdir.length() > 0);
     for (int i = 0; i < localDirs.length; i++) {
-      paths[i] = new Path(localDirs[i], subdir);
+      paths[i] = (subdirNeeded) ? new Path(localDirs[i], subdir)
+                                : new Path(localDirs[i]);
       paths[i] = paths[i].makeQualified(localFs);
     }
     return paths;
   }
 
+  FileSystem getLocalFileSystem() {
+    return localFs;
+  }
+
+  // only used by tests
+  void setLocalFileSystem(FileSystem fs) {
+    localFs = fs;
+  }
+
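
The package-private setter exists purely so tests can substitute the file
system the cleanup code sees; a hypothetical JUnit-style sketch (the fixture
that yields the tracker instance is assumed, not shown):

    // Inject the raw local file system so deletions can be observed
    // directly, bypassing the checksumming wrapper.
    FileSystem rawFs = FileSystem.getLocal(new JobConf()).getRawFileSystem();
    tracker.setLocalFileSystem(rawFs);
    assertSame(rawFs, tracker.getLocalFileSystem());
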
   int getMaxCurrentMapTasks() {
     return maxMapSlots;
   }
@@ -3515,22 +3737,24 @@
           JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY));
     }
 
-    Class<? extends MemoryCalculatorPlugin> clazz =
-        fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN,
-            null, MemoryCalculatorPlugin.class);
-    MemoryCalculatorPlugin memoryCalculatorPlugin =
-        MemoryCalculatorPlugin
-            .getMemoryCalculatorPlugin(clazz, fConf);
-    LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
-
-    if (memoryCalculatorPlugin != null) {
-      totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
+    // Use TT_MEMORY_CALCULATOR_PLUGIN if it is configured.
+    Class<? extends MemoryCalculatorPlugin> clazz =
+        fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN,
+            null, MemoryCalculatorPlugin.class);
+    MemoryCalculatorPlugin memoryCalculatorPlugin = (clazz == null ?
+        null : MemoryCalculatorPlugin.getMemoryCalculatorPlugin(clazz, fConf));
+    if (memoryCalculatorPlugin != null || resourceCalculatorPlugin != null) {
+      totalVirtualMemoryOnTT = (memoryCalculatorPlugin == null ?
+          resourceCalculatorPlugin.getVirtualMemorySize() :
+          memoryCalculatorPlugin.getVirtualMemorySize());
       if (totalVirtualMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalVmem could not be calculated. "
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      totalPhysicalMemoryOnTT = (memoryCalculatorPlugin == null ?
+          resourceCalculatorPlugin.getPhysicalMemorySize() :
+          memoryCalculatorPlugin.getPhysicalMemorySize());
       if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
@@ -3662,7 +3886,7 @@
         throws IOException {
       // check if the tokenJob file is there..
       Path skPath = new Path(systemDirectory,
-          jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+          jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME);
 
       FileStatus status = null;
       long jobTokenSize = -1;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Jan 26 14:02:53 2010
@@ -55,6 +55,7 @@
   private int maxReduceTasks;
   private TaskTrackerHealthStatus healthStatus;
    
+  public static final int UNAVAILABLE = -1;
   /**
    * Class representing a collection of resources on this tasktracker.
    */
@@ -65,7 +66,14 @@
     private long mapSlotMemorySizeOnTT;
     private long reduceSlotMemorySizeOnTT;
     private long availableSpace;
-    
+
+    private long availableVirtualMemory = UNAVAILABLE; // in bytes
+    private long availablePhysicalMemory = UNAVAILABLE; // in bytes
+    private int numProcessors = UNAVAILABLE;
+    private long cumulativeCpuTime = UNAVAILABLE; // in milliseconds
+    private long cpuFrequency = UNAVAILABLE; // in kHz
+    private float cpuUsage = UNAVAILABLE; // in %
+
     ResourceStatus() {
       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
@@ -172,21 +180,160 @@
     long getAvailableSpace() {
       return availableSpace;
     }
+
+    /**
+     * Set the amount of available virtual memory on the tasktracker.
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param availableMem amount of available virtual memory on the
+     *                     tasktracker in bytes.
+     */
+    void setAvailableVirtualMemory(long availableMem) {
+      availableVirtualMemory = availableMem > 0 ?
+                               availableMem : UNAVAILABLE;
+    }
+
+    /**
+     * Get the amount of available virtual memory on the tasktracker.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return the amount of available virtual memory on the tasktracker
+     *             in bytes.
+     */
+    long getAvailableVirtualMemory() {
+      return availableVirtualMemory;
+    }
+
+    /**
+     * Set the amount of available physical memory on the tasktracker.
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param availableRAM amount of available physical memory on the
+     *                     tasktracker in bytes.
+     */
+    void setAvailablePhysicalMemory(long availableRAM) {
+      availablePhysicalMemory = availableRAM > 0 ?
+                                availableRAM : UNAVAILABLE;
+    }
+
+    /**
+     * Get the amount of available physical memory on the tasktracker.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return amount of available physical memory on the tasktracker in bytes.
+     */
+    long getAvailablePhysicalMemory() {
+      return availablePhysicalMemory;
+    }
+
+    /**
+     * Set the CPU frequency of this TaskTracker
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param cpuFrequency CPU frequency in kHz
+     */
+    public void setCpuFrequency(long cpuFrequency) {
+      this.cpuFrequency = cpuFrequency > 0 ?
+                          cpuFrequency : UNAVAILABLE;
+    }
+
+    /**
+     * Get the CPU frequency of this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return CPU frequency in kHz
+     */
+    public long getCpuFrequency() {
+      return cpuFrequency;
+    }
+
+    /**
+     * Set the number of processors on this TaskTracker
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param numProcessors number of processors
+     */
+    public void setNumProcessors(int numProcessors) {
+      this.numProcessors = numProcessors > 0 ?
+                           numProcessors : UNAVAILABLE;
+    }
+
+    /**
+     * Get the number of processors on this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return number of processors
+     */
+    public int getNumProcessors() {
+      return numProcessors;
+    }
+
+    /**
+     * Set the cumulative CPU time used on this TaskTracker since it came up.
+     * It can be set to UNAVAILABLE if it is currently unavailable.
+     *
+     * @param cumulativeCpuTime used CPU time in milliseconds
+     */
+    public void setCumulativeCpuTime(long cumulativeCpuTime) {
+      this.cumulativeCpuTime = cumulativeCpuTime > 0 ?
+                               cumulativeCpuTime : UNAVAILABLE;
+    }
+
+    /**
+     * Get the cumulative CPU time used on this TaskTracker since it came up.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return used CPU time in milliseconds
+     */
+    public long getCumulativeCpuTime() {
+      return cumulativeCpuTime;
+    }
+
+    /**
+     * Set the CPU usage on this TaskTracker
+     * 
+     * @param cpuUsage CPU usage in %
+     */
+    public void setCpuUsage(float cpuUsage) {
+      this.cpuUsage = cpuUsage;
+    }
+
+    /**
+     * Get the CPU usage on this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return CPU usage in %
+     */
+    public float getCpuUsage() {
+      return cpuUsage;
+    }
     
     public void write(DataOutput out) throws IOException {
       WritableUtils.writeVLong(out, totalVirtualMemory);
       WritableUtils.writeVLong(out, totalPhysicalMemory);
+      WritableUtils.writeVLong(out, availableVirtualMemory);
+      WritableUtils.writeVLong(out, availablePhysicalMemory);
       WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, availableSpace);
+      WritableUtils.writeVLong(out, cumulativeCpuTime);
+      WritableUtils.writeVLong(out, cpuFrequency);
+      WritableUtils.writeVInt(out, numProcessors);
+      out.writeFloat(getCpuUsage());
     }
     
     public void readFields(DataInput in) throws IOException {
       totalVirtualMemory = WritableUtils.readVLong(in);
       totalPhysicalMemory = WritableUtils.readVLong(in);
+      availableVirtualMemory = WritableUtils.readVLong(in);
+      availablePhysicalMemory = WritableUtils.readVLong(in);
       mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       availableSpace = WritableUtils.readVLong(in);
+      cumulativeCpuTime = WritableUtils.readVLong(in);
+      cpuFrequency = WritableUtils.readVLong(in);
+      numProcessors = WritableUtils.readVInt(in);
+      setCpuUsage(in.readFloat());
     }
   }
   

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java Tue Jan 26 14:02:53 2010
@@ -18,8 +18,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Stringifier;
-import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -32,45 +30,19 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 
 /**
  * The Chain class provides all the common functionality for the
  * {@link ChainMapper} and the {@link ChainReducer} classes.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
  */
-class Chain {
-  private static final String CHAIN_MAPPER = "chain.mapper";
-  private static final String CHAIN_REDUCER = "chain.reducer";
-
-  private static final String CHAIN_MAPPER_SIZE = ".size";
-  private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
-  private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
-  private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
-  private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+@Deprecated
+class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
 
   private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
   private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
 
-  private static final String MAPPER_INPUT_KEY_CLASS =
-    "chain.mapper.input.key.class";
-  private static final String MAPPER_INPUT_VALUE_CLASS =
-    "chain.mapper.input.value.class";
-  private static final String MAPPER_OUTPUT_KEY_CLASS =
-    "chain.mapper.output.key.class";
-  private static final String MAPPER_OUTPUT_VALUE_CLASS =
-    "chain.mapper.output.value.class";
-  private static final String REDUCER_INPUT_KEY_CLASS =
-    "chain.reducer.input.key.class";
-  private static final String REDUCER_INPUT_VALUE_CLASS =
-    "chain.reducer.input.value.class";
-  private static final String REDUCER_OUTPUT_KEY_CLASS =
-    "chain.reducer.output.key.class";
-  private static final String REDUCER_OUTPUT_VALUE_CLASS =
-    "chain.reducer.output.value.class";
-
-  private boolean isMap;
-
   private JobConf chainJobConf;
 
   private List<Mapper> mappers = new ArrayList<Mapper>();
@@ -92,51 +64,7 @@
    *              Reducer.
    */
   Chain(boolean isMap) {
-    this.isMap = isMap;
-  }
-
-  /**
-   * Returns the prefix to use for the configuration of the chain depending
-   * if it is for a Mapper or a Reducer.
-   *
-   * @param isMap TRUE for Mapper, FALSE for Reducer.
-   * @return the prefix to use.
-   */
-  private static String getPrefix(boolean isMap) {
-    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
-  }
-
-  /**
-   * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
-   * <p/>
-   * It creates a new JobConf using the chain job's JobConf as base and adds to
-   * it the configuration properties for the chain element. The keys of the
-   * chain element jobConf have precedence over the given JobConf.
-   *
-   * @param jobConf the chain job's JobConf.
-   * @param confKey the key for chain element configuration serialized in the
-   *                chain job's JobConf.
-   * @return a new JobConf aggregating the chain job's JobConf with the chain
-   *         element configuration properties.
-   */
-  private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
-    JobConf conf;
-    try {
-      Stringifier<JobConf> stringifier =
-        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-      conf = stringifier.fromString(jobConf.get(confKey, null));
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex);
-    }
-    // we have to do this because the Writable desearialization clears all
-    // values set in the conf making not possible do do a new JobConf(jobConf)
-    // in the creation of the conf above
-    jobConf = new JobConf(jobConf);
-
-    for(Map.Entry<String, String> entry : conf) {
-      jobConf.set(entry.getKey(), entry.getValue());
-    }
-    return jobConf;
+    super(isMap);
   }
 
   /**
@@ -169,82 +97,27 @@
     String prefix = getPrefix(isMap);
 
     // if a reducer chain check the Reducer has been already set
-    if (!isMap) {
-      if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
-                           Reducer.class) == null) {
-        throw new IllegalStateException(
-          "A Mapper can be added to the chain only after the Reducer has " +
-          "been set");
-      }
-    }
-    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    checkReducerAlreadySet(isMap, jobConf, prefix, true);
+
+    // set the mapper class
+    int index = getIndex(jobConf, prefix);
     jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-
-    // if it is a reducer chain and the first Mapper is being added check the
-    // key and value input classes of the mapper match those of the reducer
-    // output.
-    if (!isMap && index == 0) {
-      JobConf reducerConf =
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
-      if (! inputKeyClass.isAssignableFrom(
-        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
-        throw new IllegalArgumentException("The Reducer output key class does" +
-          " not match the Mapper input key class");
-      }
-      if (! inputValueClass.isAssignableFrom(
-        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
-        throw new IllegalArgumentException("The Reducer output value class" +
-          " does not match the Mapper input value class");
-      }
-    } else if (index > 0) {
-      // check the that the new Mapper in the chain key and value input classes
-      // match those of the previous Mapper output.
-      JobConf previousMapperConf =
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
-          (index - 1));
-      if (! inputKeyClass.isAssignableFrom(
-        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
-        throw new IllegalArgumentException("The Mapper output key class does" +
-          " not match the previous Mapper input key class");
-      }
-      if (! inputValueClass.isAssignableFrom(
-        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
-        throw new IllegalArgumentException("The Mapper output value class" +
-          " does not match the previous Mapper input value class");
-      }
-    }
-
+
+    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, index, prefix);
+
     // if the Mapper does not have a private JobConf create an empty one
     if (mapperConf == null) {
-      // using a JobConf without defaults to make it lightweight.
-      // still the chain JobConf may have all defaults and this conf is
-      // overlapped to the chain JobConf one.
+      // using a JobConf without defaults to make it lightweight.
+      // still the chain JobConf may have all defaults and this conf is
+      // overlapped to the chain JobConf one.
       mapperConf = new JobConf(true);
     }
-
-    // store in the private mapper conf the input/output classes of the mapper
-    // and if it works by value or by reference
+    // store in the private mapper conf if it works by value or by reference
     mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
-                        Object.class);
-    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
-    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
-                        Object.class);
-
-    // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<JobConf> stringifier =
-      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-    try {
-      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
-                  stringifier.toString(new JobConf(mapperConf)));
-    }
-    catch (IOException ioEx) {
-      throw new RuntimeException(ioEx);
-    }
-
-    // increment the chain counter
-    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+
+    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, mapperConf, index, prefix);
   }
 
   /**
@@ -273,13 +146,10 @@
                           Class<? extends V2> outputValueClass,
                           boolean byValue, JobConf reducerConf) {
     String prefix = getPrefix(false);
-
-    if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
-      throw new IllegalStateException("Reducer has been already set");
-    }
+    checkReducerAlreadySet(false, jobConf, prefix, false);
 
     jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-
+
     // if the Reducer does not have a private JobConf create an empty one
     if (reducerConf == null) {
       // using a JobConf without defaults to make it lightweight.
@@ -291,24 +161,9 @@
     // store in the private reducer conf the input/output classes of the reducer
     // and if it works by value or by reference
     reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
-                         Object.class);
-    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
-                         Object.class);
-    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
-                         Object.class);
-
-    // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<JobConf> stringifier =
-      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-    try {
-      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
-                  stringifier.toString(new JobConf(reducerConf)));
-    }
-    catch (IOException ioEx) {
-      throw new RuntimeException(ioEx);
-    }
+
+    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
+      outputValueClass, reducerConf, prefix);
   }
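
The serialization the deleted blocks performed (and which the superclass's
setMapperConf()/setReducerConf() are assumed to perform now) is a plain
DefaultStringifier round-trip of each element's private JobConf into a string
property of the chain job's JobConf; a sketch using the same classes and keys:

    // Both Stringifier calls throw IOException, which the old code wrapped
    // in RuntimeException.
    static JobConf roundTrip(JobConf jobConf, JobConf reducerConf,
        String prefix) throws IOException {
      Stringifier<JobConf> stringifier =
          new DefaultStringifier<JobConf>(jobConf, JobConf.class);
      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,               // serialize
          stringifier.toString(new JobConf(reducerConf)));
      return stringifier.fromString(                           // deserialize
          jobConf.get(prefix + CHAIN_REDUCER_CONFIG, null));
    }
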
 
   /**
@@ -325,8 +180,8 @@
     for (int i = 0; i < index; i++) {
       Class<? extends Mapper> klass =
         jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
-      JobConf mConf =
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
+      JobConf mConf = new JobConf(
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
       Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
       mappers.add(mapper);
 
@@ -343,8 +198,8 @@
     Class<? extends Reducer> klass =
       jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
     if (klass != null) {
-      JobConf rConf =
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      JobConf rConf = new JobConf(
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
       reducer = ReflectionUtils.newInstance(klass, rConf);
       if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
         reducerKeySerialization = serializationFactory

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Tue Jan 26 14:02:53 2010
@@ -86,7 +86,10 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead
  */
+@Deprecated
 public class ChainMapper implements Mapper {
 
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java Tue Jan 26 14:02:53 2010
@@ -86,7 +86,10 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainReducer} instead
  */
+@Deprecated
 public class ChainReducer implements Reducer {
 
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java Tue Jan 26 14:02:53 2010
@@ -46,7 +46,8 @@
  * 
  * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
- * The map output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The map output field list spec is under attribute 
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
  * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
  * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
  * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
@@ -57,7 +58,8 @@
  * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
  * and use fields 6,5,1,2,3,7 and above for values.
  * 
- * The reduce output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ * The reduce output field list spec is under attribute 
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
  * 
  * The reducer extracts output key/value pairs in a similar manner, except that
  * the key is never ignored.
@@ -156,13 +158,14 @@
   }
 
   public void configure(JobConf job) {
-    this.fieldSeparator = job.get("mapreduce.fieldsel.data.field.separator", "\t");
-    this.mapOutputKeyValueSpec = job.get("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec",
-        "0-:");
+    this.fieldSeparator = job.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR,
+        "\t");
+    this.mapOutputKeyValueSpec = job.get(
+        FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
     this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
         job.getInputFormat().getClass().getCanonicalName());
     this.reduceOutputKeyValueSpec = job.get(
-        "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec", "0-:");
+        FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
     parseOutputKeyValueSpec();
     LOG.info(specToString());
   }
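
A minimal driver-side sketch wiring the corrected keys, reusing the example
spec from the class javadoc above (constant names as introduced in this hunk):

    JobConf job = new JobConf();
    job.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
    // key = fields 4,3,0,1; value = fields 6,5,1-3 and 7 onwards
    job.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "4,3,0,1:6,5,1-3,7-");
    job.set(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
    job.setMapperClass(FieldSelectionMapReduce.class);
    job.setReducerClass(FieldSelectionMapReduce.class);
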


