hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r903227 [10/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri...
Date: Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jan 26 14:02:53 2010
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -40,14 +39,16 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
@@ -63,17 +64,22 @@
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -101,7 +107,6 @@
   JobStatus status;
   Path jobFile = null;
   Path localJobFile = null;
-  Path localJarFile = null;
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
@@ -141,6 +146,8 @@
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
   
+  protected TokenStorage tokenStorage;
+  
   JobHistory jobHistory;
 
   // NetworkTopology Node to the set of TIPs
@@ -298,6 +305,7 @@
     new HashMap<TaskTracker, FallowSlotInfo>();
   private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
     new HashMap<TaskTracker, FallowSlotInfo>();
+  private Path jobSubmitDir = null;
   
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
@@ -341,6 +349,7 @@
     if (tracker != null) { // Some mock tests have null tracker
       this.jobHistory = tracker.getJobHistory();
     }
+    this.tokenStorage = null;
   }
   
   JobInProgress() {
@@ -352,45 +361,52 @@
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    */
-  public JobInProgress(JobID jobid, JobTracker jobtracker, 
-                       JobConf default_conf, int rCount) throws IOException {
+  public JobInProgress(JobTracker jobtracker, 
+                       JobConf default_conf, int rCount,
+                       JobInfo jobInfo, TokenStorage ts) throws IOException {
     this.restartCount = rCount;
-    this.jobId = jobid;
+    this.jobId = JobID.downgrade(jobInfo.getJobID());
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
     this.jobtracker = jobtracker;
     this.jobHistory = jobtracker.getJobHistory();
     this.startTime = System.currentTimeMillis();
     
     this.localFs = jobtracker.getLocalFileSystem();
 
-    JobConf default_job_conf = new JobConf(default_conf);
-    this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
-                                                      +"/"+jobid + ".xml");
-    this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ jobid + ".jar");
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    fs = jobtracker.getFileSystem(jobDir);
-    jobFile = new Path(jobDir, "job.xml");
+    // use the user supplied token to add user credentials to the conf
+    jobSubmitDir = jobInfo.getJobSubmitDir();
+    String user = jobInfo.getUser().toString();
+    conf = new JobConf();
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, 
+        new UnixUserGroupInformation(user, 
+            new String[]{UnixUserGroupInformation.DEFAULT_GROUP}).toString());
+    fs = jobSubmitDir.getFileSystem(conf);
+    
+    this.localJobFile = 
+      default_conf.getLocalPath(JobTracker.SUBDIR + "/" + this.jobId + ".xml");
+    
+    jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
+    if (conf.getUser() == null) {
+      this.conf.setUser(user);
+    }
+    if (!conf.getUser().equals(user)) {
+      throw new IOException("The username obtained from the conf doesn't " +
+      		"match the username the user authenticated as");
+    }
     this.priority = conf.getJobPriority();
-    this.profile = new JobProfile(conf.getUser(), jobid, 
-                                  jobFile.toString(), url, conf.getJobName(),
-                                  conf.getQueueName());
-    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
+    this.profile = new JobProfile(conf.getUser(), this.jobId, 
+                                  jobFile.toString(), url, 
+                                  conf.getJobName(), conf.getQueueName());
+    this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP, 
         profile.getUser(), profile.getJobName(), profile.getJobFile(), 
         profile.getURL().toString());
-    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
+    this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
     status.setStartTime(startTime);
     this.status.setJobPriority(this.priority);
 
-    String jarFile = conf.getJar();
-    if (jarFile != null) {
-      fs.copyToLocalFile(new Path(jarFile), localJarFile);
-      conf.setJar(localJarFile.toString());
-    }
-
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
@@ -406,7 +422,7 @@
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", jobid.toString());
+    this.jobMetrics.setTag("jobId", this.jobId.toString());
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
@@ -429,6 +445,7 @@
         JobContext.SPECULATIVECAP,0.1f);
     this.slowNodeThreshold = conf.getFloat(
         JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
+    this.tokenStorage = ts;
 
   }
 
@@ -473,7 +490,7 @@
   }
   
   Map<Node, List<TaskInProgress>> createCache(
-                         Job.RawSplit[] splits, int maxLevel) {
+                         TaskSplitMetaInfo[] splits, int maxLevel) {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
@@ -581,31 +598,30 @@
     //
     // generate security keys needed by Tasks
     //
-    generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+    generateAndStoreTokens();
     
     //
     // read input splits and create a map per a split
     //
-    String jobFile = profile.getJobFile();
-
-    Job.RawSplit[] splits = createSplits();
-    numMapTasks = splits.length;
+    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+    numMapTasks = taskSplitMetaInfo.length;
 
     checkTaskLimits();
 
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 
-    createMapTasks(jobFile, splits);
+    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
     
     if (numMapTasks > 0) { 
-      nonRunningMapCache = createCache(splits, maxLevel);
+      nonRunningMapCache = createCache(taskSplitMetaInfo,
+          maxLevel);
     }
         
     // set the launch time
     this.launchTime = JobTracker.getClock().getTime();
 
-    createReduceTasks(jobFile);
+    createReduceTasks(jobFile.toString());
     
     // Calculate the minimum number of maps to be complete before 
     // we should start scheduling reduces
@@ -615,7 +631,7 @@
                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
            numMapTasks));
     
-    initSetupCleanupTasks(jobFile);
+    initSetupCleanupTasks(jobFile.toString());
     
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
@@ -633,6 +649,9 @@
     
     jobHistory.logEvent(jie, jobId);
    
+    // Log the number of map and reduce tasks
+    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 
+             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
 
   // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
@@ -669,16 +688,11 @@
     
   }
 
-  Job.RawSplit[] createSplits() throws IOException {
-    DataInputStream splitFile =
-      fs.open(new Path(conf.get(JobContext.SPLIT_FILE)));
-    Job.RawSplit[] splits;
-    try {
-      splits = Job.readSplitFile(splitFile);
-    } finally {
-      splitFile.close();
-    }
-    return splits;
+  TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) 
+  throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo = 
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
+    return allTaskSplitMetaInfo;
   }
 
   /**
@@ -695,10 +709,11 @@
     }
   }
 
-  synchronized void createMapTasks(String jobFile, Job.RawSplit[] splits) {
+  synchronized void createMapTasks(String jobFile, 
+		  TaskSplitMetaInfo[] splits) {
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getDataLength();
+      inputLength += splits[i].getInputDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
                                    jobtracker, conf, this, 
@@ -720,8 +735,10 @@
     }
   }
 
+  
   synchronized void initSetupCleanupTasks(String jobFile) {
     if (!jobSetupCleanupNeeded) {
+      LOG.info("Setup/Cleanup not needed for job " + jobId);
       // nothing to initialize
       return;
     }
@@ -730,7 +747,7 @@
 
     // cleanup map tip. This map doesn't use any splits. Just assign an empty
     // split.
-    Job.RawSplit emptySplit = new Job.RawSplit();
+    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
@@ -870,36 +887,42 @@
     return launchedSetup;
   }
 
-  /**
-   * Get the list of map tasks
-   * @return the raw array of maps for this job
-   */
-  TaskInProgress[] getMapTasks() {
-    return maps;
-  }
-    
-  /**
-   * Get the list of cleanup tasks
-   * @return the array of cleanup tasks for the job
-   */
-  TaskInProgress[] getCleanupTasks() {
-    return cleanup;
-  }
-  
-  /**
-   * Get the list of setup tasks
-   * @return the array of setup tasks for the job
-   */
-  TaskInProgress[] getSetupTasks() {
-    return setup;
-  }
-  
-  /**
-   * Get the list of reduce tasks
-   * @return the raw array of reduce tasks for this job
-   */
-  TaskInProgress[] getReduceTasks() {
-    return reduces;
+  /** 
+   * Get all the tasks of the desired type in this job.
+   * @param type {@link TaskType} of the tasks required
+   * @return An array of {@link TaskInProgress} matching the given type. 
+   *         Returns an empty array if no tasks are found for the given type.  
+   */
+  TaskInProgress[] getTasks(TaskType type) {
+    TaskInProgress[] tasks = null;
+    switch (type) {
+      case MAP:
+      {
+        tasks = maps;
+      }
+      break;
+      case REDUCE:
+      {
+        tasks = reduces;
+      }
+      break;
+      case JOB_SETUP: 
+      {
+        tasks = setup;
+      }
+      break;
+      case JOB_CLEANUP:
+      {
+        tasks = cleanup;
+      }
+      break;
+      default:
+      {
+          tasks = new TaskInProgress[0];
+      }
+      break;
+    }
+    return tasks;
   }
 
   /**
@@ -1003,6 +1026,8 @@
     boolean wasComplete = tip.isComplete();
     boolean wasPending = tip.isOnlyCommitPending();
     TaskAttemptID taskid = status.getTaskID();
+    boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
+
     
     // If the TIP is already completed and the task reports as SUCCEEDED then 
     // mark the task as KILLED.
@@ -1099,7 +1124,7 @@
         
         // Tell the job to fail the relevant task
         failedTask(tip, taskid, status, taskTracker,
-                   wasRunning, wasComplete);
+                   wasRunning, wasComplete, wasAttemptRunning);
 
         // Did the task failure lead to tip failure?
         TaskCompletionEvent.Status taskCompletionStatus = 
@@ -2950,11 +2975,10 @@
    * we need to schedule reexecution so that downstream reduce tasks can 
    * obtain the map task's output.
    */
-  private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
-                          TaskStatus status, 
-                          TaskTracker taskTracker,
-                          boolean wasRunning, boolean wasComplete) {
-    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
+  private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
+                          TaskStatus status,
+                          TaskTracker taskTracker, boolean wasRunning,
+                          boolean wasComplete, boolean wasAttemptRunning) {
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
     boolean wasSpeculating = tip.isSpeculating();
@@ -2965,6 +2989,25 @@
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
+
+    if(wasAttemptRunning) {
+      // We decrement these counters without checking isRunning,
+      // because they are incremented whenever a new map or reduce
+      // task attempt is obtained, without checking whether the TIP
+      // itself is running.
+      // Since obtaining a new attempt incremented runningMapTasks
+      // (or runningReduceTasks), we decrement the same counter here.
+      if(!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+        if(tip.isMapTask()) {
+          runningMapTasks -= 1;
+        } else {
+          runningReduceTasks -= 1;
+        }
+      }
+      
+      // Metering
+      meterTaskAttempt(tip, status);
+    }
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
@@ -2973,8 +3016,6 @@
       } else if (tip.isJobSetupTask()) {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
-        runningMapTasks -= 1;
-        metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
         if (!isComplete) {
@@ -2982,8 +3023,6 @@
           failMap(tip);
         }
       } else {
-        runningReduceTasks -= 1;
-        metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete
         if (!isComplete) {
@@ -2991,9 +3030,6 @@
           failReduce(tip);
         }
       }
-      
-      // Metering
-      meterTaskAttempt(tip, status);
     }
         
     // The case when the map was complete but the task tracker went down.
@@ -3203,22 +3239,10 @@
         localFs.delete(localJobFile, true);
         localJobFile = null;
       }
-      if (localJarFile != null) {
-        localFs.delete(localJarFile, true);
-        localJarFile = null;
-      }
 
-      // clean up splits
-      for (int i = 0; i < maps.length; i++) {
-        maps[i].clearSplit();
-      }
-
-      // JobClient always creates a new directory with job files
-      // so we remove that directory to cleanup
-      // Delete temp dfs dirs created if any, like in case of 
-      // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          jobtracker.getFileSystem(tempDir), tempDir.toUri().getPath())); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -3448,11 +3472,11 @@
                "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
                "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
                "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
-               "numMaps" + EQUALS + job.getMapTasks().length + 
+               "numMaps" + EQUALS + job.getTasks(TaskType.MAP).length + 
                            StringUtils.COMMA +
                "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() + 
                                   StringUtils.COMMA +
-               "numReduces" + EQUALS + job.getReduceTasks().length + 
+               "numReduces" + EQUALS + job.getTasks(TaskType.REDUCE).length + 
                               StringUtils.COMMA +
                "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() + 
                                      StringUtils.COMMA +
@@ -3515,28 +3539,33 @@
   }
   
   /**
-   * generate keys and save it into the file
-   * @param jobDir
+   * generate job token and save it into the file
    * @throws IOException
    */
-  private void generateJobTokens(Path jobDir) throws IOException{
-    Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
-    FSDataOutputStream os = fs.create(keysFile);
-    //create JobTokens file and add key to it
-    JobTokens jt = new JobTokens();
-    byte [] key;
-    try {
-      // new key
-      key = SecureShuffleUtils.getNewEncodedKey();
-    } catch (java.security.GeneralSecurityException e) {
-      throw new IOException(e);
-    }
-    // remember the key 
-    jt.setShuffleJobToken(key);
-    // other keys..
-    jt.write(os);
+  private void generateAndStoreTokens() throws IOException{
+    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+    Path keysFile = new Path(jobDir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
+    // we need to create this file using the jobtracker's filesystem
+    FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
+    
+    //create JobToken file and write token to it
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+        .toString()));
+    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
+        jobtracker.getJobTokenSecretManager());
+    token.setService(identifier.getJobId());
+    
+    // add this token to the tokenStorage
+    if(tokenStorage == null)
+      tokenStorage = new TokenStorage();
+    
+    tokenStorage.setJobToken(token);
+    
+    // write TokenStorage out
+    tokenStorage.write(os);
     os.close();
-    LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+    LOG.info("jobToken generated and stored with users keys in "
+        + keysFile.toUri().getPath());
   }
 
 }
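
The accessor consolidation above replaces four near-identical getters
(getMapTasks, getReduceTasks, getSetupTasks, getCleanupTasks) with a single
getTasks(TaskType) lookup, which lets callers such as removeJobTasks iterate
over TaskType.values() instead of hard-coding one loop per task list. A
minimal sketch of the same pattern, using stand-in types rather than the
real TaskInProgress internals:

import java.util.EnumMap;
import java.util.Map;

// Stand-ins for the patch's TaskType and task arrays; illustrative only.
enum SketchTaskType { MAP, REDUCE, JOB_SETUP, JOB_CLEANUP }

class TaskRegistrySketch {
  private final Map<SketchTaskType, String[]> tasksByType =
      new EnumMap<SketchTaskType, String[]>(SketchTaskType.class);

  void register(SketchTaskType type, String[] tasks) {
    tasksByType.put(type, tasks);
  }

  // One accessor replaces four getters; an unregistered type yields an
  // empty array, matching the patch's default branch.
  String[] getTasks(SketchTaskType type) {
    String[] tasks = tasksByType.get(type);
    return tasks != null ? tasks : new String[0];
  }

  public static void main(String[] args) {
    TaskRegistrySketch registry = new TaskRegistrySketch();
    registry.register(SketchTaskType.MAP, new String[] {"m_0", "m_1"});
    // Iterate over every type, as the reworked removeJobTasks does.
    for (SketchTaskType type : SketchTaskType.values()) {
      System.out.println(type + ": " + registry.getTasks(type).length);
    }
  }
}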

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java Tue Jan 26 14:02:53 2010
@@ -175,6 +175,11 @@
   private void displayQueueInfo(String queue, boolean showJobs)
       throws IOException {
     JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+    
+    if (jobQueueInfo == null) {
+      System.out.println("Queue \"" + queue + "\" does not exist.");
+      return;
+    }
     printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
     if (showJobs && (jobQueueInfo.getChildren() == null ||
         jobQueueInfo.getChildren().size() == 0)) {
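
The JobQueueClient change is a straightforward null guard: getQueueInfo
returns null for a queue that does not exist, and the old code handed that
result directly to printJobQueueInfo. A self-contained sketch of the guard,
where the lookup is a stand-in rather than the real JobClient API:

import java.io.PrintWriter;

class QueueCliSketch {
  // Stand-in for JobClient.getQueueInfo(String): null means no such queue.
  static String getQueueInfo(String queue) {
    return "default".equals(queue) ? "Queue: default  State: RUNNING" : null;
  }

  static void displayQueueInfo(String queue, PrintWriter out) {
    String info = getQueueInfo(queue);
    if (info == null) {
      // Report and return instead of dereferencing null, as in the patch.
      out.println("Queue \"" + queue + "\" does not exist.");
      return;
    }
    out.println(info);
  }

  public static void main(String[] args) {
    PrintWriter out = new PrintWriter(System.out, true);
    displayQueueInfo("default", out);
    displayQueueInfo("nosuchqueue", out);
  }
}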

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jan 26 14:02:53 2010
@@ -22,11 +22,9 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.net.BindException;
 import java.net.InetSocketAddress;
-import java.net.URLEncoder;
 import java.net.UnknownHostException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -64,6 +62,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -76,9 +75,14 @@
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -87,6 +91,7 @@
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.PermissionChecker;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -100,8 +105,6 @@
 import org.apache.hadoop.util.Service;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.util.ConfigUtil;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -110,7 +113,7 @@
  *******************************************************/
 public class JobTracker extends Service 
     implements MRConstants, InterTrackerProtocol,
-    ClientProtocol, TaskTrackerManager,
+    ClientProtocol, TaskTrackerManager, RefreshUserToGroupMappingsProtocol,
     RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JTConfig {
 
   static{
@@ -145,6 +148,8 @@
   
   public static enum State { INITIALIZING, RUNNING }
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  
+  static final String JOB_INFO_FILE = "job-info";
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   NetworkTopology clusterMap = new NetworkTopology();
@@ -154,9 +159,9 @@
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
-  // system directories are world-wide readable and owner readable
+  // system directory is completely owned by the JobTracker
   final static FsPermission SYSTEM_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+    FsPermission.createImmutable((short) 0700); // rwx------
 
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
@@ -167,7 +172,18 @@
   static final Clock DEFAULT_CLOCK = new Clock();
 
   private JobHistory jobHistory;
+  
+  private final JobTokenSecretManager jobTokenSecretManager 
+    = new JobTokenSecretManager();
+  
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
 
+  private MRAsyncDiskService asyncDiskService;
+  
+  private String defaultStagingBaseDir;
+  
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -269,6 +285,8 @@
       return RefreshAuthorizationPolicyProtocol.versionID;
     } else if (protocol.equals(AdminOperationsProtocol.class.getName())){
       return AdminOperationsProtocol.versionID;
+    } else if (protocol.equals(RefreshUserToGroupMappingsProtocol.class.getName())){
+      return RefreshUserToGroupMappingsProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to job tracker: " + protocol);
     }
@@ -441,6 +459,7 @@
     }
   }
 
+  // Assumes JobTracker, taskTrackers and trackerExpiryQueue are locked on entry
   private void removeTracker(TaskTracker tracker) {
     lostTaskTracker(tracker);
     String trackerName = tracker.getStatus().getTrackerName();
@@ -460,19 +479,10 @@
       if (job != null) {
         JobStatus status = job.getStatus();
         
-        //set the historyfile and update the tracking url
-        String trackingUrl = "";
+        //set the historyfile
         if (historyFile != null) {
           status.setHistoryFile(historyFile);
-          try {
-            trackingUrl = "http://" + getJobTrackerMachine() + ":" + 
-              getInfoPort() + "/jobdetailshistory.jsp?jobid=" + 
-              jobid + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8");
-          } catch (UnsupportedEncodingException e) {
-            LOG.warn("Could not create trackingUrl", e);
-          }
         }
-        status.setTrackingUrl(trackingUrl);
         // clean up job files from the local disk
         job.cleanupLocalizedJobConf(job.getProfile().getJobID());
 
@@ -639,6 +649,7 @@
      * Increments faults(blacklist by job) for the tracker by one.
      * 
      * Adds the tracker to the potentially faulty list. 
+     * Assumes JobTracker is locked on entry.
      * 
      * @param hostName 
      */
@@ -729,13 +740,17 @@
       }
     }
     
+    // Assumes JobTracker is locked on entry.
     private FaultInfo getFaultInfo(String hostName, 
         boolean createIfNeccessary) {
-      FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
-      long now = clock.getTime();
-      if (fi == null && createIfNeccessary) {
-        fi = new FaultInfo(now);
-        potentiallyFaultyTrackers.put(hostName, fi);
+      FaultInfo fi = null;
+      synchronized (potentiallyFaultyTrackers) {
+        fi = potentiallyFaultyTrackers.get(hostName);
+        long now = clock.getTime();
+        if (fi == null && createIfNeccessary) {
+          fi = new FaultInfo(now);
+          potentiallyFaultyTrackers.put(hostName, fi);
+        }
       }
       return fi;
     }
@@ -773,6 +788,8 @@
      * Removes the tracker from blacklist and
      * from potentially faulty list, when it is restarted.
      * 
+   * Assumes JobTracker is locked on entry.
+     * 
      * @param hostName
      */
     void markTrackerHealthy(String hostName) {
@@ -791,6 +808,7 @@
      * One fault of the tracker is discarded if there
      * are no faults during one day. So, the tracker will get a 
      * chance again to run tasks of a job.
+     * Assumes JobTracker is locked on entry.
      * 
      * @param hostName The tracker name
      * @param now The current time
@@ -819,17 +837,21 @@
     private void removeHostCapacity(String hostName) {
       synchronized (taskTrackers) {
         // remove the capacity of trackers on this host
+        int numTrackersOnHost = 0;
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
           int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity -= mapSlots;
           int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity -= reduceSlots;
+          ++numTrackersOnHost;
           getInstrumentation().addBlackListedMapSlots(
               mapSlots);
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
+        // remove the host
+        uniqueHostsMap.remove(hostName);
+        incrBlackListedTrackers(numTrackersOnHost);
       }
     }
     
@@ -856,6 +878,7 @@
     /**
      * Whether a host is blacklisted across all the jobs. 
      * 
+     * Assumes JobTracker is locked on entry.
      * @param hostName
      * @return
      */
@@ -869,6 +892,7 @@
       return false;
     }
     
+    // Assumes JobTracker is locked on entry.
     int getFaultCount(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
         FaultInfo fi = null;
@@ -879,6 +903,7 @@
       return 0;
     }
     
+    // Assumes JobTracker is locked on entry.
     Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
         FaultInfo fi = null;
@@ -890,6 +915,7 @@
     }
 
 
+    // Assumes JobTracker is locked on entry.
     void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
       FaultInfo fi = null;
       // If tracker is not healthy, create a fault info object
@@ -947,6 +973,7 @@
   /**
    * Get all task tracker statuses on given host
    * 
+   * Assumes JobTracker is locked on entry
    * @param hostName
    * @return {@link java.util.List} of {@link TaskTrackerStatus}
    */
@@ -996,36 +1023,10 @@
       return jobsToRecover;
     }
 
-    /** Check if the given string represents a job-id or not 
-     */
-    private boolean isJobNameValid(String str) {
-      if(str == null) {
-        return false;
-      }
-      String[] parts = str.split("_");
-      if(parts.length == 3) {
-        if(parts[0].equals("job")) {
-            // other 2 parts should be parseable
-            return JobTracker.validateIdentifier(parts[1])
-                   && JobTracker.validateJobNumber(parts[2]);
-        }
-      }
-      return false;
-    }
-    
-    // checks if the job dir has the required files
-    public void checkAndAddJob(FileStatus status) throws IOException {
-      String fileName = status.getPath().getName();
-      if (isJobNameValid(fileName)) {
-        if (JobClient.isJobDirValid(status.getPath(), fs)) {
-          recoveryManager.addJobForRecovery(JobID.forName(fileName));
-          shouldRecover = true; // enable actual recovery if num-files > 1
-        } else {
-          LOG.info("Found an incomplete job directory " + fileName + "." 
-                   + " Deleting it!!");
-          fs.delete(status.getPath(), true);
-        }
-      }
+    // add the job
+    void addJobForRecovery(FileStatus status) throws IOException {
+      recoveryManager.addJobForRecovery(JobID.forName(status.getPath().getName()));
+      shouldRecover = true; // enable actual recovery if num-files > 1
     }
 
    
@@ -1132,7 +1133,16 @@
       for (JobID jobId : jobsToRecover) {
         LOG.info("Submitting job "+ jobId);
         try {
-          submitJob(jobId, restartCount);
+          Path jobInfoFile = getSystemFileForJob(jobId);
+          FSDataInputStream in = fs.open(jobInfoFile);
+          JobInfo token = new JobInfo();
+          token.readFields(in);
+          in.close();
+          UnixUserGroupInformation ugi = new UnixUserGroupInformation(
+              token.getUser().toString(), 
+              new String[]{UnixUserGroupInformation.DEFAULT_GROUP});
+          submitJob(token.getJobID(), restartCount, 
+              ugi, token.getJobSubmitDir().toString(), true, null);
           recovered++;
         } catch (Exception e) {
           LOG.warn("Could not recover job " + jobId, e);
@@ -1300,6 +1310,26 @@
 
   private final QueueManager queueManager;
 
+  //TO BE USED BY TEST CLASSES ONLY
+  //ONLY BUILD THE STATE WHICH IS REQUIRED BY TESTS
+  JobTracker() {
+    hostsReader = null;
+    retiredJobsCacheSize = 0;
+    infoServer = null;
+    queueManager = null;
+    supergroup = null;
+    taskScheduler = null;
+    trackerIdentifier = null;
+    recoveryManager = null;
+    jobHistory = null;
+    completedJobStatusStore = null;
+    tasktrackerExpiryInterval = 0;
+    myInstrumentation = new JobTrackerMetricsInst(this, new JobConf());
+    mrOwner = null;
+    defaultStagingBaseDir = "/Users"; 
+  }
+
+  
   JobTracker(JobConf conf) 
   throws IOException,InterruptedException, LoginException {
     this(conf, new Clock());
@@ -1482,6 +1512,18 @@
         if(systemDir == null) {
           systemDir = new Path(getSystemDir());    
         }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.getOwner().equals(mrOwner.getUserName())) {
+            throw new AccessControlException("The systemdir " + systemDir + 
+                " is not owned by " + mrOwner.getUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir + 
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir, SYSTEM_DIR_PERMISSION);
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
         // Make sure that the backup data is preserved
         FileStatus[] systemDirData;
         try {
@@ -1496,7 +1538,7 @@
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
             try {
-              recoveryManager.checkAndAddJob(status);
+              recoveryManager.addJobForRecovery(status);
             } catch (Throwable t) {
               LOG.warn("Failed to add the job " + status.getPath().getName(), 
                        t);
@@ -1539,7 +1581,8 @@
     }
     
     // Same with 'localDir' except it's always on the local disk.
-    jobConf.deleteLocalFiles(SUBDIR);
+    asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(conf), conf.getLocalDirs());
+    asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
 
     // Initialize history DONE folder
     jobHistory.initDone(conf, fs);
@@ -1561,6 +1604,8 @@
     synchronized (this) {
       completedJobStatusStore = new CompletedJobStatusStore(conf);
     }
+    Path homeDir = fs.getHomeDirectory();
+    defaultStagingBaseDir = homeDir.getParent().toString();
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -1860,8 +1905,7 @@
   // and TaskInProgress
   ///////////////////////////////////////////////////////
   void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
-    LOG.info("Adding task " + 
-      (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") + 
+    LOG.info("Adding task (" + tip.getAttemptType(taskid) + ") " + 
       "'"  + taskid + "' to tip " + 
       tip.getTIPId() + ", for tracker '" + taskTracker + "'");
 
@@ -1894,9 +1938,10 @@
     }
 
     // taskid --> TIP
-    taskidToTIPMap.remove(taskid);
-        
-    LOG.debug("Removing task '" + taskid + "'");
+    if (taskidToTIPMap.remove(taskid) != null) {
+      // log the task removal in case of success
+      LOG.info("Removing task '" + taskid + "'");
+    }
   }
     
   /**
@@ -1925,7 +1970,7 @@
    * @param job the completed job
    */
   void markCompletedJob(JobInProgress job) {
-    for (TaskInProgress tip : job.getSetupTasks()) {
+    for (TaskInProgress tip : job.getTasks(TaskType.JOB_SETUP)) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -1935,7 +1980,7 @@
         }
       }
     }
-    for (TaskInProgress tip : job.getMapTasks()) {
+    for (TaskInProgress tip : job.getTasks(TaskType.MAP)) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -1947,7 +1992,7 @@
         }
       }
     }
-    for (TaskInProgress tip : job.getReduceTasks()) {
+    for (TaskInProgress tip : job.getTasks(TaskType.REDUCE)) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -1975,8 +2020,10 @@
     if (markedTaskSet != null) {
       for (TaskAttemptID taskid : markedTaskSet) {
         removeTaskEntry(taskid);
-        LOG.info("Removed completed task '" + taskid + "' from '" + 
-                 taskTracker + "'");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removed marked completed task '" + taskid + "' from '" + 
+                    taskTracker + "'");
+        }
       }
       // Clear
       trackerToMarkedTasksMap.remove(taskTracker);
@@ -1990,15 +2037,16 @@
    * 
    * @param job the job about to be 'retired'
    */
-  synchronized private void removeJobTasks(JobInProgress job) { 
-    for (TaskInProgress tip : job.getMapTasks()) {
-      for (TaskStatus taskStatus : tip.getTaskStatuses()) {
-        removeTaskEntry(taskStatus.getTaskID());
-      }
-    }
-    for (TaskInProgress tip : job.getReduceTasks()) {
-      for (TaskStatus taskStatus : tip.getTaskStatuses()) {
-        removeTaskEntry(taskStatus.getTaskID());
+  synchronized void removeJobTasks(JobInProgress job) { 
+    // iterate over all the task types
+    for (TaskType type : TaskType.values()) {
+      // iterate over all the tips of the type under consideration
+      for (TaskInProgress tip : job.getTasks(type)) {
+        // iterate over all the task-ids in the tip under consideration
+        for (TaskAttemptID id : tip.getAllTaskAttemptIDs()) {
+          // remove the task-id entry from the jobtracker
+          removeTaskEntry(id);
+        }
       }
     }
   }
@@ -2134,7 +2182,8 @@
    * 
    * @return {@link Collection} of {@link TaskTrackerStatus} 
    */
-  public Collection<TaskTrackerStatus> taskTrackers() {
+  // Callers that lock taskTrackers must hold the JobTracker lock first.
+  public synchronized Collection<TaskTrackerStatus> taskTrackers() {
     Collection<TaskTrackerStatus> ttStatuses;
     synchronized (taskTrackers) {
       ttStatuses = 
@@ -2151,7 +2200,10 @@
    *  
    * @return {@link Collection} of active {@link TaskTrackerStatus} 
    */
-  public Collection<TaskTrackerStatus> activeTaskTrackers() {
+  // This method is synchronized to make sure that the locking order 
+  // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers 
+  // lock" is under JobTracker lock to avoid deadlocks.
+  synchronized public Collection<TaskTrackerStatus> activeTaskTrackers() {
     Collection<TaskTrackerStatus> activeTrackers = 
       new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
@@ -2171,7 +2223,10 @@
    * The second element in the returned list contains the list of blacklisted
    * tracker names. 
    */
-  public List<List<String>> taskTrackerNames() {
+  // This method is synchronized to make sure that the locking order 
+  // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers 
+  // lock" is under JobTracker lock to avoid deadlocks.
+  synchronized public List<List<String>> taskTrackerNames() {
     List<String> activeTrackers = 
       new ArrayList<String>();
     List<String> blacklistedTrackers = 
@@ -2197,7 +2252,10 @@
    *  
    * @return {@link Collection} of blacklisted {@link TaskTrackerStatus} 
    */
-  public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
+  // This method is synchronized to make sure that the locking order 
+  // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers 
+  // lock" is under JobTracker lock to avoid deadlocks.
+  synchronized public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
     Collection<TaskTrackerStatus> blacklistedTrackers = 
       new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
@@ -2211,7 +2269,7 @@
     return blacklistedTrackers;
   }
 
-  int getFaultCount(String hostName) {
+  synchronized int getFaultCount(String hostName) {
     return faultyTrackers.getFaultCount(hostName);
   }
   
@@ -2231,7 +2289,7 @@
    * 
    * @return true if blacklisted, false otherwise
    */
-  public boolean isBlacklisted(String trackerID) {
+  synchronized public boolean isBlacklisted(String trackerID) {
     TaskTrackerStatus status = getTaskTrackerStatus(trackerID);
     if (status != null) {
       return faultyTrackers.isBlacklisted(status.getHost());
@@ -2239,7 +2297,8 @@
     return false;
   }
   
-  public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
+  // Callers that lock taskTrackers must hold the JobTracker lock first.
+  synchronized public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
     TaskTracker taskTracker;
     synchronized (taskTrackers) {
       taskTracker = taskTrackers.get(trackerID);
@@ -2247,7 +2306,8 @@
     return (taskTracker == null) ? null : taskTracker.getStatus();
   }
 
-  public TaskTracker getTaskTracker(String trackerID) {
+  // Callers that lock taskTrackers must hold the JobTracker lock first.
+  synchronized public TaskTracker getTaskTracker(String trackerID) {
     synchronized (taskTrackers) {
       return taskTrackers.get(trackerID);
     }
@@ -2260,7 +2320,7 @@
    * Adds a new node to the jobtracker. It involves adding it to the expiry
    * thread and adding it for resolution
    * 
-   * Assuming trackerExpiryQueue is locked on entry
+   * Assumes JobTracker, taskTrackers and trackerExpiryQueue are locked on entry
    * 
    * @param status Task Tracker's status
    */
@@ -2596,12 +2656,14 @@
         taskTrackers.remove(trackerName);
         Integer numTaskTrackersInHost = 
           uniqueHostsMap.get(oldStatus.getHost());
-        numTaskTrackersInHost --;
-        if (numTaskTrackersInHost > 0)  {
-          uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
-        }
-        else {
-          uniqueHostsMap.remove(oldStatus.getHost());
+        if (numTaskTrackersInHost != null) {
+          numTaskTrackersInHost --;
+          if (numTaskTrackersInHost > 0)  {
+            uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
+          }
+          else {
+            uniqueHostsMap.remove(oldStatus.getHost());
+          }
         }
       }
     }
@@ -2987,8 +3049,9 @@
    * the JobTracker alone.
    */
   public synchronized org.apache.hadoop.mapreduce.JobStatus submitJob(
-      org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
-    return submitJob(JobID.downgrade(jobId));
+    org.apache.hadoop.mapreduce.JobID jobId,String jobSubmitDir, TokenStorage ts) 
+  throws IOException {  
+    return submitJob(JobID.downgrade(jobId), jobSubmitDir, ts);
   }
   
   /**
@@ -2999,46 +3062,53 @@
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    * @deprecated Use 
-   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID)} instead
+   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, TokenStorage)}
+   *  instead
    */
   @Deprecated
-  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
-    verifyServiceState(ServiceState.LIVE);
-    return submitJob(jobId, 0);
+  public synchronized JobStatus submitJob(
+      JobID jobId, String jobSubmitDir, TokenStorage ts) 
+  throws IOException {
+    return submitJob(jobId, 0, 
+        UserGroupInformation.getCurrentUGI(), 
+        jobSubmitDir, false, ts);
   }
 
   /**
    * Submits either a new job or a job from an earlier run.
    */
-  private synchronized JobStatus submitJob(JobID jobId, 
-      int restartCount) throws IOException {
+  private synchronized JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, 
+      int restartCount, UserGroupInformation ugi, String jobSubmitDir, 
+      boolean recovered, TokenStorage ts) throws IOException {
+    verifyServiceState(ServiceState.LIVE);
+    JobID jobId = JobID.downgrade(jobID);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
-    
-    JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount);
+
+    //the conversion from String to Text for the UGI's username will
+    //not be required once UGI can return the username as Text
+    //directly.
+    JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getUserName()), 
+        new Path(jobSubmitDir));
+    JobInProgress job = 
+      new JobInProgress(this, this.conf, restartCount, jobInfo, ts);
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getLeafQueueNames().contains(queue))) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
     
     //check if queue is RUNNING
     if(!queueManager.isRunning(queue)) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));      
       throw new IOException("Queue \"" + queue + "\" is not running");
     }
     try {
-      // check for access
-      UserGroupInformation ugi =
-        UserGroupInformation.readFrom(job.getJobConf());
       checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
     } catch (IOException ioe) {
-       LOG.warn("Access denied for user " + job.getJobConf().getUser() 
-                + ". Ignoring job " + jobId, ioe);
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
+      LOG.warn("Access denied for user " + job.getJobConf().getUser() 
+          + ". Ignoring job " + jobId, ioe);
       throw ioe;
     }
 
@@ -3047,11 +3117,19 @@
     try {
       checkMemoryRequirements(job);
     } catch (IOException ioe) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 
-   return addJob(jobId, job); 
+    if (!recovered) {
+      //Store the information in a file so that the job can be recovered
+      //later (if at all)
+      Path jobDir = getSystemDirectoryForJob(jobId);
+      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+      jobInfo.write(out);
+      out.close();
+    }
+    return addJob(jobId, job); 
   }
 
   /**
@@ -3076,6 +3154,9 @@
       }
     }
     myInstrumentation.submitJob(job.getJobConf(), jobId);
+    LOG.info("Job " + jobId + " added successfully for user '" 
+             + job.getJobConf().getUser() + "' to queue '" 
+             + job.getJobConf().getQueueName() + "'");
     return job.getStatus();
   }
 
@@ -3726,6 +3807,17 @@
   }
   
   /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+        defaultStagingBaseDir));
+    String user = UserGroupInformation.getCurrentUGI().getUserName();
+    return fs.makeQualified(new Path(stagingRootDir, 
+                                user+"/.staging")).toString();
+  }
+  
+  /**
    * @see 
    * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir()
    */
@@ -3740,10 +3832,15 @@
     return jobs.get(jobid);
   }
 
-  // Get the job directory in system directory
+  //Get the job directory in system directory
   Path getSystemDirectoryForJob(JobID id) {
     return new Path(getSystemDir(), id.toString());
   }
+  
+  //Get the job token file in system directory
+  Path getSystemFileForJob(JobID id) {
+    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+  }
 
   /**
    * Change the run-time priority of the given job.
@@ -3980,8 +4077,8 @@
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
           if (trackers != null) {
             for (TaskTracker tracker : trackers) {
-              LOG.info("Decommission: Losing tracker " + tracker + 
-                       " on host " + host);
+              LOG.info("Decommission: Losing tracker " 
+                       + tracker.getTrackerName() + " on host " + host);
               removeTracker(tracker);
             }
             trackersDecommissioned += trackers.size();
@@ -4105,7 +4202,9 @@
   @Override
   public QueueInfo getQueue(String queue) throws IOException {
     JobQueueInfo jqueue = queueManager.getJobQueueInfo(queue);
-    jqueue.setJobStatuses(getJobsFromQueue(jqueue.getQueueName()));
+    if (jqueue != null) {
+      jqueue.setJobStatuses(getJobsFromQueue(jqueue.getQueueName()));
+    }
     return jqueue;
   }
 
@@ -4221,6 +4320,15 @@
             limitMaxMemForReduceTasks).append(")"));
   }
 
+   
+  @Override
+  public void refreshUserToGroupsMappings(Configuration conf) throws IOException {
+    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + 
+             UserGroupInformation.getCurrentUGI().getUserName());
+    
+    SecurityUtil.getUserToGroupsMappingService(conf).refresh();
+  }
+  
   private boolean perTaskMemoryConfigurationSetOnJT() {
     if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
         || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
@@ -4273,7 +4381,7 @@
     }
   }
   
-  String getFaultReport(String host) {
+  synchronized String getFaultReport(String host) {
     FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
     if (fi == null) {
       return "";
@@ -4281,7 +4389,7 @@
     return fi.getTrackerFaultReport();
   }
 
-  Set<ReasonForBlackListing> getReasonForBlackList(String host) {
+  synchronized Set<ReasonForBlackListing> getReasonForBlackList(String host) {
     FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
     if (fi == null) {
       return new HashSet<ReasonForBlackListing>();
@@ -4289,7 +4397,7 @@
     return fi.getReasonforblacklisting();
   }
   
-  Collection<BlackListInfo> getBlackListedTrackers() {
+  synchronized Collection<BlackListInfo> getBlackListedTrackers() {
     Collection<BlackListInfo> blackListedTrackers = 
       new ArrayList<BlackListInfo>();
     for(TaskTrackerStatus tracker : blacklistedTaskTrackers()) {
@@ -4314,9 +4422,12 @@
     return blackListedTrackers;
   }
   
-  /** Test method to increment the fault*/
-  
-  void incrementFaults(String hostName) {
+  /** Test method to increment the fault
+   * This method is synchronized to make sure that the locking order 
+   * "faultyTrackers.potentiallyFaultyTrackers lock followed by taskTrackers 
+   * lock" is under JobTracker lock to avoid deadlocks.
+   */
+  synchronized void incrementFaults(String hostName) {
     faultyTrackers.incrementFaults(hostName);
   }
 
@@ -4419,6 +4530,8 @@
 
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(conf);
+    Path homeDir = fs.getHomeDirectory();
+    defaultStagingBaseDir = homeDir.getParent().toString();
   }
 
   /**
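
A recurring theme in the JobTracker changes above is the new "job-info"
file: submitJob now persists a JobInfo record (job id, user, submit
directory) under the job's system directory, and the RecoveryManager reads
it back after a restart to resubmit the job under the original user's
identity. A rough sketch of that write/read round trip, with a simplified
stand-in for JobInfo's Writable-style serialization and a byte buffer in
place of the actual file:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class JobInfoSketch {
  String jobId;
  String user;
  String jobSubmitDir;

  void write(DataOutput out) throws IOException {
    out.writeUTF(jobId);
    out.writeUTF(user);
    out.writeUTF(jobSubmitDir);
  }

  void readFields(DataInput in) throws IOException {
    jobId = in.readUTF();
    user = in.readUTF();
    jobSubmitDir = in.readUTF();
  }

  public static void main(String[] args) throws IOException {
    // Submission: persist the info so the job survives a JT restart.
    JobInfoSketch submitted = new JobInfoSketch();
    submitted.jobId = "job_201001261402_0001";
    submitted.user = "alice";
    submitted.jobSubmitDir = "/tmp/staging/alice/.staging/job_0001";
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    submitted.write(new DataOutputStream(buf));

    // Recovery: read the record back and resubmit with the saved user.
    JobInfoSketch recovered = new JobInfoSketch();
    recovered.readFields(
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println("Recovering " + recovered.jobId
        + " for user " + recovered.user);
  }
}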

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Jan 26 14:02:53 2010
@@ -30,11 +30,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 
 class JvmManager {
@@ -136,6 +136,14 @@
     }
   }
 
+  void dumpStack(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.dumpStack(tr);
+    } else {
+      reduceJvmManager.dumpStack(tr);
+    }
+  }
+
   public void killJvm(JVMId jvmId) {
     if (jvmId.isMap) {
       mapJvmManager.killJvm(jvmId);
@@ -144,6 +152,22 @@
     }
   }  
 
+  /**
+   * Adds the task's work dir to the cleanup queue of taskTracker for
+   * asynchronous deletion of work dir.
+   * @param tracker taskTracker
+   * @param task    the task whose work dir needs to be deleted
+   * @throws IOException
+   */
+  static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+    tracker.getCleanupThread().addToQueue(
+        TaskTracker.buildTaskControllerPathDeletionContexts(
+          tracker.getLocalFileSystem(),
+          tracker.getLocalFiles(tracker.getJobConf(), ""),
+          task, true /* workDir */,
+          tracker.getTaskController()));
+  }
+
   private static class JvmManagerForType {
     //Mapping from the JVM IDs to running Tasks
     Map <JVMId,TaskRunner> jvmToRunningTask = 
@@ -243,6 +267,16 @@
       }
     }
     
+    synchronized void dumpStack(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.get(tr);
+      if (null != jvmId) {
+        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+        if (null != jvmRunner) {
+          jvmRunner.dumpChildStacks();
+        }
+      }
+    }
+
     synchronized public void stop() {
       //since the kill() method invoked later on would remove
       //an entry from the jvmIdToRunner map, we create a
@@ -428,7 +462,7 @@
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
             if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              FileUtil.fullyDelete(env.workDir);
+              deleteWorkDir(tracker, initalContext.task);
             }
           } catch (IOException ie){}
         }
@@ -459,7 +493,38 @@
           removeJvm(jvmId);
         }
       }
-      
+
+      /** Send a signal to the JVM requesting that it dump a stack trace,
+       * and wait for a timeout interval to give this signal time to be
+       * processed.
+       */
+      void dumpChildStacks() {
+        if (!killed) {
+          TaskController controller = tracker.getTaskController();
+          // Check the initial context before issuing a signal, to prevent
+          // situations where the signal is issued before the task is launched.
+          if (initalContext != null && initalContext.env != null) {
+            initalContext.pid = jvmIdToPid.get(jvmId);
+            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
+                .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
+                    ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            // signal the task jvm
+            controller.dumpTaskStack(initalContext);
+
+            // We're going to kill the jvm with SIGKILL after this,
+            // so we should wait for a few seconds first to ensure that
+            // the SIGQUIT has time to be processed.
+            try {
+              Thread.sleep(initalContext.sleeptimeBeforeSigkill);
+            } catch (InterruptedException e) {
+              LOG.warn("Sleep interrupted : " +
+                  StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+
       public void taskRan() {
         busy = false;
         numTasksRan++;

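The dumpStack()/dumpChildStacks() additions above follow the common SIGQUIT-before-SIGKILL idiom: SIGQUIT asks a HotSpot JVM to print its thread stacks, and the sleep gives that time to happen before the JVM is killed. A self-contained sketch of the same idiom against an arbitrary pid; using ProcessBuilder with a POSIX kill binary is an assumption for illustration, whereas the patch routes the signals through the TaskController:

    import java.io.IOException;

    // Sketch: SIGQUIT makes the target JVM dump thread stacks to its
    // stdout/stderr; wait a grace period, then SIGKILL it.
    public class DumpThenKill {
      static void dumpThenKill(String pid, long graceMillis)
          throws IOException, InterruptedException {
        new ProcessBuilder("kill", "-QUIT", pid).start().waitFor();
        Thread.sleep(graceMillis); // let the stack dump get written
        new ProcessBuilder("kill", "-KILL", pid).start().waitFor();
      }

      public static void main(String[] args) throws Exception {
        dumpThenKill(args[0], 5000L);
      }
    }
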
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jan 26 14:02:53 2010
@@ -29,7 +29,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -83,12 +85,14 @@
   enum TaskCommands {
     INITIALIZE_USER,
     INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE,
+    INITIALIZE_DISTRIBUTEDCACHE_FILE,
     LAUNCH_TASK_JVM,
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
     KILL_TASK_JVM,
     RUN_DEBUG_SCRIPT,
+    SIGQUIT_TASK_JVM,
+    ENABLE_TASK_FOR_CLEANUP
   }
 
   /**
@@ -228,12 +232,75 @@
   @Override
   void initializeTask(TaskControllerContext context)
       throws IOException {
-    LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
-        + " for " + context.task.getTaskID().toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+                + " for " + context.task.getTaskID().toString());
+    }
     runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
         buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
   }
 
+  /**
+   * Builds the args to be passed to the task-controller for enabling a task
+   * for cleanup. The last arg in this list is either $attemptId or
+   * $attemptId/work.
+   */
+  private List<String> buildTaskCleanupArgs(
+      TaskControllerPathDeletionContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    commandArgs.add(context.mapredLocalDir.toUri().getPath());
+    commandArgs.add(context.task.getJobID().toString());
+
+    String workDir = "";
+    if (context.isWorkDir) {
+      workDir = "/work";
+    }
+    if (context.task.isTaskCleanupTask()) {
+      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+                      + workDir);
+    } else {
+      commandArgs.add(context.task.getTaskID() + workDir);
+    }
+
+    return commandArgs;
+  }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem.
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
+                + " for " + context.fullPath);
+    }
+
+    if (context instanceof TaskControllerPathDeletionContext) {
+      TaskControllerPathDeletionContext tContext =
+        (TaskControllerPathDeletionContext) context;
+    
+      if (tContext.task.getUser() != null &&
+          tContext.fs instanceof LocalFileSystem) {
+        try {
+          runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
+                   tContext.task.getUser(),
+                   buildTaskCleanupArgs(tContext), null, null);
+        } catch(IOException e) {
+          LOG.warn("Uanble to change permissions for " + tContext.fullPath);
+        }
+      }
+      else {
+        throw new IllegalArgumentException("Either user is null or the "  +
+                               "file system is not local file system.");
+      }
+    }
+    else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+          + "TaskControllerPathDeletionContext.");
+    }
+  }
+
   private void logOutput(String output) {
     String shExecOutput = output;
     if (shExecOutput != null) {
@@ -408,12 +475,21 @@
   }
 
   @Override
-  public void initializeDistributedCache(InitializationContext context)
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException {
-    LOG.debug("Going to initialize distributed cache for " + context.user
-        + " on the TT");
-    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
-        new ArrayList<String>(), context.workDir, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to initialize distributed cache for " + context.user
+          + " with localizedBaseDir " + context.localizedBaseDir + 
+          " and uniqueString " + context.uniqueString);
+    }
+    List<String> args = new ArrayList<String>();
+    // Here, uniqueString might start with '-'. Adding -- in front of the 
+    // arguments indicates that they are non-option parameters.
+    args.add("--");
+    args.add(context.localizedBaseDir.toString());
+    args.add(context.uniqueString);
+    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
+        args, context.workDir, null);
   }
 
   @Override
@@ -443,21 +519,22 @@
   }
   
   /**
-   * Convenience method used to sending appropriate Kill signal to the task 
+   * Convenience method used to send the appropriate signal to the task
    * VM
    * @param context
    * @param command
    * @throws IOException
    */
-  private void finishTask(TaskControllerContext context,
+  protected void signalTask(TaskControllerContext context,
       TaskCommands command) throws IOException{
     if(context.task == null) {
-      LOG.info("Context task null not killing the JVM");
+      LOG.info("Context task is null; not signaling the JVM");
       return;
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
+        buildKillTaskCommandArgs(context), context.env.workDir,
+        context.env.env);
     try {
       shExec.execute();
     } catch (Exception e) {
@@ -469,7 +546,7 @@
   @Override
   void terminateTask(TaskControllerContext context) {
     try {
-      finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+      signalTask(context, TaskCommands.TERMINATE_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending kill to the Task VM " + 
           StringUtils.stringifyException(e));
@@ -479,13 +556,23 @@
   @Override
   void killTask(TaskControllerContext context) {
     try {
-      finishTask(context, TaskCommands.KILL_TASK_JVM);
+      signalTask(context, TaskCommands.KILL_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending destroy to the Task VM " + 
           StringUtils.stringifyException(e));
     }
   }
 
+  @Override
+  void dumpTaskStack(TaskControllerContext context) {
+    try {
+      signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
   protected String getTaskControllerExecutablePath() {
     return taskControllerExe;
   }

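The "--" that initializeDistributedCacheFile() prepends above is the conventional getopt end-of-options marker: everything after it is treated as a positional argument even if it begins with '-', which protects against a uniqueString such as "-foo" being parsed as a flag. A small sketch of the convention when building argv for an external binary (the executable name here is hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: values that may start with '-' go after "--" so the
    // invoked tool cannot mistake them for options.
    public class ArgvSketch {
      static List<String> buildArgs(String baseDir, String uniqueString) {
        List<String> argv = new ArrayList<String>();
        argv.add("task-controller"); // hypothetical executable
        argv.add("--");              // end of options
        argv.add(baseDir);
        argv.add(uniqueString);      // safe even if it starts with '-'
        return argv;
      }
    }
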
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jan 26 14:02:53 2010
@@ -23,32 +23,32 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 public class LocalJobRunner implements ClientProtocol {
@@ -60,7 +60,8 @@
   private JobConf conf;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
-
+  final Random rand = new Random();
+  
   private JobTrackerInstrumentation myMetrics = null;
 
   private static final String jobDir =  "localRunner/";
@@ -68,33 +69,6 @@
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
-  
-  @SuppressWarnings("unchecked")
-  static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
-      throws Exception {
-    JobConf jobConf = jContext.getJobConf();
-    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-      ReflectionUtils.newInstance(jContext.getInputFormatClass(), jobConf);
-
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
-    RawSplit[] rawSplits = new RawSplit[splits.size()];
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Serializer serializer = 
-      factory.getSerializer(splits.get(0).getClass());
-    serializer.open(buffer);
-    for (int i = 0; i < splits.size(); i++) {
-      buffer.reset();
-      serializer.serialize(splits.get(i));
-      RawSplit rawSplit = new RawSplit();
-      rawSplit.setClassName(splits.get(i).getClass().getName());
-      rawSplit.setDataLength(splits.get(i).getLength());
-      rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-      rawSplit.setLocations(splits.get(i).getLocations());
-      rawSplits[i] = rawSplit;
-    }
-    return rawSplits;
-  }
 
   private class Job extends Thread implements TaskUmbilicalProtocol {
     // The job directory on the system: JobClient places job configurations here.
@@ -130,8 +104,8 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public Job(JobID jobid) throws IOException {
-      this.systemJobDir = new Path(getSystemDir(), jobid.toString());
+    public Job(JobID jobid, String jobSubmitDir) throws IOException {
+      this.systemJobDir = new Path(jobSubmitDir);
       this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
       JobConf conf = new JobConf(systemJobFile);
@@ -142,13 +116,13 @@
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
       this.trackerDistributerdCacheManager =
-          new TrackerDistributedCacheManager(conf);
+          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
       this.taskDistributedCacheManager = 
           trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(
           new LocalDirAllocator(MRConfig.LOCAL_DIR), 
           new File(systemJobDir.toString()),
-          "archive");
+          "archive", "archive");
       
       if (DistributedCache.getSymlink(conf)) {
         // This is not supported largely because, 
@@ -202,26 +176,10 @@
       JobContext jContext = new JobContextImpl(job, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
-        // split input into minimum number of splits
-        RawSplit[] rawSplits;
-        if (job.getUseNewMapper()) {
-          rawSplits = getRawSplits(jContext, job);
-        } else {
-          InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
-          rawSplits = new RawSplit[splits.length];
-          DataOutputBuffer buffer = new DataOutputBuffer();
-          for (int i = 0; i < splits.length; i++) {
-            buffer.reset();
-            splits[i].write(buffer);
-            RawSplit rawSplit = new RawSplit();
-            rawSplit.setClassName(splits[i].getClass().getName());
-            rawSplit.setDataLength(splits[i].getLength());
-            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-            rawSplit.setLocations(splits[i].getLocations());
-            rawSplits[i] = rawSplit;
-          }
-        }
+        TaskSplitMetaInfo[] taskSplitMetaInfos = 
+          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
         
+    
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {
           // we only allow 0 or 1 reducer in local mode
@@ -233,15 +191,14 @@
 
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
             new HashMap<TaskAttemptID, MapOutputFile>();
-        for (int i = 0; i < rawSplits.length; i++) {
+        for (int i = 0; i < taskSplitMetaInfos.length; i++) {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(
                 new TaskID(jobId, TaskType.MAP, i),0);  
             mapIds.add(mapId);
             MapTask map = new MapTask(systemJobFile.toString(),  
                                       mapId, i,
-                                      rawSplits[i].getClassName(),
-                                      rawSplits[i].getBytes(), 1);
+                                      taskSplitMetaInfos[i].getSplitIndex(), 1);
             JobConf localConf = new JobConf(job);
             TaskRunner.setupChildMapredLocalDirs(map, localConf);
 
@@ -459,9 +416,10 @@
   }
 
   public org.apache.hadoop.mapreduce.JobStatus submitJob(
-      org.apache.hadoop.mapreduce.JobID jobid) 
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir, TokenStorage ts) 
       throws IOException {
-    return new Job(JobID.downgrade(jobid)).status;
+    TokenCache.setTokenStorage(ts);
+    return new Job(JobID.downgrade(jobid), jobSubmitDir).status;
   }
 
   public void killJob(org.apache.hadoop.mapreduce.JobID id) {
@@ -564,6 +522,22 @@
     return fs.makeQualified(sysDir).toString();
   }
 
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+        "/tmp/hadoop/mapred/staging"));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    String user;
+    if (ugi != null) {
+      user = ugi.getUserName() + rand.nextInt();
+    } else {
+      user = "dummy" + rand.nextInt();
+    }
+    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
+  }
+  
   public String getJobHistoryDir() {
     return null;
   }

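The getStagingAreaDir() added above lays staging directories out as <stagingRoot>/<user><random>/.staging; the random suffix is particular to the local runner, which gives every submission a fresh directory rather than a stable per-user one. A reduced sketch of just the path construction, with the root and user supplied as assumptions:

    import org.apache.hadoop.fs.Path;

    // Sketch of the layout used above: <root>/<user><salt>/.staging
    public class StagingPathSketch {
      static Path stagingDir(String root, String user, int salt) {
        return new Path(new Path(root), user + salt + "/.staging");
      }

      public static void main(String[] args) {
        // e.g. /tmp/hadoop/mapred/staging/alice42/.staging
        System.out.println(stagingDir("/tmp/hadoop/mapred/staging", "alice", 42));
      }
    }
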
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java Tue Jan 26 14:02:53 2010
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.Service;
@@ -36,6 +37,8 @@
                   TaskUmbilicalProtocol.class),
       new Service("security.refresh.policy.protocol.acl", 
                   RefreshAuthorizationPolicyProtocol.class),
+      new Service("security.refresh.usertogroups.mappings.protocol.acl", 
+                  RefreshUserToGroupMappingsProtocol.class),
   };
   
   @Override

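The new Service entry above simply binds an ACL configuration key to the protocol interface it guards; authorization then checks callers of that protocol against the named ACL. A minimal sketch of a PolicyProvider following the same pattern; MyProtocol and the ACL key are hypothetical, not part of this patch:

    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.Service;

    interface MyProtocol {} // hypothetical protocol interface

    // Sketch: each Service maps an ACL config key to a protocol class.
    public class MyPolicyProvider extends PolicyProvider {
      private static final Service[] SERVICES = new Service[] {
        new Service("security.my.protocol.acl", MyProtocol.class),
      };

      @Override
      public Service[] getServices() {
        return SERVICES;
      }
    }
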
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jan 26 14:02:53 2010
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
@@ -54,6 +56,10 @@
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -72,7 +78,7 @@
    */
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
 
-  private BytesWritable split = new BytesWritable();
+  private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   private String splitClass;
   private final static int APPROX_HEADER_LENGTH = 150;
 
@@ -81,7 +87,6 @@
   private Progress mapPhase;
   private Progress sortPhase;
   
-  
   {   // set phase for this task
     setPhase(TaskStatus.Phase.MAP); 
     getProgress().setStatus("map");
@@ -92,11 +97,10 @@
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
-                 int partition, String splitClass, BytesWritable split,
+                 int partition, TaskSplitIndex splitIndex,
                  int numSlotsRequired) {
     super(jobFile, taskId, partition, numSlotsRequired);
-    this.splitClass = splitClass;
-    this.split = split;
+    this.splitMetaInfo = splitIndex;
   }
 
   @Override
@@ -108,26 +112,26 @@
   public void localizeConfiguration(JobConf conf)
       throws IOException {
     super.localizeConfiguration(conf);
-    // split.dta file is used only by IsolationRunner.
+    // split.dta/split.info files are used only by IsolationRunner.
     // Write the split file to the local disk if it is a normal map task (not a
     // job-setup or a job-cleanup task) and if the user wishes to run
     // IsolationRunner either by setting keep.failed.tasks.files to true or by
     // using keep.tasks.files.pattern
-    if (isMapOrReduce()
-        && (conf.getKeepTaskFilesPattern() != null || conf
-            .getKeepFailedTaskFiles())) {
-      Path localSplit =
+    if (supportIsolationRunner(conf) && isMapOrReduce()) {
+      // localize the split meta-information 
+      Path localSplitMeta =
           new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
-              TaskTracker.getLocalSplitFile(conf.getUser(), getJobID()
-                  .toString(), getTaskID().toString()), conf);
-      LOG.debug("Writing local split to " + localSplit);
-      DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
-      Text.writeString(out, splitClass);
-      split.write(out);
+              TaskTracker.getLocalSplitMetaFile(conf.getUser(), 
+                getJobID().toString(), getTaskID()
+                  .toString()), conf);
+      LOG.debug("Writing local split to " + localSplitMeta);
+      DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
+      splitMetaInfo.write(out);
       out.close();
     }
   }
   
+  
   @Override
   public TaskRunner createRunner(TaskTracker tracker, 
       TaskTracker.TaskInProgress tip) {
@@ -138,9 +142,8 @@
   public void write(DataOutput out) throws IOException {
     super.write(out);
     if (isMapOrReduce()) {
-      Text.writeString(out, splitClass);
-      split.write(out);
-      split = null;
+      splitMetaInfo.write(out);
+      splitMetaInfo = null;
     }
   }
   
@@ -148,8 +151,7 @@
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     if (isMapOrReduce()) {
-      splitClass = Text.readString(in);
-      split.readFields(in);
+      splitMetaInfo.readFields(in);
     }
   }
 
@@ -320,36 +322,52 @@
     }
 
     if (useNewApi) {
-      runNewMapper(job, split, umbilical, reporter);
+      runNewMapper(job, splitMetaInfo, umbilical, reporter);
     } else {
-      runOldMapper(job, split, umbilical, reporter);
+      runOldMapper(job, splitMetaInfo, umbilical, reporter);
     }
     done(umbilical, reporter);
   }
 
+ @SuppressWarnings("unchecked")
+ private <T> T getSplitDetails(Path file, long offset) 
+  throws IOException {
+   FileSystem fs = file.getFileSystem(conf);
+   FSDataInputStream inFile = fs.open(file);
+   inFile.seek(offset);
+   String className = Text.readString(inFile);
+   Class<T> cls;
+   try {
+     cls = (Class<T>) conf.getClassByName(className);
+   } catch (ClassNotFoundException ce) {
+     IOException wrap = new IOException("Split class " + className + 
+                                         " not found");
+     wrap.initCause(ce);
+     throw wrap;
+   }
+   SerializationFactory factory = new SerializationFactory(conf);
+   Deserializer<T> deserializer = 
+     (Deserializer<T>) factory.getDeserializer(cls);
+   deserializer.open(inFile);
+   T split = deserializer.deserialize(null);
+   long pos = inFile.getPos();
+   getCounters().findCounter(
+       TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+   inFile.close();
+   return split;
+ }
+  
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
-                    final BytesWritable rawSplit,
+                    final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
-    InputSplit inputSplit = null;
-    // reinstantiate the split
-    try {
-      inputSplit = (InputSplit) 
-        ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
-    } catch (ClassNotFoundException exp) {
-      IOException wrap = new IOException("Split class " + splitClass + 
-                                         " not found");
-      wrap.initCause(exp);
-      throw wrap;
-    }
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    inputSplit.readFields(splitBuffer);
-    
+    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+           splitIndex.getStartOffset());
+
     updateJobWithSplit(job, inputSplit);
     reporter.setInputSplit(inputSplit);
 
@@ -465,7 +483,7 @@
     @SuppressWarnings("unchecked")
     OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
       numPartitions = conf.getNumReduceTasks();
-      if (numPartitions > 0) {
+      if (numPartitions > 1) {
         partitioner = (Partitioner<K,V>)
           ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
       } else {
@@ -474,7 +492,7 @@
           public void configure(JobConf job) { }
           @Override
           public int getPartition(K key, V value, int numPartitions) {
-            return -1;
+            return numPartitions - 1;
           }
         };
       }
@@ -544,14 +562,14 @@
                        ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
       partitions = jobContext.getNumReduceTasks();
-      if (partitions > 0) {
+      if (partitions > 1) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
           ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
       } else {
         partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
           @Override
           public int getPartition(K key, V value, int numPartitions) {
-            return -1;
+            return partitions - 1;
           }
         };
       }
@@ -578,7 +596,7 @@
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runNewMapper(final JobConf job,
-                    final BytesWritable rawSplit,
+                    final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
@@ -597,15 +615,8 @@
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
     org.apache.hadoop.mapreduce.InputSplit split = null;
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
-    SerializationFactory factory = new SerializationFactory(job);
-    Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
-      deserializer = 
-        (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) 
-        factory.getDeserializer(job.getClassByName(splitClass));
-    deserializer.open(splitBuffer);
-    split = deserializer.deserialize(null);
+    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+        splitIndex.getStartOffset());
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>
@@ -782,11 +793,11 @@
         throw new IOException("Invalid \"mapreduce.map.sort.record.percent\": " + recper);
       }
       if ((sortmb & 0x7FF) != sortmb) {
-        throw new IOException("Invalid \"mapreduce.task.mapreduce.task.io.sort.mb\": " + sortmb);
+        throw new IOException("Invalid " + JobContext.IO_SORT_MB + ": " + sortmb);
       }
       sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
             QuickSort.class, IndexedSorter.class), job);
-      LOG.info("mapreduce.task.mapreduce.task.io.sort.mb = " + sortmb);
+      LOG.info(JobContext.IO_SORT_MB + " = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
       int recordCapacity = (int)(maxMemUsage * recper);
@@ -832,7 +843,7 @@
       } else {
         combineCollector = null;
       }
-      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPISS, 3);
+      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
       spillThread.setDaemon(true);
       spillThread.setName("SpillThread");
       spillLock.lock();

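The partitioner changes in MapTask above are a single-reducer fast path: with 0 or 1 reduce tasks there is nothing to hash, so a constant numPartitions - 1 (0 for one reducer, -1 for the direct-write case) replaces reflectively instantiating the configured partitioner class. A standalone sketch of the same dispatch, with the interface and names simplified for illustration:

    // Sketch: only consult a real partitioner when there is more
    // than one partition; otherwise return a constant.
    interface Part<K> { int partition(K key, int numPartitions); }

    class HashPart<K> implements Part<K> {
      public int partition(K key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }

    class PartitionerChooser {
      static <K> Part<K> choose(final int numPartitions) {
        if (numPartitions > 1) {
          return new HashPart<K>();
        }
        // 1 reducer -> 0; 0 reducers -> -1 (map writes output directly)
        return new Part<K>() {
          public int partition(K key, int n) { return numPartitions - 1; }
        };
      }
    }
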
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java Tue Jan 26 14:02:53 2010
@@ -747,7 +747,7 @@
      * calculating mergeProgress. This simulates the above merge() method and
      * tries to obtain the number of bytes that are going to be merged in all
      * merges(assuming that there is no combiner called while merging).
-     * @param factor mapreduce.task.mapreduce.task.io.sort.factor
+     * @param factor mapreduce.task.io.sort.factor
      * @param inMem  number of segments in memory to be merged
      */
     long computeBytesInMerges(int factor, int inMem) {


