hadoop-mapreduce-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r892893 [1/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src...
Date: Mon, 21 Dec 2009 17:36:48 GMT
Author: ddas
Date: Mon Dec 21 17:36:44 2009
New Revision: 892893

URL: http://svn.apache.org/viewvc?rev=892893&view=rev
Log:
MAPREDUCE-181. Changes the job submission process to be secure. Contributed by Devaraj Das.

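In brief: client-serialized raw splits (Job.RawSplit) give way to split meta-info
files staged under a per-user job submission directory, submitJob() gains a
jobSubmitDir argument, and the JobTracker persists a small job-info record so
incomplete jobs can be recovered after a restart. A hedged sketch of the flow the
new signatures imply (the client-side staging steps are an assumption, not shown
in this part of the diff):

    // Names come from this diff; the layout under the staging dir is assumed.
    String staging = jobTracker.getStagingAreaDir();          // new RPC: <root>/<user>/.staging
    Path jobSubmitDir = new Path(staging, jobId.toString());  // assumed per-job subdirectory
    // ... client writes job.xml plus the split and split meta-info files here ...
    jobTracker.submitJob(jobId, jobSubmitDir.toString());     // submitJob now takes the dir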
Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/mapreduce/trunk/src/test/mapred-site.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Dec 21 17:36:44 2009
@@ -84,6 +84,9 @@
     can be refreshed in the JobTracker via command line. 
     (Boris Shkolnik via ddas)
 
+    MAPREDUCE-181. Changes the job submission process to be secure.
+    (Devaraj Das)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Mon Dec 21 17:36:44 2009
@@ -42,6 +42,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 
 
@@ -349,9 +350,9 @@
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
+      JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT;
       Task task = new MapTask(
-        "", attemptId, 0, "", new BytesWritable(),
-        super.numSlotsPerMap) {
+        "", attemptId, 0, split.getSplitIndex(), super.numSlotsPerMap) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -362,7 +363,7 @@
       // create a fake TIP and keep track of it
       FakeTaskInProgress mapTip = new FakeTaskInProgress(
         getJobID(),
-        getJobConf(), task, true, this);
+        getJobConf(), task, true, this, split);
       mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
       if (areAllMapsRunning) {
         speculativeMapTasks++;
@@ -408,7 +409,7 @@
       // create a fake TIP and keep track of it
       FakeTaskInProgress reduceTip = new FakeTaskInProgress(
         getJobID(),
-        getJobConf(), task, false, this);
+        getJobConf(), task, false, this, null);
       reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
       if (areAllReducesRunning) {
         speculativeReduceTasks++;
@@ -499,8 +500,9 @@
 
     FakeTaskInProgress(
       JobID jId, JobConf jobConf, Task t,
-      boolean isMap, FakeJobInProgress job) {
-      super(jId, "", new Job.RawSplit(), null, jobConf, job, 0, 1);
+      boolean isMap, FakeJobInProgress job, 
+      JobSplit.TaskSplitMetaInfo split) {
+      super(jId, "", split, null, jobConf, job, 0, 1);
       this.isMap = isMap;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();

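This pattern repeats across the scheduler tests below: fake map tasks no longer
carry a serialized split (a split class name plus a BytesWritable payload) but a
TaskSplitIndex resolved from split meta info, with JobSplit.EMPTY_TASK_SPLIT as
the stand-in. Condensed from the hunks above:

    JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT;
    Task task = new MapTask("", attemptId, 0, split.getSplitIndex(), numSlotsPerMap);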
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Dec 21 17:36:44 2009
@@ -43,6 +43,7 @@
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.net.Node;
 
 public class TestFairScheduler extends TestCase {
@@ -111,12 +112,14 @@
       // create maps
       numMapTasks = conf.getNumMapTasks();
       maps = new TaskInProgress[numMapTasks];
+      // empty format
+      JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT;
       for (int i = 0; i < numMapTasks; i++) {
         String[] inputLocations = null;
         if (mapInputLocations != null)
           inputLocations = mapInputLocations[i];
         maps[i] = new FakeTaskInProgress(getJobID(), i,
-            getJobConf(), this, inputLocations);
+            getJobConf(), this, inputLocations, split);
         if (mapInputLocations == null) // Job has no locality info
           nonLocalMaps.add(maps[i]);
       }
@@ -137,7 +140,8 @@
         if (!tip.isRunning() && !tip.isComplete() &&
             getLocalityLevel(tip, tts) < localityLevel) {
           TaskAttemptID attemptId = getTaskAttemptID(tip);
-          Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
+          JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT;
+          Task task = new MapTask("", attemptId, 0, split.getSplitIndex(), 1) {
             @Override
             public String toString() {
               return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -231,8 +235,9 @@
     
     // Constructor for map
     FakeTaskInProgress(JobID jId, int id, JobConf jobConf,
-        FakeJobInProgress job, String[] inputLocations) {
-      super(jId, "", new Job.RawSplit(), null, jobConf, job, id, 1);
+        FakeJobInProgress job, String[] inputLocations, 
+        JobSplit.TaskSplitMetaInfo split) {
+      super(jId, "", split, null, jobConf, job, id, 1);
       this.isMap = true;
       this.fakeJob = job;
       this.inputLocations = inputLocations;

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Mon Dec 21 17:36:44 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -162,7 +163,8 @@
         switch (type) {
           case MAP:
              runInputBytes[i] = counters.findCounter("FileSystemCounters",
-                 "HDFS_BYTES_READ").getValue();
+                 "HDFS_BYTES_READ").getValue() - 
+                 counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();
              runInputRecords[i] =
                (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
              runOutputBytes[i] =

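The subtraction above compensates for a side effect of the new split handling:
a map task now evidently reads its split details from files in HDFS, so those
bytes land in HDFS_BYTES_READ while also being tracked by TaskCounter.SPLIT_RAW_BYTES
(TaskCounter.java is also touched by this commit). The corrected input-byte
accounting is simply:

    long inputBytes =
        counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue()
        - counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();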
Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java Mon Dec 21 17:36:44 2009
@@ -349,6 +349,6 @@
     }
     
     SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
-    return jobTracker.submitJob(jobId);
+    return jobTracker.submitJob(jobId, "dummy-path");
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Mon Dec 21 17:36:44 2009
@@ -28,11 +28,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.Node;
@@ -48,10 +48,10 @@
   // cache
   private final JobStory jobStory;
 
-  RawSplit[] splits;
+  TaskSplitMetaInfo[] taskSplitMetaInfo;
 
   @SuppressWarnings("deprecation")
-  public SimulatorJobInProgress(JobID jobid, JobTracker jobtracker,
+  public SimulatorJobInProgress(JobID jobid, String jobSubmitDir, JobTracker jobtracker,
       JobConf default_conf, JobStory jobStory) {
     super();
     // jobSetupCleanupNeeded set to false in parent cstr, though
@@ -63,7 +63,7 @@
     this.jobtracker = jobtracker;
     this.conf = jobStory.getJobConf();
     this.priority = conf.getJobPriority();
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobid);
+    Path jobDir = new Path(jobSubmitDir);
     this.jobFile = new Path(jobDir, "job.xml");
     this.status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.PREP,
         priority, conf.getUser(), conf.getJobName(), jobFile.toString(), url);
@@ -127,16 +127,17 @@
     }
 
     final String jobFile = "default";
-    splits = getRawSplits(jobStory.getInputSplits());
+    taskSplitMetaInfo = createSplits(jobStory);
     if (loggingEnabled) {
       LOG.debug("(initTasks@SJIP) Created splits for job = " + jobId
-          + " number of splits = " + splits.length);
+          + " number of splits = " + taskSplitMetaInfo.length);
     }
 
-    createMapTasks(jobFile, splits);
+    createMapTasks(jobFile, taskSplitMetaInfo);
 
     if (numMapTasks > 0) {
-      nonRunningMapCache = createCache(splits, maxLevel);
+      nonRunningMapCache = createCache(taskSplitMetaInfo,
+          maxLevel);
       if (loggingEnabled) {
         LOG.debug("initTasks:numMaps=" + numMapTasks
             + " Size of nonRunningMapCache=" + nonRunningMapCache.size()
@@ -167,25 +168,25 @@
     }
   }
 
-  RawSplit[] getRawSplits(InputSplit[] splits) throws IOException {
+  
+  TaskSplitMetaInfo[] createSplits(JobStory story) throws IOException {
+    InputSplit[] splits = story.getInputSplits();
     if (splits == null || splits.length != numMapTasks) {
       throw new IllegalArgumentException("Input split size mismatch: expected="
           + numMapTasks + ", actual=" + ((splits == null) ? -1 : splits.length));
     }
 
-    RawSplit rawSplits[] = new RawSplit[splits.length];
-    for (int i = 0; i < splits.length; i++) {
+    TaskSplitMetaInfo[] splitMetaInfo = 
+      new TaskSplitMetaInfo[story.getNumberMaps()];
+    int i = 0;
+    for (InputSplit split : splits) {
       try {
-        rawSplits[i] = new RawSplit();
-        rawSplits[i].setClassName(splits[i].getClass().getName());
-        rawSplits[i].setDataLength(splits[i].getLength());
-        rawSplits[i].setLocations(splits[i].getLocations());
+        splitMetaInfo[i++] = new TaskSplitMetaInfo(split,0);
       } catch (InterruptedException ie) {
         throw new IOException(ie);
       }
     }
-
-    return rawSplits;
+    return splitMetaInfo;
   }
 
   /**
@@ -208,7 +209,8 @@
     assert (jobid == getJobID());
 
     // Get splits for the TaskAttempt
-    RawSplit split = splits[taskAttemptID.getTaskID().getId()];
+    TaskSplitMetaInfo split = 
+     taskSplitMetaInfo[taskAttemptID.getTaskID().getId()];
     int locality = getClosestLocality(taskTracker, split);
 
     TaskID taskId = taskAttemptID.getTaskID();
@@ -232,7 +234,7 @@
     return taskAttemptInfo;
   }
 
-  private int getClosestLocality(TaskTracker taskTracker, RawSplit split) {
+  private int getClosestLocality(TaskTracker taskTracker, TaskSplitMetaInfo split) {
     int locality = 2;
 
     Node taskTrackerNode = jobtracker

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Mon Dec 21 17:36:44 2009
@@ -173,7 +173,8 @@
   }
 
   @Override
-  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+  public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir) 
+  throws IOException {
     boolean loggingEnabled = LOG.isDebugEnabled();
     if (loggingEnabled) {
       LOG.debug("submitJob for jobname = " + jobId);
@@ -191,7 +192,7 @@
     }
     validateAndSetClock(jobStory.getSubmissionTime());
     
-    SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, this,
+    SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, jobSubmitDir, this,
                                                             this.conf, 
                                                             jobStory);
     return addJob(jobId, job);

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Mon Dec 21 17:36:44 2009
@@ -43,6 +43,7 @@
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.*;
 //
 // Mock jobtracker class that check heartbeat() in parameters and 
 // sends responses based on a prepopulated table
@@ -76,7 +77,7 @@
   }
 
   @Override
-  public JobStatus submitJob(JobID jobId) throws IOException {
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir) throws IOException {
     JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f,
         JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", "");
     return status;
@@ -172,8 +173,8 @@
     final int numSlotsRequired = 1;
     org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
         org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);        
-    Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, "dummysplitclass",
-                            null, numSlotsRequired);
+    Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, new TaskSplitIndex(),
+                             numSlotsRequired);
     // all byte counters are 0
     TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 
     MapTaskAttemptInfo taskAttemptInfo = 
@@ -302,6 +303,11 @@
   public String getSystemDir() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public String getStagingAreaDir() {
+    throw new UnsupportedOperationException();
+  }
   
   @Override
   public String getBuildVersion() {

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java Mon Dec 21 17:36:44 2009
@@ -79,7 +79,7 @@
       FakeJobs job = new FakeJobs("job1", 0, numMaps, numReduces);
 
       SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
-      jobTracker.submitJob(jobId);
+      jobTracker.submitJob(jobId, "dummy-path");
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Mon Dec 21 17:36:44 2009
@@ -248,6 +248,17 @@
   <description>The class responsible for scheduling the tasks.</description>
 </property>
 
+
+<property>
+  <name>mapreduce.job.split.metainfo.maxsize</name>
+  <value>10000000</value>
+  <description>The maximum permissible size of the split metainfo file. 
+  The JobTracker won't attempt to read split metainfo files bigger than
+  the configured value.
+  No limits if set to -1.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.jobtracker.taskscheduler.maxrunningtasks.perjob</name>
   <value></value>

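A hedged illustration of how the new limit above could be enforced when the split
meta-info file is read (the real check belongs to the newly added
SplitMetaInfoReader; the variable names here are assumptions):

    // Illustration only: reject oversized split meta-info files.
    long maxSize = conf.getLong("mapreduce.job.split.metainfo.maxsize", 10000000L);
    FileStatus st = fs.getFileStatus(splitMetaFile);   // splitMetaFile: assumed handle
    if (maxSize > 0 && st.getLen() > maxSize) {
      throw new IOException("Split metainfo size exceeded " + maxSize);
    }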
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Mon Dec 21 17:36:44 2009
@@ -30,10 +30,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 
 /**
  * IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -180,19 +179,21 @@
     Thread.currentThread().setContextClassLoader(classLoader);
     conf.setClassLoader(classLoader);
 
-    // split.dta file is used only by IsolationRunner. The file can now be in
-    // any of the configured local disks, so use LocalDirAllocator to find out
-    // where it is.
-    Path localSplit =
+    // split.dta/split.meta files are used only by IsolationRunner. 
+    // The file can now be in any of the configured local disks, 
+    // so use LocalDirAllocator to find out where it is.
+    Path localMetaSplit =
         new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
-            TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID()
-                .toString(), taskId.toString()), conf);
-    DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
-    String splitClass = Text.readString(splitFile);
-    BytesWritable split = new BytesWritable();
-    split.readFields(splitFile);
+            TaskTracker.getLocalSplitMetaFile(conf.getUser(), 
+              taskId.getJobID().toString(), taskId
+                .toString()), conf);
+    DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
+    TaskSplitIndex splitIndex = new TaskSplitIndex(); 
+    splitIndex.readFields(splitFile);
     splitFile.close();
-    Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, 1);
+
+    Task task = 
+      new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1);
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());
     return true;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Dec 21 17:36:44 2009
@@ -476,6 +476,13 @@
   }
   
   /**
+   * Get a handle to the Cluster
+   */
+  public Cluster getClusterHandle() {
+    return cluster;
+  }
+  
+  /**
    * Submit a job to the MR system.
    * 
    * This returns a handle to the {@link RunningJob} which can be used to track
@@ -523,37 +530,6 @@
     }
   }
 
-  /** 
-   * Checks if the job directory is clean and has all the required components 
-   * for (re) starting the job
-   */
-  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) 
-  throws IOException {
-    FileStatus[] contents = null;
-    
-    try {
-      contents = fs.listStatus(jobDirPath);
-    } catch(FileNotFoundException fnfe) {
-      return false;
-    }
-    
-    int matchCount = 0;
-    if (contents.length >=2) {
-      for (FileStatus status : contents) {
-        if ("job.xml".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
-        if ("job.split".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
-      }
-      if (matchCount == 2) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * Get an {@link RunningJob} object to track an ongoing job.  Returns
    * null if the id does not correspond to any known job.

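The removed isJobDirValid() scan is superseded by the job-info record the
JobTracker now writes at submit time (see the JobTracker and RecoveryManager
changes below). The new getClusterHandle() accessor simply exposes the new-API
Cluster backing a JobClient, e.g. for code migrating off the old API:

    JobClient jc = new JobClient(new JobConf());
    Cluster cluster = jc.getClusterHandle();  // reuse the same connection via the new API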
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Dec 21 17:36:44 2009
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -38,16 +37,16 @@
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
@@ -66,12 +65,16 @@
 import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 
@@ -101,7 +104,6 @@
   JobStatus status;
   Path jobFile = null;
   Path localJobFile = null;
-  Path localJarFile = null;
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
@@ -298,6 +300,7 @@
     new HashMap<TaskTracker, FallowSlotInfo>();
   private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
     new HashMap<TaskTracker, FallowSlotInfo>();
+  private Path jobSubmitDir = null;
   
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
@@ -352,45 +355,52 @@
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    */
-  public JobInProgress(JobID jobid, JobTracker jobtracker, 
-                       JobConf default_conf, int rCount) throws IOException {
+  public JobInProgress(JobTracker jobtracker, 
+                       JobConf default_conf, int rCount,
+                       JobInfo jobInfo) throws IOException {
     this.restartCount = rCount;
-    this.jobId = jobid;
+    this.jobId = JobID.downgrade(jobInfo.getJobID());
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
     this.jobtracker = jobtracker;
     this.jobHistory = jobtracker.getJobHistory();
     this.startTime = System.currentTimeMillis();
     
     this.localFs = jobtracker.getLocalFileSystem();
 
-    JobConf default_job_conf = new JobConf(default_conf);
-    this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
-                                                      +"/"+jobid + ".xml");
-    this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ jobid + ".jar");
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    fs = jobtracker.getFileSystem(jobDir);
-    jobFile = new Path(jobDir, "job.xml");
+    // use the user supplied token to add user credentials to the conf
+    jobSubmitDir = jobInfo.getJobSubmitDir();
+    String user = jobInfo.getUser().toString();
+    conf = new JobConf();
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, 
+        new UnixUserGroupInformation(user, 
+            new String[]{UnixUserGroupInformation.DEFAULT_GROUP}).toString());
+    fs = jobSubmitDir.getFileSystem(conf);
+    
+    this.localJobFile = 
+      default_conf.getLocalPath(JobTracker.SUBDIR + "/" + this.jobId + ".xml");
+    
+    jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
+    if (conf.getUser() == null) {
+      this.conf.setUser(user);
+    }
+    if (!conf.getUser().equals(user)) {
+      throw new IOException("The username obtained from the conf doesn't " +
+      		"match the username the user authenticated as");
+    }
     this.priority = conf.getJobPriority();
-    this.profile = new JobProfile(conf.getUser(), jobid, 
-                                  jobFile.toString(), url, conf.getJobName(),
-                                  conf.getQueueName());
-    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
+    this.profile = new JobProfile(conf.getUser(), this.jobId, 
+                                  jobFile.toString(), url, 
+                                  conf.getJobName(), conf.getQueueName());
+    this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP, 
         profile.getUser(), profile.getJobName(), profile.getJobFile(), 
         profile.getURL().toString());
-    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
+    this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
     status.setStartTime(startTime);
     this.status.setJobPriority(this.priority);
 
-    String jarFile = conf.getJar();
-    if (jarFile != null) {
-      fs.copyToLocalFile(new Path(jarFile), localJarFile);
-      conf.setJar(localJarFile.toString());
-    }
-
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
@@ -406,7 +416,7 @@
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", jobid.toString());
+    this.jobMetrics.setTag("jobId", this.jobId.toString());
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
@@ -473,7 +483,7 @@
   }
   
   Map<Node, List<TaskInProgress>> createCache(
-                         Job.RawSplit[] splits, int maxLevel) {
+                         TaskSplitMetaInfo[] splits, int maxLevel) {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
@@ -586,26 +596,25 @@
     //
     // read input splits and create a map per a split
     //
-    String jobFile = profile.getJobFile();
-
-    Job.RawSplit[] splits = createSplits();
-    numMapTasks = splits.length;
+    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+    numMapTasks = taskSplitMetaInfo.length;
 
     checkTaskLimits();
 
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 
-    createMapTasks(jobFile, splits);
+    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
     
     if (numMapTasks > 0) { 
-      nonRunningMapCache = createCache(splits, maxLevel);
+      nonRunningMapCache = createCache(taskSplitMetaInfo,
+          maxLevel);
     }
         
     // set the launch time
     this.launchTime = JobTracker.getClock().getTime();
 
-    createReduceTasks(jobFile);
+    createReduceTasks(jobFile.toString());
     
     // Calculate the minimum number of maps to be complete before 
     // we should start scheduling reduces
@@ -615,7 +624,7 @@
                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
            numMapTasks));
     
-    initSetupCleanupTasks(jobFile);
+    initSetupCleanupTasks(jobFile.toString());
     
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
@@ -669,16 +678,11 @@
     
   }
 
-  Job.RawSplit[] createSplits() throws IOException {
-    DataInputStream splitFile =
-      fs.open(new Path(conf.get(JobContext.SPLIT_FILE)));
-    Job.RawSplit[] splits;
-    try {
-      splits = Job.readSplitFile(splitFile);
-    } finally {
-      splitFile.close();
-    }
-    return splits;
+  TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) 
+  throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo = 
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
+    return allTaskSplitMetaInfo;
   }
 
   /**
@@ -695,10 +699,11 @@
     }
   }
 
-  synchronized void createMapTasks(String jobFile, Job.RawSplit[] splits) {
+  synchronized void createMapTasks(String jobFile, 
+		  TaskSplitMetaInfo[] splits) {
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getDataLength();
+      inputLength += splits[i].getInputDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
                                    jobtracker, conf, this, 
@@ -720,6 +725,7 @@
     }
   }
 
+  
   synchronized void initSetupCleanupTasks(String jobFile) {
     if (!jobSetupCleanupNeeded) {
       // nothing to initialize
@@ -730,7 +736,7 @@
 
     // cleanup map tip. This map doesn't use any splits. Just assign an empty
     // split.
-    Job.RawSplit emptySplit = new Job.RawSplit();
+    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
@@ -3216,20 +3222,7 @@
         localFs.delete(localJobFile, true);
         localJobFile = null;
       }
-      if (localJarFile != null) {
-        localFs.delete(localJarFile, true);
-        localJarFile = null;
-      }
-
-      // clean up splits
-      for (int i = 0; i < maps.length; i++) {
-        maps[i].clearSplit();
-      }
 
-      // JobClient always creates a new directory with job files
-      // so we remove that directory to cleanup
-      // Delete temp dfs dirs created if any, like in case of 
-      // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
       new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
     } catch (IOException e) {
@@ -3534,7 +3527,8 @@
    */
   private void generateJobTokens(Path jobDir) throws IOException{
     Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
-    FSDataOutputStream os = fs.create(keysFile);
+    // we need to create this file using the jobtracker's filesystem
+    FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
     //create JobTokens file and add key to it
     JobTokens jt = new JobTokens();
     byte [] key;

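Two security-relevant details in the JobInProgress changes above are easy to
miss. First, the job's files now come from the user-owned submit directory rather
than the JobTracker's system directory, so the constructor injects the
authenticated user into a fresh conf before opening the FileSystem, then verifies
that conf.getUser() matches the authenticated name. Second, since the fs field now
acts as the submitting user, generateJobTokens() switches to the JobTracker's own
filesystem to create the tokens file. Condensed from the hunks above:

    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
        new UnixUserGroupInformation(user,
            new String[]{UnixUserGroupInformation.DEFAULT_GROUP}).toString());
    FileSystem fs = jobSubmitDir.getFileSystem(conf);  // reads staged files as the submitter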
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Represents the basic information that is saved per a job when the 
+ * JobTracker receives a submitJob request. The information is saved
+ * so that the JobTracker can recover incomplete jobs upon restart.
+ */
+class JobInfo implements Writable {
+  private org.apache.hadoop.mapreduce.JobID id;
+  private Text user;
+  private Path jobSubmitDir;
+  public JobInfo() {}
+  
+  public JobInfo(org.apache.hadoop.mapreduce.JobID id, 
+      Text user,
+      Path jobSubmitDir) {
+    this.id = id;
+    this.user = user;
+    this.jobSubmitDir = jobSubmitDir;
+  }
+  
+  /**
+   * Get the job id.
+   */
+  public org.apache.hadoop.mapreduce.JobID getJobID() {
+    return id;
+  }
+  
+  /**
+   * Get the configured job's user-name.
+   */
+  public Text getUser() {
+    return user;
+  }
+      
+  /**
+   * Get the job submission directory
+   */
+  public Path getJobSubmitDir() {
+    return this.jobSubmitDir;
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    id = new org.apache.hadoop.mapreduce.JobID();
+    id.readFields(in);
+    user = new Text();
+    user.readFields(in);
+    jobSubmitDir = new Path(WritableUtils.readString(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    id.write(out);
+    user.write(out);
+    WritableUtils.writeString(out, jobSubmitDir.toString());
+  }
+}
\ No newline at end of file

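A round-trip sketch of how this record is used elsewhere in the commit:
submitJob() persists it under the job's system directory, and the RecoveryManager
reads it back after a JobTracker restart (both shown in the JobTracker diff below):

    // At submit time:
    JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getUserName()),
        new Path(jobSubmitDir));
    FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); // <sysdir>/<jobid>/job-info
    jobInfo.write(out);
    out.close();

    // During recovery:
    FSDataInputStream in = fs.open(getSystemFileForJob(jobId));
    JobInfo token = new JobInfo();
    token.readFields(in);
    in.close();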
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Dec 21 17:36:44 2009
@@ -64,6 +64,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -147,6 +148,8 @@
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  
+  static final String JOB_INFO_FILE = "job-info";
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   NetworkTopology clusterMap = new NetworkTopology();
@@ -156,9 +159,9 @@
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
-  // system directories are world-wide readable and owner readable
+  // system directory is completely owned by the JobTracker
   final static FsPermission SYSTEM_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+    FsPermission.createImmutable((short) 0700); // rwx------
 
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
@@ -172,6 +175,8 @@
 
   private MRAsyncDiskService asyncDiskService;
   
+  private final String defaultStagingBaseDir;
+  
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -991,36 +996,10 @@
       return jobsToRecover;
     }
 
-    /** Check if the given string represents a job-id or not 
-     */
-    private boolean isJobNameValid(String str) {
-      if(str == null) {
-        return false;
-      }
-      String[] parts = str.split("_");
-      if(parts.length == 3) {
-        if(parts[0].equals("job")) {
-            // other 2 parts should be parseable
-            return JobTracker.validateIdentifier(parts[1])
-                   && JobTracker.validateJobNumber(parts[2]);
-        }
-      }
-      return false;
-    }
-    
-    // checks if the job dir has the required files
-    public void checkAndAddJob(FileStatus status) throws IOException {
-      String fileName = status.getPath().getName();
-      if (isJobNameValid(fileName)) {
-        if (JobClient.isJobDirValid(status.getPath(), fs)) {
-          recoveryManager.addJobForRecovery(JobID.forName(fileName));
-          shouldRecover = true; // enable actual recovery if num-files > 1
-        } else {
-          LOG.info("Found an incomplete job directory " + fileName + "." 
-                   + " Deleting it!!");
-          fs.delete(status.getPath(), true);
-        }
-      }
+    // add the job
+    void addJobForRecovery(FileStatus status) throws IOException {
+      recoveryManager.addJobForRecovery(JobID.forName(status.getPath().getName()));
+      shouldRecover = true; // enable actual recovery if num-files > 1
     }
 
    
@@ -1127,7 +1106,16 @@
       for (JobID jobId : jobsToRecover) {
         LOG.info("Submitting job "+ jobId);
         try {
-          submitJob(jobId, restartCount);
+          Path jobInfoFile = getSystemFileForJob(jobId);
+          FSDataInputStream in = fs.open(jobInfoFile);
+          JobInfo token = new JobInfo();
+          token.readFields(in);
+          in.close();
+          UnixUserGroupInformation ugi = new UnixUserGroupInformation(
+              token.getUser().toString(), 
+              new String[]{UnixUserGroupInformation.DEFAULT_GROUP});
+          submitJob(token.getJobID(), restartCount, 
+              ugi, token.getJobSubmitDir().toString(), true);
           recovered++;
         } catch (Exception e) {
           LOG.warn("Could not recover job " + jobId, e);
@@ -1311,6 +1299,7 @@
     tasktrackerExpiryInterval = 0;
     myInstrumentation = new JobTrackerMetricsInst(this, new JobConf());
     mrOwner = null;
+    defaultStagingBaseDir = "/Users"; 
   }
 
   
@@ -1466,6 +1455,18 @@
         if(systemDir == null) {
           systemDir = new Path(getSystemDir());    
         }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.getOwner().equals(mrOwner.getUserName())) {
+            throw new AccessControlException("The systemdir " + systemDir + 
+                " is not owned by " + mrOwner.getUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir + 
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir, SYSTEM_DIR_PERMISSION);
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
         // Make sure that the backup data is preserved
         FileStatus[] systemDirData;
         try {
@@ -1480,7 +1481,7 @@
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
             try {
-              recoveryManager.checkAndAddJob(status);
+              recoveryManager.addJobForRecovery(status);
             } catch (Throwable t) {
               LOG.warn("Failed to add the job " + status.getPath().getName(), 
                        t);
@@ -1536,6 +1537,8 @@
 
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(conf);
+    Path homeDir = fs.getHomeDirectory();
+    defaultStagingBaseDir = homeDir.getParent().toString();
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -2871,8 +2874,8 @@
    * the JobTracker alone.
    */
   public synchronized org.apache.hadoop.mapreduce.JobStatus submitJob(
-      org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
-    return submitJob(JobID.downgrade(jobId));
+    org.apache.hadoop.mapreduce.JobID jobId, String jobSubmitDir) throws IOException {
+    return submitJob(JobID.downgrade(jobId), jobSubmitDir);
   }
   
   /**
@@ -2883,45 +2886,49 @@
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    * @deprecated Use 
-   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID)} instead
+   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String)} instead
    */
   @Deprecated
-  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
-    return submitJob(jobId, 0);
+  public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir) 
+  throws IOException {
+    return submitJob(jobId, 0, 
+        UserGroupInformation.getCurrentUGI(), 
+        jobSubmitDir, false);
   }
 
   /**
    * Submits either a new job or a job from an earlier run.
    */
-  private synchronized JobStatus submitJob(JobID jobId, 
-      int restartCount) throws IOException {
+  private synchronized JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, 
+      int restartCount, UserGroupInformation ugi, String jobSubmitDir, 
+      boolean recovered) throws IOException {
+    JobID jobId = JobID.downgrade(jobID);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
-    
-    JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount);
+
+    //the conversion from String to Text for the UGI's username will
+    //not be required when we have the UGI to return us the username as
+    //Text.
+    JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getUserName()), 
+        new Path(jobSubmitDir));
+    JobInProgress job = new JobInProgress(this, this.conf, restartCount, jobInfo);
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getLeafQueueNames().contains(queue))) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
     
     //check if queue is RUNNING
     if(!queueManager.isRunning(queue)) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));      
       throw new IOException("Queue \"" + queue + "\" is not running");
     }
     try {
-      // check for access
-      UserGroupInformation ugi =
-        UserGroupInformation.readFrom(job.getJobConf());
       checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
     } catch (IOException ioe) {
-       LOG.warn("Access denied for user " + job.getJobConf().getUser() 
-                + ". Ignoring job " + jobId, ioe);
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
+      LOG.warn("Access denied for user " + job.getJobConf().getUser() 
+          + ". Ignoring job " + jobId, ioe);
       throw ioe;
     }
 
@@ -2930,11 +2937,19 @@
     try {
       checkMemoryRequirements(job);
     } catch (IOException ioe) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 
-   return addJob(jobId, job); 
+    if (!recovered) {
+      //Store the information in a file so that the job can be recovered
+      //later (if at all)
+      Path jobDir = getSystemDirectoryForJob(jobId);
+      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+      jobInfo.write(out);
+      out.close();
+    }
+    return addJob(jobId, job); 
   }
 
   /**
@@ -3600,6 +3615,17 @@
   }
   
   /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+        defaultStagingBaseDir));
+    String user = UserGroupInformation.getCurrentUGI().getUserName();
+    return fs.makeQualified(new Path(stagingRootDir, 
+                                user+"/.staging")).toString();
+  }
+  
+  /**
    * @see 
    * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir()
    */
@@ -3614,10 +3640,15 @@
     return jobs.get(jobid);
   }
 
-  // Get the job directory in system directory
+  //Get the job directory in system directory
   Path getSystemDirectoryForJob(JobID id) {
     return new Path(getSystemDir(), id.toString());
   }
+  
+  //Get the job token file in system directory
+  Path getSystemFileForJob(JobID id) {
+    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+  }
 
   /**
    * Change the run-time priority of the given job.
@@ -4304,6 +4335,8 @@
 
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(conf);
+    Path homeDir = fs.getHomeDirectory();
+    defaultStagingBaseDir = homeDir.getParent().toString();
   }
 
   /**

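To make getStagingAreaDir() above concrete: with JTConfig.JT_STAGING_AREA_ROOT
unset, the root falls back to defaultStagingBaseDir, the parent of the JobTracker
owner's home directory (computed at startup), and each user gets a private
<root>/<user>/.staging path. Illustrative values only, with a hypothetical user
alice and home directories under /Users:

    // staging root : hdfs://nn:8020/Users            (parent of mrOwner's home, assumed)
    // returned     : hdfs://nn:8020/Users/alice/.staging
    String dir = fs.makeQualified(new Path("/Users", "alice/.staging")).toString();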
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Dec 21 17:36:44 2009
@@ -23,19 +23,16 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.QueueInfo;
@@ -48,7 +45,9 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 public class LocalJobRunner implements ClientProtocol {
@@ -60,6 +59,7 @@
   private JobConf conf;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
+  final Random rand = new Random();
 
   private JobTrackerInstrumentation myMetrics = null;
 
@@ -68,33 +68,6 @@
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
-  
-  @SuppressWarnings("unchecked")
-  static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
-      throws Exception {
-    JobConf jobConf = jContext.getJobConf();
-    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-      ReflectionUtils.newInstance(jContext.getInputFormatClass(), jobConf);
-
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
-    RawSplit[] rawSplits = new RawSplit[splits.size()];
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Serializer serializer = 
-      factory.getSerializer(splits.get(0).getClass());
-    serializer.open(buffer);
-    for (int i = 0; i < splits.size(); i++) {
-      buffer.reset();
-      serializer.serialize(splits.get(i));
-      RawSplit rawSplit = new RawSplit();
-      rawSplit.setClassName(splits.get(i).getClass().getName());
-      rawSplit.setDataLength(splits.get(i).getLength());
-      rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-      rawSplit.setLocations(splits.get(i).getLocations());
-      rawSplits[i] = rawSplit;
-    }
-    return rawSplits;
-  }
 
   private class Job extends Thread implements TaskUmbilicalProtocol {
     // The job directory on the system: JobClient places job configurations here.
@@ -130,8 +103,8 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public Job(JobID jobid) throws IOException {
-      this.systemJobDir = new Path(getSystemDir(), jobid.toString());
+    public Job(JobID jobid, String jobSubmitDir) throws IOException {
+      this.systemJobDir = new Path(jobSubmitDir);
       this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
       JobConf conf = new JobConf(systemJobFile);
@@ -202,26 +175,10 @@
       JobContext jContext = new JobContextImpl(job, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
-        // split input into minimum number of splits
-        RawSplit[] rawSplits;
-        if (job.getUseNewMapper()) {
-          rawSplits = getRawSplits(jContext, job);
-        } else {
-          InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
-          rawSplits = new RawSplit[splits.length];
-          DataOutputBuffer buffer = new DataOutputBuffer();
-          for (int i = 0; i < splits.length; i++) {
-            buffer.reset();
-            splits[i].write(buffer);
-            RawSplit rawSplit = new RawSplit();
-            rawSplit.setClassName(splits[i].getClass().getName());
-            rawSplit.setDataLength(splits[i].getLength());
-            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-            rawSplit.setLocations(splits[i].getLocations());
-            rawSplits[i] = rawSplit;
-          }
-        }
+        TaskSplitMetaInfo[] taskSplitMetaInfos = 
+          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
         
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {
           // we only allow 0 or 1 reducer in local mode
@@ -233,15 +190,14 @@
 
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
             new HashMap<TaskAttemptID, MapOutputFile>();
-        for (int i = 0; i < rawSplits.length; i++) {
+        for (int i = 0; i < taskSplitMetaInfos.length; i++) {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(
                 new TaskID(jobId, TaskType.MAP, i),0);  
             mapIds.add(mapId);
             MapTask map = new MapTask(systemJobFile.toString(),  
                                       mapId, i,
-                                      rawSplits[i].getClassName(),
-                                      rawSplits[i].getBytes(), 1);
+                                      taskSplitMetaInfos[i].getSplitIndex(), 1);
             JobConf localConf = new JobConf(job);
             TaskRunner.setupChildMapredLocalDirs(map, localConf);
 
@@ -459,9 +415,9 @@
   }
 
   public org.apache.hadoop.mapreduce.JobStatus submitJob(
-      org.apache.hadoop.mapreduce.JobID jobid) 
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir) 
       throws IOException {
-    return new Job(JobID.downgrade(jobid)).status;
+    return new Job(JobID.downgrade(jobid), jobSubmitDir).status;
   }
 
   public void killJob(org.apache.hadoop.mapreduce.JobID id) {
@@ -564,6 +520,22 @@
     return fs.makeQualified(sysDir).toString();
   }
 
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+        "/tmp/hadoop/mapred/staging"));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    String user;
+    if (ugi != null) {
+      user = ugi.getUserName() + rand.nextInt();
+    } else {
+      user = "dummy" + rand.nextInt();
+    }
+    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
+  }
+  
   public String getJobHistoryDir() {
     return null;
   }

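With this change LocalJobRunner no longer computes splits itself: submitJob() now takes the client-populated jobSubmitDir, and the runner reads back the split meta information the client wrote there. A hedged sketch of that read path, using only the call visible in the hunk above (the job id and directory below are assumptions for illustration):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
    import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

    // Sketch: recover per-task split meta info from a populated job
    // submit directory, as Job.run() does above; one entry per map task.
    public class ReadSplitMetaSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        FileSystem localFs = FileSystem.getLocal(conf);
        Path jobSubmitDir = new Path("/tmp/staging/user/.staging/job_local_0001");
        JobID jobId = JobID.forName("job_local_0001");
        TaskSplitMetaInfo[] metaInfo = SplitMetaInfoReader.readSplitMetaInfo(
            jobId, localFs, conf, jobSubmitDir);
        for (TaskSplitMetaInfo info : metaInfo) {
          System.out.println(info.getInputDataLength() + " input bytes");
        }
      }
    }
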
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Dec 21 17:36:44 2009
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
@@ -54,6 +56,10 @@
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -72,7 +78,7 @@
    */
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
 
-  private BytesWritable split = new BytesWritable();
+  private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   private String splitClass;
   private final static int APPROX_HEADER_LENGTH = 150;
 
@@ -81,7 +87,6 @@
   private Progress mapPhase;
   private Progress sortPhase;
   
-  
   {   // set phase for this task
     setPhase(TaskStatus.Phase.MAP); 
     getProgress().setStatus("map");
@@ -92,11 +97,10 @@
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
-                 int partition, String splitClass, BytesWritable split,
+                 int partition, TaskSplitIndex splitIndex,
                  int numSlotsRequired) {
     super(jobFile, taskId, partition, numSlotsRequired);
-    this.splitClass = splitClass;
-    this.split = split;
+    this.splitMetaInfo = splitIndex;
   }
 
   @Override
@@ -108,26 +112,26 @@
   public void localizeConfiguration(JobConf conf)
       throws IOException {
     super.localizeConfiguration(conf);
-    // split.dta file is used only by IsolationRunner.
+    // split.dta/split.info files are used only by IsolationRunner.
     // Write the split file to the local disk if it is a normal map task (not a
     // job-setup or a job-cleanup task) and if the user wishes to run
     // IsolationRunner either by setting keep.failed.tasks.files to true or by
     // using keep.tasks.files.pattern
-    if (isMapOrReduce()
-        && (conf.getKeepTaskFilesPattern() != null || conf
-            .getKeepFailedTaskFiles())) {
-      Path localSplit =
+    if (supportIsolationRunner(conf) && isMapOrReduce()) {
+      // localize the split meta-information 
+      Path localSplitMeta =
           new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
-              TaskTracker.getLocalSplitFile(conf.getUser(), getJobID()
-                  .toString(), getTaskID().toString()), conf);
-      LOG.debug("Writing local split to " + localSplit);
-      DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
-      Text.writeString(out, splitClass);
-      split.write(out);
+              TaskTracker.getLocalSplitMetaFile(conf.getUser(), 
+                getJobID().toString(), getTaskID()
+                  .toString()), conf);
+      LOG.debug("Writing local split to " + localSplitMeta);
+      DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
+      splitMetaInfo.write(out);
       out.close();
     }
   }
   
   @Override
   public TaskRunner createRunner(TaskTracker tracker, 
       TaskTracker.TaskInProgress tip) {
@@ -138,9 +142,8 @@
   public void write(DataOutput out) throws IOException {
     super.write(out);
     if (isMapOrReduce()) {
-      Text.writeString(out, splitClass);
-      split.write(out);
-      split = null;
+      splitMetaInfo.write(out);
+      splitMetaInfo = null;
     }
   }
   
@@ -148,8 +151,7 @@
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     if (isMapOrReduce()) {
-      splitClass = Text.readString(in);
-      split.readFields(in);
+      splitMetaInfo.readFields(in);
     }
   }
 
@@ -320,36 +322,52 @@
     }
 
     if (useNewApi) {
-      runNewMapper(job, split, umbilical, reporter);
+      runNewMapper(job, splitMetaInfo, umbilical, reporter);
     } else {
-      runOldMapper(job, split, umbilical, reporter);
+      runOldMapper(job, splitMetaInfo, umbilical, reporter);
     }
     done(umbilical, reporter);
   }
 
+  @SuppressWarnings("unchecked")
+  private <T> T getSplitDetails(Path file, long offset)
+      throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<T> cls;
+    try {
+      cls = (Class<T>) conf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className +
+                                         " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer<T> deserializer =
+      (Deserializer<T>) factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    T split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    getCounters().findCounter(
+        TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+  
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
-                    final BytesWritable rawSplit,
+                    final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
-    InputSplit inputSplit = null;
-    // reinstantiate the split
-    try {
-      inputSplit = (InputSplit) 
-        ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
-    } catch (ClassNotFoundException exp) {
-      IOException wrap = new IOException("Split class " + splitClass + 
-                                         " not found");
-      wrap.initCause(exp);
-      throw wrap;
-    }
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    inputSplit.readFields(splitBuffer);
-    
+    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+           splitIndex.getStartOffset());
+
     updateJobWithSplit(job, inputSplit);
     reporter.setInputSplit(inputSplit);
 
@@ -578,7 +596,7 @@
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runNewMapper(final JobConf job,
-                    final BytesWritable rawSplit,
+                    final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
@@ -597,15 +615,8 @@
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
     org.apache.hadoop.mapreduce.InputSplit split = null;
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
-    SerializationFactory factory = new SerializationFactory(job);
-    Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
-      deserializer = 
-        (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) 
-        factory.getDeserializer(job.getClassByName(splitClass));
-    deserializer.open(splitBuffer);
-    split = deserializer.deserialize(null);
+    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+        splitIndex.getStartOffset());
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>

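The pattern in getSplitDetails() above generalizes: seek to the offset recorded in the task's TaskSplitIndex, read the split class name with Text.readString, then hand the open stream to a Deserializer for that class. A standalone sketch of the same read-side logic (assuming the record layout the hunk implies: class name followed by serialized split bytes):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;

    // Sketch: read one serialized split from a split file at a known
    // offset, mirroring MapTask.getSplitDetails() above.
    public class SplitReadSketch {
      @SuppressWarnings("unchecked")
      static <T> T readSplitAt(Configuration conf, Path file, long offset)
          throws IOException {
        FSDataInputStream in = file.getFileSystem(conf).open(file);
        try {
          in.seek(offset);
          String className = Text.readString(in); // written by the submitter
          Class<T> cls;
          try {
            cls = (Class<T>) conf.getClassByName(className);
          } catch (ClassNotFoundException e) {
            IOException wrap = new IOException("Split class " + className
                + " not found");
            wrap.initCause(e);
            throw wrap;
          }
          Deserializer<T> deserializer =
              (Deserializer<T>) new SerializationFactory(conf)
                  .getDeserializer(cls);
          deserializer.open(in);
          return deserializer.deserialize(null);
        } finally {
          in.close();
        }
      }
    }
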
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Dec 21 17:36:44 2009
@@ -922,8 +922,22 @@
                             + JobStatus.State.FAILED + " or "
                             + JobStatus.State.KILLED);
     }
+    
+    // delete the staging area for the job
+    JobConf conf = new JobConf(jobContext.getConfiguration());
+    if (!supportIsolationRunner(conf)) {
+      String jobTempDir = conf.get("mapreduce.job.dir");
+      Path jobTempDirPath = new Path(jobTempDir);
+      FileSystem fs = jobTempDirPath.getFileSystem(conf);
+      fs.delete(jobTempDirPath, true);
+    }
     done(umbilical, reporter);
   }
+  
+  protected boolean supportIsolationRunner(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null || conf
+        .getKeepFailedTaskFiles());
+  }
 
   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                              TaskReporter reporter

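The cleanup task added above deletes the job's staging directory ("mapreduce.job.dir") unless the user asked to keep task files for IsolationRunner. The gate is pure configuration; a small sketch of the two knobs involved (property semantics as used elsewhere in this patch):

    import org.apache.hadoop.mapred.JobConf;

    // Sketch: supportIsolationRunner() above returns true when either
    // knob is set, in which case the staging directory is retained.
    public class CleanupGateSketch {
      static boolean keepJobFiles(JobConf conf) {
        return conf.getKeepTaskFilesPattern() != null
            || conf.getKeepFailedTaskFiles();
      }

      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setKeepFailedTaskFiles(true); // i.e. keep.failed.tasks.files
        System.out.println(keepJobFiles(conf)); // true -> staging kept
      }
    }
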
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Dec 21 17:36:44 2009
@@ -31,13 +31,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.Node;
 
 
@@ -65,7 +64,7 @@
 
   // Defines the TIP
   private String jobFile = null;
-  private Job.RawSplit rawSplit;
+  private TaskSplitMetaInfo splitInfo;
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
@@ -140,12 +139,12 @@
    * Constructor for MapTask
    */
   public TaskInProgress(JobID jobid, String jobFile, 
-                        Job.RawSplit rawSplit, 
+                        TaskSplitMetaInfo split, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition,
                         int numSlotsRequired) {
     this.jobFile = jobFile;
-    this.rawSplit = rawSplit;
+    this.splitInfo = split;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -316,7 +315,7 @@
    * Whether this is a map task
    */
   public boolean isMapTask() {
-    return rawSplit != null;
+    return splitInfo != null;
   }
     
   /**
@@ -807,7 +806,7 @@
    */
   public String[] getSplitLocations() {
     if (isMapTask() && !jobSetup && !jobCleanup) {
-      return rawSplit.getLocations();
+      return splitInfo.getLocations();
     }
     return new String[0];
   }
@@ -1012,16 +1011,8 @@
     if (isMapTask()) {
       LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
           + failedRanges.getIndicesCount());
-      String splitClass = null;
-      BytesWritable split;
-      if (!jobSetup && !jobCleanup) {
-        splitClass = rawSplit.getClassName();
-        split = rawSplit.getBytes();
-      } else {
-        split = new BytesWritable();
-      }
-      t = new MapTask(jobFile, taskid, partition, splitClass, split,
-                      numSlotsNeeded);
+      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
+          numSlotsNeeded);
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
     }
@@ -1134,7 +1125,7 @@
     if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
-    String[] splits = rawSplit.getLocations();
+    String[] splits = splitInfo.getLocations();
     Node[] nodes = new Node[splits.length];
     for (int i = 0; i < splits.length; i++) {
       nodes[i] = jobtracker.getNode(splits[i]);
@@ -1164,16 +1155,12 @@
 
   public long getMapInputSize() {
     if(isMapTask() && !jobSetup && !jobCleanup) {
-      return rawSplit.getDataLength();
+      return splitInfo.getInputDataLength();
     } else {
       return 0;
     }
   }
   
-  public void clearSplit() {
-    rawSplit.clearBytes();
-  }
-  
   /**
    * Compare most recent task attempts dispatch time to current system time so
    * that task progress rate will slow down as time proceeds even if no progress

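TaskInProgress now holds only split metadata — locations, input length, and an index into the on-disk split file — instead of the serialized split bytes, so JobTracker memory no longer scales with split payload size (note the removal of clearSplit() above). A sketch of the accessors the scheduler-side code relies on, with names taken from the hunks:

    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;

    // Sketch: everything scheduling needs fits in the meta info; the
    // split payload stays in the split file until the map task reads it.
    public class SplitMetaSketch {
      static void describe(TaskSplitMetaInfo info) {
        for (String host : info.getLocations()) { // locality hints
          System.out.println("candidate host: " + host);
        }
        System.out.println("input length: " + info.getInputDataLength());
        System.out.println("payload offset: "
            + info.getSplitIndex().getStartOffset());
      }
    }
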
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Dec 21 17:36:44 2009
@@ -86,6 +86,8 @@
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -218,6 +220,7 @@
   static final String OUTPUT = "output";
   private static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.dta";
+  static final String LOCAL_SPLIT_META_FILE = "split.info";
   static final String JOBFILE = "job.xml";
   static final String JOB_TOKEN_FILE="jobToken"; //localized file
 
@@ -474,11 +477,16 @@
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
-  static String getLocalSplitFile(String user, String jobid, String taskid) {
+  static String getLocalSplitMetaFile(String user, String jobid, String taskid){
     return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
-        + TaskTracker.LOCAL_SPLIT_FILE;
+        + TaskTracker.LOCAL_SPLIT_META_FILE;
   }
 
+  static String getLocalSplitFile(String user, String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+           + TaskTracker.LOCAL_SPLIT_FILE;
+  }
+  
   static String getIntermediateOutputDir(String user, String jobid,
       String taskid) {
     return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
@@ -898,6 +906,13 @@
     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
   }
 
+  private void setUgi(String user, Configuration conf) {
+    //The dummy-group used here will not be required once we have UGI
+    //object creation with just the user name.
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, 
+        user+","+UnixUserGroupInformation.DEFAULT_GROUP);
+  }
+  
   /**
    * Localize the job on this tasktracker. Specifically
    * <ul>
@@ -940,6 +955,7 @@
     }
     System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
     localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+    setUgi(userName, localJobConf);
 
     // Download the job.jar for this job from the system FS
     localizeJobJarFile(userName, jobId, localFs, localJobConf);
@@ -958,12 +974,17 @@
    */
   private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
       throws IOException {
-    // Get sizes of JobFile and JarFile
+    JobConf conf = new JobConf(getJobConf());
+    setUgi(user, conf);
+    
+    FileSystem userFs = jobFile.getFileSystem(conf);
+    // Get the size of JobFile
     // sizes are -1 if they are not present.
     FileStatus status = null;
     long jobFileSize = -1;
     try {
-      status = systemFS.getFileStatus(jobFile);
+      status = userFs.getFileStatus(jobFile);
       jobFileSize = status.getLen();
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
@@ -974,7 +995,7 @@
             jobFileSize, fConf);
 
     // Download job.xml
-    systemFS.copyToLocalFile(jobFile, localJobFile);
+    userFs.copyToLocalFile(jobFile, localJobFile);
     return localJobFile;
   }
 
@@ -996,8 +1017,9 @@
     long jarFileSize = -1;
     if (jarFile != null) {
       Path jarFilePath = new Path(jarFile);
+      FileSystem fs = jarFilePath.getFileSystem(localJobConf);
       try {
-        status = systemFS.getFileStatus(jarFilePath);
+        status = fs.getFileStatus(jarFilePath);
         jarFileSize = status.getLen();
       } catch (FileNotFoundException fe) {
         jarFileSize = -1;
@@ -1009,7 +1031,7 @@
               getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
 
       // Download job.jar
-      systemFS.copyToLocalFile(jarFilePath, localJarFile);
+      fs.copyToLocalFile(jarFilePath, localJarFile);
 
       localJobConf.setJar(localJarFile.toString());
 

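Localization above now fetches job.xml and job.jar as the submitting user instead of through the TaskTracker's systemFS handle. Until UGI objects can be created from a bare user name (as the comment in setUgi() notes), the user,group pair is stamped into the Configuration before the FileSystem is looked up. A sketch of that pattern, using only the property shown in the hunk:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UnixUserGroupInformation;

    // Sketch: obtain a FileSystem handle that acts as the job owner,
    // as localizeJobConfFile()/localizeJobJarFile() do above.
    public class UserFsSketch {
      static FileSystem fsAsUser(Path path, Configuration base, String user)
          throws IOException {
        Configuration conf = new Configuration(base);
        conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
            user + "," + UnixUserGroupInformation.DEFAULT_GROUP);
        return path.getFileSystem(conf);
      }
    }
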
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java Mon Dec 21 17:36:44 2009
@@ -46,6 +46,7 @@
   private Configuration conf;
   private FileSystem fs = null;
   private Path sysDir = null;
+  private Path stagingAreaDir = null;
   private Path jobHistoryDir = null;
 
   static {
@@ -76,6 +77,7 @@
     ClientProtocol client;
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
+      conf.setInt("mapreduce.job.maps", 1);
       client = new LocalJobRunner(conf);
     } else {
       client = createRPCProxy(JobTracker.getAddress(conf), conf);
@@ -222,6 +224,19 @@
     }
     return sysDir;
   }
+  
+  /**
+   * Grab the jobtracker's view of the staging directory path where
+   * job-specific files will be placed.
+   * 
+   * @return the staging directory where job-specific files are to be placed.
+   */
+  public Path getStagingAreaDir() throws IOException, InterruptedException {
+    if (stagingAreaDir == null) {
+      stagingAreaDir = new Path(client.getStagingAreaDir());
+    }
+    return stagingAreaDir;
+  }
 
   /**
    * Get the job history file path for a given job id. The job history file at 

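Client code can now ask the cluster for the staging root before writing any job files; the value is fetched once over RPC and cached. A hedged usage sketch (construction of Cluster from a bare Configuration is an assumption here):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Cluster;

    // Sketch: resolve the per-user staging directory via the accessor
    // added above; the result is <staging-root>/<user>/.staging.
    public class StagingLookupSketch {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        Path staging = cluster.getStagingAreaDir();
        System.out.println("job files staged under " + staging);
      }
    }
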
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Mon Dec 21 17:36:44 2009
@@ -20,8 +20,6 @@
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +28,6 @@
 import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.Arrays;
 import java.net.URI;
 
 import javax.security.auth.login.LoginException;
@@ -40,12 +37,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
@@ -956,7 +950,7 @@
     ensureState(JobState.DEFINE);
     setUseNewAPI();
     status = new JobSubmitter(cluster.getFileSystem(),
-      cluster.getClient()).submitJobInternal(this);
+      cluster.getClient()).submitJobInternal(this, cluster);
     state = JobState.RUNNING;
    }
   
@@ -1211,106 +1205,4 @@
     }
     return ugi;
   }
-
-  /**
-   * Read a splits file into a list of raw splits.
-   * 
-   * @param in the stream to read from
-   * @return the complete list of splits
-   * @throws IOException
-   */
-  public static RawSplit[] readSplitFile(DataInput in) throws IOException {
-    byte[] header = new byte[JobSubmitter.SPLIT_FILE_HEADER.length];
-    in.readFully(header);
-    if (!Arrays.equals(JobSubmitter.SPLIT_FILE_HEADER, header)) {
-      throw new IOException("Invalid header on split file");
-    }
-    int vers = WritableUtils.readVInt(in);
-    if (vers != JobSubmitter.CURRENT_SPLIT_FILE_VERSION) {
-      throw new IOException("Unsupported split version " + vers);
-    }
-    int len = WritableUtils.readVInt(in);
-    RawSplit[] result = new RawSplit[len];
-    for (int i=0; i < len; ++i) {
-      result[i] = new RawSplit();
-      result[i].readFields(in);
-    }
-    return result;
-  }
-
-  public static class RawSplit implements Writable {
-    private String splitClass;
-    private BytesWritable bytes = new BytesWritable();
-    private String[] locations;
-    long dataLength;
-
-    public RawSplit() {
-    }
-    
-    protected RawSplit(String splitClass, BytesWritable bytes,
-        String[] locations, long dataLength) {
-      this.splitClass = splitClass;
-      this.bytes = bytes;
-      this.locations = locations;
-      this.dataLength = dataLength;
-    }
-
-    public void setBytes(byte[] data, int offset, int length) {
-      bytes.set(data, offset, length);
-    }
-
-    public void setClassName(String className) {
-      splitClass = className;
-    }
-      
-    public String getClassName() {
-      return splitClass;
-    }
-      
-    public BytesWritable getBytes() {
-      return bytes;
-    }
-
-    public void clearBytes() {
-      bytes = null;
-    }
-      
-    public void setLocations(String[] locations) {
-      this.locations = locations;
-    }
-      
-    public String[] getLocations() {
-      return locations;
-    }
-
-    public long getDataLength() {
-      return dataLength;
-    }
-
-    public void setDataLength(long l) {
-      dataLength = l;
-    }
-      
-    public void readFields(DataInput in) throws IOException {
-      splitClass = Text.readString(in);
-      dataLength = in.readLong();
-      bytes.readFields(in);
-      int len = WritableUtils.readVInt(in);
-      locations = new String[len];
-      for (int i=0; i < len; ++i) {
-        locations[i] = Text.readString(in);
-      }
-    }
-      
-    public void write(DataOutput out) throws IOException {
-      Text.writeString(out, splitClass);
-      out.writeLong(dataLength);
-      bytes.write(out);
-      WritableUtils.writeVInt(out, locations.length);
-      for (int i = 0; i < locations.length; i++) {
-        Text.writeString(out, locations[i]);
-      }        
-    }
-  }
-
 }


