hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077108 [1/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ contrib/gridmix/src/test/org/apache/hadoop/...
Date: Fri, 04 Mar 2011 03:41:32 GMT
Author: omalley
Date: Fri Mar  4 03:41:31 2011
New Revision: 1077108

URL: http://svn.apache.org/viewvc?rev=1077108&view=rev
Log:
commit 1e1dacdac3a62a5bea43e762a12af8fb3bd84d55
Author: Devaraj Das <ddas@yahoo-inc.com>
Date:   Tue Jan 12 15:43:25 2010 -0800

    MAPREDUCE:181 from https://issues.apache.org/jira/secure/attachment/12430064/181.20.s.3.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-181. Changes the job submission process to be secure.
    +    (Devaraj Das)
    +

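A minimal sketch of the new client-side submission flow, condensed from the
JobClient.submitJob() hunks below (not part of the commit itself; error
handling, the output-spec check and the job.xml write are omitted, and
"job", "context" and "jobSubmitClient" are the fields/locals of JobClient):

    // Job files now go to a per-job directory under the user's staging area
    // instead of the JobTracker's system directory.
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(this, job);
    JobID jobId = jobSubmitClient.getNewJobId();
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    job.set("mapreduce.job.dir", submitJobDir.toString());
    copyAndConfigureFiles(job, submitJobDir);       // jar, -files, -libjars, -archives
    int maps = writeSplits(context, submitJobDir);  // split data + split meta-info
    job.setNumMapTasks(maps);
    // The JobTracker reads the job files back from the staging area using the
    // user's credentials passed over the RPC.
    JobStatus status = jobSubmitClient.submitJob(jobId, submitJobDir.toString());
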
Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInfo.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
    hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/HadoopArchives.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/analysejobhistory.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetailshistory.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtaskshistory.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetailshistory.jsp

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:41:31 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configurat
 
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 public class TestCapacityScheduler extends TestCase {
 
@@ -202,8 +203,8 @@ public class TestCapacityScheduler exten
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 
-                              super.numSlotsPerMap, getJobConf().getUser()) {
+      Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(), 
+                              super.numSlotsPerMap) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -245,7 +246,7 @@ public class TestCapacityScheduler exten
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
-      Task task = new ReduceTask("", attemptId, 0, 10, super.numSlotsPerReduce, getJobConf().getUser()) {
+      Task task = new ReduceTask("", attemptId, 0, 10, super.numSlotsPerReduce) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -337,7 +338,7 @@ public class TestCapacityScheduler exten
     
     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
         boolean isMap, FakeJobInProgress job) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
+      super(jId, "", JobSplit.EMPTY_TASK_SPLIT, null, jobConf, job, 0, 1);
       this.isMap = isMap;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java Fri Mar  4 03:41:31 2011
@@ -19,8 +19,11 @@ package org.apache.hadoop.mapred;
 
 import java.util.Properties;
 import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+import org.junit.*;
 
-
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
 public class TestJobTrackerRestartWithCS extends ClusterWithCapacityScheduler {
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar  4 03:41:31 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -69,7 +70,8 @@ public class TestFairScheduler extends T
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1, getJobConf().getUser()) {
+      Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(),
+          1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -84,7 +86,7 @@ public class TestFairScheduler extends T
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10, 1, getJobConf().getUser()) {
+      Task task = new ReduceTask("", attemptId, 0, 10, 1) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  4 03:41:31 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -165,7 +166,8 @@ public class TestGridmixSubmission {
         switch (type) {
           case MAP:
              runInputBytes[i] = counters.findCounter("FileSystemCounters",
-                 "HDFS_BYTES_READ").getValue();
+                 "HDFS_BYTES_READ").getValue() - 
+                 counters.findCounter(Task.Counter.SPLIT_RAW_BYTES).getValue();
              runInputRecords[i] =
                (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
              runOutputBytes[i] =

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 03:41:31 2011
@@ -125,9 +125,18 @@
 </property>
 
 <property>
-  <name>mapred.system.dir</name>
+  <name>mapreduce.jobtracker.system.dir</name>
   <value>${hadoop.tmp.dir}/mapred/system</value>
-  <description>The shared directory where MapReduce stores control files.
+  <description>The directory where MapReduce stores control files.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.jobtracker.staging.root.dir</name>
+  <value>${hadoop.tmp.dir}/mapred/staging</value>
+  <description>The root of the staging area for users' job files
+  In practice, this should be the directory where users' home 
+  directories are located (usually /user)
   </description>
 </property>
 
@@ -249,6 +258,16 @@
 </property>
 
 <property>
+  <name>mapreduce.job.split.metainfo.maxsize</name>
+  <value>10000000</value>
+  <description>The maximum permissible size of the split metainfo file.
+  The JobTracker won't attempt to read split metainfo files bigger than
+  the configured value.
+  No limits if set to -1.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.taskScheduler</name>
   <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
   <description>The class responsible for scheduling the tasks.</description>

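As a quick illustration of the two new knobs (not part of the patch; the values
are arbitrary), a job or site configuration could override them as follows:

    JobConf conf = new JobConf();
    // Root of the per-user staging area; in practice the parent of the users'
    // home directories (usually /user).
    conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
    // Largest split meta-info file the JobTracker will read; -1 means no limit.
    conf.setLong("mapreduce.job.split.metainfo.maxsize", 10000000L);
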
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java Fri Mar  4 03:41:31 2011
@@ -84,7 +84,8 @@ class HistoryViewer {
       }
       jobLogFile = jobFiles[0].toString();
       String[] jobDetails = 
-          JobInfo.decodeJobHistoryFileName(jobFiles[0].getName()).split("_");
+          JobHistory.JobInfo.decodeJobHistoryFileName(jobFiles[0].getName()).
+                             split("_");
       trackerHostName = jobDetails[0];
       trackerStartTime = jobDetails[1];
       jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
@@ -155,7 +156,7 @@ class HistoryViewer {
     System.out.println(jobDetails.toString());
   }
   
-  private void printCounters(StringBuffer buff, JobInfo job) 
+  private void printCounters(StringBuffer buff, JobHistory.JobInfo job) 
       throws ParseException {
     Counters mapCounters = 
       Counters.fromEscapedCompactString(job.get(Keys.MAP_COUNTERS));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar  4 03:41:31 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 
 public class IsolationRunner {
   private static final Log LOG = 
@@ -199,20 +200,19 @@ public class IsolationRunner {
     
     Task task;
     if (isMap) {
-      Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), 
-                                 "split.dta");
-      DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
-      String splitClass = Text.readString(splitFile);
-      BytesWritable split = new BytesWritable();
-      split.readFields(splitFile);
+      Path localMetaSplit = 
+        new Path(new Path(jobFilename.toString()).getParent(), "split.info");
+      DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
+      TaskSplitIndex splitIndex = new TaskSplitIndex();
+      splitIndex.readFields(splitFile);
       splitFile.close();
-      task = new MapTask(jobFilename.toString(), taskId, partition, 
-                         splitClass, split, 1, conf.getUser());
+      task =
+        new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1);
     } else {
       int numMaps = conf.getNumMapTasks();
       fillInMissingMapOutputs(local, taskId, numMaps, conf);
       task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, 
-                            1, conf.getUser());
+                            1);
     }
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 03:41:31 2011
@@ -65,6 +65,12 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -389,6 +395,7 @@ public class JobClient extends Configure
 
   private JobSubmissionProtocol jobSubmitClient;
   private Path sysDir = null;
+  private Path stagingAreaDir = null;
   
   private FileSystem fs = null;
 
@@ -418,6 +425,7 @@ public class JobClient extends Configure
   public void init(JobConf conf) throws IOException {
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
+      conf.setNumMapTasks(1);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
       this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
@@ -529,11 +537,23 @@ public class JobClient extends Configure
   /**
    * configure the jobconf of the user with the command line options of 
    * -libjars, -files, -archives
-   * @param conf
+   * @param job the JobConf
+   * @param submitJobDir
    * @throws IOException
    */
-  private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile) 
-    throws IOException {
+  private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir) 
+  throws IOException {
+    short replication = (short)job.getInt("mapred.submit.replication", 10);
+    copyAndConfigureFiles(job, jobSubmitDir, replication);
+
+    // Set the working directory
+    if (job.getWorkingDirectory() == null) {
+      job.setWorkingDirectory(fs.getWorkingDirectory());          
+    }
+  }
+  
+  private void copyAndConfigureFiles(JobConf job, Path submitJobDir, 
+      short replication) throws IOException {
     
     if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
       LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
@@ -566,15 +586,18 @@ public class JobClient extends Configure
     // Create a number of filenames in the JobTracker's fs namespace
     FileSystem fs = getFs();
     LOG.debug("default FileSystem: " + fs.getUri());
-    fs.delete(submitJobDir, true);
+    if (fs.exists(submitJobDir)) {
+      throw new IOException("Not submitting job. Job directory " + submitJobDir
+          +" already exists!! This is unexpected.Please check what's there in" +
+          " that directory");
+    }
     submitJobDir = fs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
-    FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+    FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
-    Path filesDir = new Path(submitJobDir, "files");
-    Path archivesDir = new Path(submitJobDir, "archives");
-    Path libjarsDir = new Path(submitJobDir, "libjars");
-    short replication = (short)job.getInt("mapred.submit.replication", 10);
+    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     // add all the command line files/ jars and archive
     // first copy them to jobtrackers filesystem 
     
@@ -601,7 +624,8 @@ public class JobClient extends Configure
       for (String tmpjars: libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
-        DistributedCache.addArchiveToClassPath(newPath, job);
+        DistributedCache.addArchiveToClassPath(
+            new Path(newPath.toUri().getPath()), job);
       }
     }
     
@@ -653,10 +677,12 @@ public class JobClient extends Configure
       if ("".equals(job.getJobName())){
         job.setJobName(new Path(originalJarPath).getName());
       }
+      Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
       job.setJar(submitJarFile.toString());
       fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
       fs.setReplication(submitJarFile, replication);
-      fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+      fs.setPermission(submitJarFile, 
+          new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
     } else {
       LOG.warn("No job jar file set.  User classes may not be found. "+
                "See JobConf(Class) or JobConf#setJar(String).");
@@ -667,10 +693,6 @@ public class JobClient extends Configure
     if (ugi.getGroupNames().length > 0) {
       job.set("group.name", ugi.getGroupNames()[0]);
     }
-    if (job.getWorkingDirectory() == null) {
-      job.setWorkingDirectory(fs.getWorkingDirectory());          
-    }
-
   }
 
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
@@ -704,15 +726,7 @@ public class JobClient extends Configure
     JobConf job = new JobConf(jobFile);
     return submitJob(job);
   }
-    
-  // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  // job submission directory is world readable/writable/executable
-  final static FsPermission JOB_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-   
+      
   /**
    * Submit a job to the MR system.
    * This returns a handle to the {@link RunningJob} which can be used to track
@@ -753,66 +767,100 @@ public class JobClient extends Configure
     /*
      * configure the command line options correctly on the submitting dfs
      */
-    
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(this, job);
     JobID jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
-    Path submitJarFile = new Path(submitJobDir, "job.jar");
-    Path submitSplitFile = new Path(submitJobDir, "job.split");
-    configureCommandLineOptions(job, submitJobDir, submitJarFile);
-    Path submitJobFile = new Path(submitJobDir, "job.xml");
-    int reduces = job.getNumReduceTasks();
-    JobContext context = new JobContext(job, jobId);
-    
-    // Check the output specification
-    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
-      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
-        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
-      output.checkOutputSpecs(context);
-    } else {
-      job.getOutputFormat().checkOutputSpecs(fs, job);
-    }
+    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+    job.set("mapreduce.job.dir", submitJobDir.toString());
+    JobStatus status = null;
+    try {
+      copyAndConfigureFiles(job, submitJobDir);
+      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
+      int reduces = job.getNumReduceTasks();
+      JobContext context = new JobContext(job, jobId);
+
+      // Check the output specification
+      if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
+        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
+          ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
+        output.checkOutputSpecs(context);
+      } else {
+        job.getOutputFormat().checkOutputSpecs(fs, job);
+      }
 
-    // Create the splits for the job
-    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-    int maps;
-    if (job.getUseNewMapper()) {
-      maps = writeNewSplits(context, submitSplitFile);
-    } else {
-      maps = writeOldSplits(job, submitSplitFile);
-    }
-    job.set("mapred.job.split.file", submitSplitFile.toString());
-    job.setNumMapTasks(maps);
-        
-    // Write job file to JobTracker's fs        
-    FSDataOutputStream out = 
-      FileSystem.create(fs, submitJobFile,
-                        new FsPermission(JOB_FILE_PERMISSION));
+      // Create the splits for the job
+      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
+      int maps = writeSplits(context, submitJobDir);
+      job.setNumMapTasks(maps);
+
+      // Write job file to JobTracker's fs        
+      FSDataOutputStream out = 
+        FileSystem.create(fs, submitJobFile,
+            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
 
-    try {
-      job.writeXml(out);
+      try {
+        job.writeXml(out);
+      } finally {
+        out.close();
+      }
+
+      //
+      // Now, actually submit the job (using the submit name)
+      //
+      status = jobSubmitClient.submitJob(jobId, submitJobDir.toString());
+      if (status != null) {
+        return new NetworkedJob(status);
+      } else {
+        throw new IOException("Could not launch job");
+      }
     } finally {
-      out.close();
+      if (status == null) {
+        LOG.info("Cleaning up the staging area " + submitJobDir);
+        fs.delete(submitJobDir, true);
+      }
     }
+  }
 
-    //
-    // Now, actually submit the job (using the submit name)
-    //
-    JobStatus status = jobSubmitClient.submitJob(jobId);
-    if (status != null) {
-      return new NetworkedJob(status);
+  @SuppressWarnings("unchecked")
+  private <T extends InputSplit>
+  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Configuration conf = job.getConfiguration();
+    InputFormat<?, ?> input =
+      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
+
+    List<InputSplit> splits = input.getSplits(job);
+    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(array, new SplitComparator());
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
+    return array.length;
+  }
+  
+  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
+      Path jobSubmitDir) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    JobConf jConf = (JobConf)job.getConfiguration();
+    int maps;
+    if (jConf.getUseNewMapper()) {
+      maps = writeNewSplits(job, jobSubmitDir);
     } else {
-      throw new IOException("Could not launch job");
+      maps = writeOldSplits(jConf, jobSubmitDir);
     }
+    return maps;
   }
-
-  private int writeOldSplits(JobConf job, 
-                             Path submitSplitFile) throws IOException {
-    InputSplit[] splits = 
-      job.getInputFormat().getSplits(job, job.getNumMapTasks());
+  
+  //method to write splits for old api mapper.
+  private int writeOldSplits(JobConf job, Path jobSubmitDir) 
+  throws IOException {
+    org.apache.hadoop.mapred.InputSplit[] splits =
+    job.getInputFormat().getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
     // go first
-    Arrays.sort(splits, new Comparator<InputSplit>() {
-      public int compare(InputSplit a, InputSplit b) {
+    Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
+      public int compare(org.apache.hadoop.mapred.InputSplit a,
+                         org.apache.hadoop.mapred.InputSplit b) {
         try {
           long left = a.getLength();
           long right = b.getLength();
@@ -824,37 +872,17 @@ public class JobClient extends Configure
             return -1;
           }
         } catch (IOException ie) {
-          throw new RuntimeException("Problem getting input split size",
-                                     ie);
+          throw new RuntimeException("Problem getting input split size", ie);
         }
       }
     });
-    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
-    
-    try {
-      DataOutputBuffer buffer = new DataOutputBuffer();
-      RawSplit rawSplit = new RawSplit();
-      for(InputSplit split: splits) {
-        rawSplit.setClassName(split.getClass().getName());
-        buffer.reset();
-        split.write(buffer);
-        rawSplit.setDataLength(split.getLength());
-        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-        rawSplit.setLocations(split.getLocations());
-        rawSplit.write(out);
-      }
-    } finally {
-      out.close();
-    }
+    JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
     return splits.length;
   }
-
-  private static class NewSplitComparator 
-    implements Comparator<org.apache.hadoop.mapreduce.InputSplit>{
-
+  
+  private static class SplitComparator implements Comparator<InputSplit> {
     @Override
-    public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
-                       org.apache.hadoop.mapreduce.InputSplit o2) {
+    public int compare(InputSplit o1, InputSplit o2) {
       try {
         long len1 = o1.getLength();
         long len2 = o2.getLength();
@@ -868,54 +896,11 @@ public class JobClient extends Configure
       } catch (IOException ie) {
         throw new RuntimeException("exception in compare", ie);
       } catch (InterruptedException ie) {
-        throw new RuntimeException("exception in compare", ie);        
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T extends org.apache.hadoop.mapreduce.InputSplit> 
-  int writeNewSplits(JobContext job, Path submitSplitFile
-                     ) throws IOException, InterruptedException, 
-                              ClassNotFoundException {
-    JobConf conf = job.getJobConf();
-    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-      ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());
-    
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
-    T[] array = (T[])
-      splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
-
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(array, new NewSplitComparator());
-    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
-                                                 array.length);
-    try {
-      if (array.length != 0) {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        RawSplit rawSplit = new RawSplit();
-        SerializationFactory factory = new SerializationFactory(conf);
-        Serializer<T> serializer = 
-          factory.getSerializer((Class<T>) array[0].getClass());
-        serializer.open(buffer);
-        for(T split: array) {
-          rawSplit.setClassName(split.getClass().getName());
-          buffer.reset();
-          serializer.serialize(split);
-          rawSplit.setDataLength(split.getLength());
-          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-          rawSplit.setLocations(split.getLocations());
-          rawSplit.write(out);
-        }
-        serializer.close();
+        throw new RuntimeException("exception in compare", ie);
       }
-    } finally {
-      out.close();
     }
-    return array.length;
   }
-
+  
   /** 
    * Checks if the job directory is clean and has all the required components 
    * for (re) starting the job
@@ -939,125 +924,6 @@ public class JobClient extends Configure
     }
     return false;
   }
-
-  static class RawSplit implements Writable {
-    private String splitClass;
-    private BytesWritable bytes = new BytesWritable();
-    private String[] locations;
-    long dataLength;
-
-    public void setBytes(byte[] data, int offset, int length) {
-      bytes.set(data, offset, length);
-    }
-
-    public void setClassName(String className) {
-      splitClass = className;
-    }
-      
-    public String getClassName() {
-      return splitClass;
-    }
-      
-    public BytesWritable getBytes() {
-      return bytes;
-    }
-
-    public void clearBytes() {
-      bytes = null;
-    }
-      
-    public void setLocations(String[] locations) {
-      this.locations = locations;
-    }
-      
-    public String[] getLocations() {
-      return locations;
-    }
-      
-    public void readFields(DataInput in) throws IOException {
-      splitClass = Text.readString(in);
-      dataLength = in.readLong();
-      bytes.readFields(in);
-      int len = WritableUtils.readVInt(in);
-      locations = new String[len];
-      for(int i=0; i < len; ++i) {
-        locations[i] = Text.readString(in);
-      }
-    }
-      
-    public void write(DataOutput out) throws IOException {
-      Text.writeString(out, splitClass);
-      out.writeLong(dataLength);
-      bytes.write(out);
-      WritableUtils.writeVInt(out, locations.length);
-      for(int i = 0; i < locations.length; i++) {
-        Text.writeString(out, locations[i]);
-      }        
-    }
-
-    public long getDataLength() {
-      return dataLength;
-    }
-    public void setDataLength(long l) {
-      dataLength = l;
-    }
-    
-  }
-    
-  private static final int CURRENT_SPLIT_FILE_VERSION = 0;
-  private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
-  private DataOutputStream writeSplitsFileHeader(Configuration conf,
-                                                 Path filename,
-                                                 int length
-                                                 ) throws IOException {
-    // write the splits to a file for the job tracker
-    FileSystem fs = filename.getFileSystem(conf);
-    FSDataOutputStream out = 
-      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
-    out.write(SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-    WritableUtils.writeVInt(out, length);
-    return out;
-  }
-
-  /** Create the list of input splits and write them out in a file for
-   *the JobTracker. The format is:
-   * <format version>
-   * <numSplits>
-   * for each split:
-   *    <RawSplit>
-   * @param splits the input splits to write out
-   * @param out the stream to write to
-   */
-  private void writeOldSplitsFile(InputSplit[] splits, 
-                                  FSDataOutputStream out) throws IOException {
-  }
-
-  /**
-   * Read a splits file into a list of raw splits
-   * @param in the stream to read from
-   * @return the complete list of splits
-   * @throws IOException
-   */
-  static RawSplit[] readSplitFile(DataInput in) throws IOException {
-    byte[] header = new byte[SPLIT_FILE_HEADER.length];
-    in.readFully(header);
-    if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
-      throw new IOException("Invalid header on split file");
-    }
-    int vers = WritableUtils.readVInt(in);
-    if (vers != CURRENT_SPLIT_FILE_VERSION) {
-      throw new IOException("Unsupported split version " + vers);
-    }
-    int len = WritableUtils.readVInt(in);
-    RawSplit[] result = new RawSplit[len];
-    for(int i=0; i < len; ++i) {
-      result[i] = new RawSplit();
-      result[i].readFields(in);
-    }
-    return result;
-  }
     
   /**
    * Get an {@link RunningJob} object to track an ongoing job.  Returns
@@ -1205,7 +1071,19 @@ public class JobClient extends Configure
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
     return jobSubmitClient.getClusterStatus(detailed);
   }
-    
+  
+  /**
+   * Grab the jobtracker's view of the staging directory path where 
+   * job-specific files will  be placed.
+   * 
+   * @return the staging directory where job-specific files are to be placed.
+   */
+  public Path getStagingAreaDir() throws IOException {
+    if (stagingAreaDir == null) {
+      stagingAreaDir = new Path(jobSubmitClient.getStagingAreaDir());
+    }
+    return stagingAreaDir;
+  }    
 
   /** 
    * Get the jobs that are not completed and not failed.

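A small usage sketch of the new public accessor (illustrative only; "client" is
an already-initialized JobClient and is not part of the patch):

    // The JobTracker's hint for where job-specific files will be staged;
    // the value is cached after the first RPC (see getStagingAreaDir() above).
    Path stagingRoot = client.getStagingAreaDir();
    System.out.println("Job files will be staged under " + stagingRoot);
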
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:41:31 2011
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -49,11 +48,15 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.*;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -79,7 +82,7 @@ class JobInProgress {
     
   JobProfile profile;
   JobStatus status;
-  Path jobFile = null;
+  String jobFile = null;
   Path localJobFile = null;
 
   TaskInProgress maps[] = new TaskInProgress[0];
@@ -260,6 +263,7 @@ class JobInProgress {
     new HashMap<TaskTracker, FallowSlotInfo>();
   private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
     new HashMap<TaskTracker, FallowSlotInfo>();
+  private Path jobSubmitDir = null;
   
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
@@ -286,43 +290,48 @@ class JobInProgress {
   
   public JobInProgress(JobID jobid, JobTracker jobtracker, 
                        JobConf default_conf, int rCount) throws IOException {
-    this(jobid, jobtracker, default_conf, null, rCount);
+    this(jobtracker, default_conf, null, rCount);
   }
 
-  JobInProgress(JobID jobid, JobTracker jobtracker,
-                JobConf default_conf, String user, int rCount) 
+  JobInProgress(JobTracker jobtracker,
+                JobConf default_conf, JobInfo jobInfo, int rCount) 
   throws IOException {
     this.restartCount = rCount;
-    this.jobId = jobid;
+    this.jobId = JobID.downgrade(jobInfo.getJobID());
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobId;
     this.jobtracker = jobtracker;
-    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
-    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
+    this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
     this.startTime = System.currentTimeMillis();
     status.setStartTime(startTime);
     this.localFs = jobtracker.getLocalFileSystem();
 
-    JobConf default_job_conf = new JobConf(default_conf);
-    this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
-                                                      +"/"+jobid + ".xml");
-
-    if (user == null) {
-      this.user = conf.getUser();
-    } else {
-      this.user = user;
-    }
-    LOG.info("User : " +  this.user);
-
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    fs = jobtracker.getFileSystem(jobDir);
-    jobFile = new Path(jobDir, "job.xml");
-    fs.copyToLocalFile(jobFile, localJobFile);
+    // use the user supplied token to add user credentials to the conf
+    jobSubmitDir = jobInfo.getJobSubmitDir();
+    String user = jobInfo.getUser().toString();
+    conf = new JobConf();
+    conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(user,
+            new String[]{UnixUserGroupInformation.DEFAULT_GROUP}).toString());
+    fs = jobSubmitDir.getFileSystem(conf);
+    this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
+        +"/"+jobId + ".xml");
+    Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+    jobFile = jobFilePath.toString();
+    fs.copyToLocalFile(jobFilePath, localJobFile);
     conf = new JobConf(localJobFile);
+    if (conf.getUser() == null) {
+      this.conf.setUser(user);
+    }
+    if (!conf.getUser().equals(user)) {
+      throw new IOException("The username obtained from the conf doesn't " +
+            "match the username the user authenticated as");
+    }
     this.priority = conf.getJobPriority();
     this.status.setJobPriority(this.priority);
-    this.profile = new JobProfile(user, jobid, 
-                                  jobFile.toString(), url, conf.getJobName(),
+    this.profile = new JobProfile(user, jobId, 
+                                  jobFile, url, conf.getJobName(),
                                   conf.getQueueName());
 
     this.numMapTasks = conf.getNumMapTasks();
@@ -338,7 +347,7 @@ class JobInProgress {
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", jobid.toString());
+    this.jobMetrics.setTag("jobId", jobId.toString());
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
@@ -392,7 +401,7 @@ class JobInProgress {
   }
   
   private Map<Node, List<TaskInProgress>> createCache(
-                         JobClient.RawSplit[] splits, int maxLevel) {
+                                 TaskSplitMetaInfo[] splits, int maxLevel) {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
@@ -495,7 +504,7 @@ class JobInProgress {
     LOG.info("Initializing " + jobId);
 
     // log job info
-    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
+    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 
                                     this.startTime, hasRestarted());
     // log the job priority
     setPriority(this.priority);
@@ -503,21 +512,12 @@ class JobInProgress {
     //
     // generate security keys needed by Tasks
     //
-    generateJobToken(fs);
+    generateJobToken(jobtracker.getFileSystem());
     
     //
     // read input splits and create a map per a split
     //
-    String jobFile = profile.getJobFile();
-
-    DataInputStream splitFile =
-      fs.open(new Path(conf.get("mapred.job.split.file")));
-    JobClient.RawSplit[] splits;
-    try {
-      splits = JobClient.readSplitFile(splitFile);
-    } finally {
-      splitFile.close();
-    }
+    TaskSplitMetaInfo[] splits = createSplits(jobId);
     numMapTasks = splits.length;
 
 
@@ -535,7 +535,7 @@ class JobInProgress {
 
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getDataLength();
+      inputLength += splits[i].getInputDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
                                    jobtracker, conf, this, i, numSlotsPerMap);
@@ -573,7 +573,7 @@ class JobInProgress {
 
     // cleanup map tip. This map doesn't use any splits. Just assign an empty
     // split.
-    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
@@ -608,6 +608,13 @@ class JobInProgress {
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                  numMapTasks, numReduceTasks);
   }
+  
+  TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
+  throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo =
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
+    return allTaskSplitMetaInfo;
+  }
 
   /////////////////////////////////////////////////////
   // Accessors for the JobInProgress
@@ -773,14 +780,6 @@ class JobInProgress {
   }
     
   /**
-   * Get the job user/owner
-   * @return the job's user/owner
-   */ 
-  String getUser() {
-    return user;
-  }
-
-  /**
    * Return a vector of completed TaskInProgress objects
    */
   public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
@@ -2845,15 +2844,6 @@ class JobInProgress {
         localJobFile = null;
       }
 
-      // clean up splits
-      for (int i = 0; i < maps.length; i++) {
-        maps[i].clearSplit();
-      }
-
-      // JobClient always creates a new directory with job files
-      // so we remove that directory to cleanup
-      // Delete temp dfs dirs created if any, like in case of 
-      // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
       new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
     } catch (IOException e) {

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInfo.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInfo.java Fri Mar  4 03:41:31 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Represents the basic information that is saved per a job when the 
+ * JobTracker receives a submitJob request. The information is saved
+ * so that the JobTracker can recover incomplete jobs upon restart.
+ */
+class JobInfo implements Writable {
+  private org.apache.hadoop.mapreduce.JobID id;
+  private Text user;
+  private Path jobSubmitDir;
+  public JobInfo() {}
+  
+  public JobInfo(org.apache.hadoop.mapreduce.JobID id, 
+      Text user,
+      Path jobSubmitDir) {
+    this.id = id;
+    this.user = user;
+    this.jobSubmitDir = jobSubmitDir;
+  }
+  
+  /**
+   * Get the job id.
+   */
+  public org.apache.hadoop.mapreduce.JobID getJobID() {
+    return id;
+  }
+  
+  /**
+   * Get the configured job's user-name.
+   */
+  public Text getUser() {
+    return user;
+  }
+      
+  /**
+   * Get the job submission directory
+   */
+  public Path getJobSubmitDir() {
+    return this.jobSubmitDir;
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    id = new org.apache.hadoop.mapreduce.JobID();
+    id.readFields(in);
+    user = new Text();
+    user.readFields(in);
+    jobSubmitDir = new Path(WritableUtils.readString(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    id.write(out);
+    user.write(out);
+    WritableUtils.writeString(out, jobSubmitDir.toString());
+  }
+}
\ No newline at end of file

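Since the new JobInfo is a plain Writable, a round-trip looks like the sketch
below (illustrative only; "fs", "jobSysDir", "jobId", "userName" and
"jobSubmitDir" are assumed to be in scope and are not part of the patch):

    // Persist the per-job record to the JobTracker's "job-info" file ...
    JobInfo info = new JobInfo(jobId, new Text(userName), jobSubmitDir);
    FSDataOutputStream out = fs.create(new Path(jobSysDir, JobTracker.JOB_INFO_FILE));
    info.write(out);
    out.close();

    // ... and read it back, e.g. while recovering incomplete jobs on restart.
    JobInfo copy = new JobInfo();
    FSDataInputStream in = fs.open(new Path(jobSysDir, JobTracker.JOB_INFO_FILE));
    copy.readFields(in);
    in.close();
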
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar  4 03:41:31 2011
@@ -60,8 +60,11 @@ interface JobSubmissionProtocol extends 
    *             interval for HADOOP-4939                     
    * Version 21: Added method getQueueAclsForCurrentUser to get queue acls info
    *             for a user
+   * Version 22: Job submission files are uploaded to a staging area under
+   *             user home dir. JobTracker reads the required files from the
+   *             staging area using user credentials passed via the rpc. 
    */
-  public static final long versionID = 21L;
+  public static final long versionID = 22L;
 
   /**
    * Allocate a name for the job.
@@ -73,9 +76,10 @@ interface JobSubmissionProtocol extends 
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
-   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   * The job files should be submitted in <b>jobSubmitDir</b>.
    */
-  public JobStatus submitJob(JobID jobName) throws IOException;
+  public JobStatus submitJob(JobID jobName, String jobSubmitDir) 
+  throws IOException;
 
   /**
    * Get the current status of the cluster
@@ -191,6 +195,14 @@ interface JobSubmissionProtocol extends 
   public String getSystemDir();  
   
   /**
+   * Get a hint from the JobTracker 
+   * where job-specific files are to be placed.
+   * 
+   * @return the directory where job-specific files are to be placed.
+   */
+  public String getStagingAreaDir() throws IOException;
+  
+  /**
    * Gets set of Job Queues associated with the Job Tracker
    * 
    * @return Array of the Job Queue Information Object

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:41:31 2011
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -68,6 +69,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -155,7 +157,7 @@ public class JobTracker implements MRCon
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
-
+  static final String JOB_INFO_FILE = "job-info";
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -166,9 +168,9 @@ public class JobTracker implements MRCon
 
   private static final LocalDirAllocator lDirAlloc = 
                               new LocalDirAllocator("mapred.local.dir");
-  // system directories are world-wide readable and owner readable
+  //system directory is completely owned by the JobTracker
   final static FsPermission SYSTEM_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+    FsPermission.createImmutable((short) 0700); // rwx------
 
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
@@ -1608,7 +1610,10 @@ public class JobTracker implements MRCon
           }
 
           // Create the job
-          job = new JobInProgress(id, JobTracker.this, conf, user, 
+          /* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE
+           * BACKPORTED (MAPREDUCE-873)
+           */
+          job = new JobInProgress(JobTracker.this, conf, null, 
                                   restartCount);
 
           // 2. Check if the user has appropriate access
@@ -1834,10 +1839,6 @@ public class JobTracker implements MRCon
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
-  // job-id->username during staging
-  Map<JobID, String> jobToUserMap = 
-    Collections.synchronizedMap(new TreeMap<JobID, String>()); 
-
   // Number of resolved entries
   int numResolved;
     
@@ -2075,6 +2076,18 @@ public class JobTracker implements MRCon
         if(systemDir == null) {
           systemDir = new Path(getSystemDir());    
         }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.getOwner().equals(mrOwner.getUserName())) {
+            throw new AccessControlException("The systemdir " + systemDir +
+                " is not owned by " + mrOwner.getUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir +
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir, SYSTEM_DIR_PERMISSION);
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
         // Make sure that the backup data is preserved
         FileStatus[] systemDirData = fs.listStatus(this.systemDir);
         // Check if the history is enabled .. as we cant have persistence with 
@@ -2542,17 +2555,6 @@ public class JobTracker implements MRCon
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
 
-    try {
-      File userFileForJob =
-        new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id,
-                                              conf).toString());
-      if (userFileForJob != null) {
-        userFileForJob.delete();
-      }
-    } catch (IOException ioe) {
-      LOG.info("Failed to delete job id mapping for job " + id, ioe);
-    }
-
     // add the blacklisted trackers to potentially faulty list
     if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
       if (job.getNoOfBlackListedTrackers() > 0) {
@@ -3494,17 +3496,7 @@ public class JobTracker implements MRCon
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
-    JobID id = new JobID(getTrackerIdentifier(), nextJobId++);
-
-    // get the user group info
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
-
-    // mark the user for this id
-    jobToUserMap.put(id, ugi.getUserName());
-
-    LOG.info("Job id " + id + " assigned to user " + ugi.getUserName());
-
-    return id;
+    return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
   /**
@@ -3515,64 +3507,25 @@ public class JobTracker implements MRCon
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    */
-  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+  public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir)
+  throws IOException {
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
-
-    // check if the owner is uploding the splits or not
-    // get the user group info
     UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
-
-    // check if the user invoking this api is the owner of this job
-    if (!jobToUserMap.get(jobId).equals(ugi.getUserName())) {
-      throw new IOException("User " + ugi.getUserName() 
-                            + " is not the owner of the job " + jobId);
-    }
-    
-    jobToUserMap.remove(jobId);
-
-    // persist
-    File userFileForJob =  
-      new File(lDirAlloc.getLocalPathForWrite(SUBDIR + "/" + jobId, 
-                                              conf).toString());
-    if (userFileForJob == null) {
-      LOG.info("Failed to create job-id file for job " + jobId + " at " + userFileForJob);
-    } else {
-      FileOutputStream fout = new FileOutputStream(userFileForJob);
-      BufferedWriter writer = null;
-
-      try {
-        writer = new BufferedWriter(new OutputStreamWriter(fout));
-        writer.write(ugi.getUserName() + "\n");
-      } finally {
-        if (writer != null) {
-          writer.close();
-        }
-        fout.close();
-      }
-
-      LOG.info("Job " + jobId + " user info persisted to file : " + userFileForJob);
-    }
-
+    JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getUserName()),
+        new Path(jobSubmitDir));
     JobInProgress job = null;
     try {
-      job = new JobInProgress(jobId, this, this.conf, ugi.getUserName(), 0);
+      job = new JobInProgress(this, this.conf, jobInfo, 0);
     } catch (Exception e) {
-      if (userFileForJob != null) {
-        userFileForJob.delete();
-      }
       throw new IOException(e);
     }
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
-      new CleanupQueue().addToQueue(fs,getSystemDirectoryForJob(jobId));
       job.fail();
-      if (userFileForJob != null) {
-        userFileForJob.delete();
-      }
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
 
@@ -3583,10 +3536,6 @@ public class JobTracker implements MRCon
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
       job.fail();
-      if (userFileForJob != null) {
-        userFileForJob.delete();
-      }
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
 
@@ -3595,11 +3544,35 @@ public class JobTracker implements MRCon
     try {
       checkMemoryRequirements(job);
     } catch (IOException ioe) {
-      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw ioe;
     }
+    boolean recovered = true; //TODO: Once the Job recovery code is there,
+                              //(MAPREDUCE-873) we
+                              //must pass the "recovered" flag accurately.
+                              //This is handled in the trunk/0.22
+    if (!recovered) {
+      //Store the information in a file so that the job can be recovered
+      //later (if at all)
+      Path jobDir = getSystemDirectoryForJob(jobId);
+      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+      jobInfo.write(out);
+      out.close();
+    }
+    return addJob(jobId, job);
+  }
 
-   return addJob(jobId, job); 
+  /**
+   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() throws IOException {
+    Path stagingRootDir =
+      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+          "/tmp/hadoop/mapred/staging"));
+    FileSystem fs = stagingRootDir.getFileSystem(conf);
+    String user = UserGroupInformation.getCurrentUGI().getUserName();
+    return fs.makeQualified(new Path(stagingRootDir,
+                                user+"/.staging")).toString();
   }
 
   /**
@@ -4090,6 +4063,11 @@ public class JobTracker implements MRCon
   Path getSystemDirectoryForJob(JobID id) {
     return new Path(getSystemDir(), id.toString());
   }
+  
+  //Get the job info file in the system directory
+  Path getSystemFileForJob(JobID id) {
+    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+  }
 
   /**
    * Change the run-time priority of the given job.

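The staging root used by getStagingAreaDir() above is configurable through mapreduce.jobtracker.staging.root.dir; each user gets <root>/<user>/.staging underneath it. An illustrative property stanza for mapred-site.xml (the description wording here is ours; the shipped default lives in mapred-default.xml, which this commit also modifies):

    <property>
      <name>mapreduce.jobtracker.staging.root.dir</name>
      <value>/tmp/hadoop/mapred/staging</value>
      <description>The root of the staging area for users' job submission
      files. Per-user staging directories are created as
      ${mapreduce.jobtracker.staging.root.dir}/&lt;user&gt;/.staging.
      </description>
    </property>
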
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:41:31 2011
@@ -21,20 +21,17 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.JobTrackerMetricsInst;
 import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {
@@ -46,6 +43,7 @@ class LocalJobRunner implements JobSubmi
   private JobConf conf;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
+  final Random rand = new Random();
 
   private JobTrackerInstrumentation myMetrics = null;
 
@@ -60,6 +58,7 @@ class LocalJobRunner implements JobSubmi
     private Path file;
     private JobID id;
     private JobConf job;
+    private Path systemJobDir;
 
     private JobStatus status;
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
@@ -80,8 +79,9 @@ class LocalJobRunner implements JobSubmi
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public Job(JobID jobid, JobConf conf) throws IOException {
-      this.file = new Path(getSystemDir(), jobid + "/job.xml");
+    public Job(JobID jobid, String jobSubmitDir) throws IOException {
+      this.systemJobDir = new Path(jobSubmitDir);
+      this.file = new Path(systemJobDir, "job.xml");
       this.id = jobid;
       this.mapoutputFile = new MapOutputFile(jobid);
       this.mapoutputFile.setConf(conf);
@@ -110,46 +110,8 @@ class LocalJobRunner implements JobSubmi
       JobContext jContext = new JobContext(conf, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
-        // split input into minimum number of splits
-        RawSplit[] rawSplits;
-        if (job.getUseNewMapper()) {
-          org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-              ReflectionUtils.newInstance(jContext.getInputFormatClass(), jContext.getJobConf());
-                    
-          List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
-          rawSplits = new RawSplit[splits.size()];
-          DataOutputBuffer buffer = new DataOutputBuffer();
-          SerializationFactory factory = new SerializationFactory(conf);
-          Serializer serializer = 
-            factory.getSerializer(splits.get(0).getClass());
-          serializer.open(buffer);
-          for (int i = 0; i < splits.size(); i++) {
-            buffer.reset();
-            serializer.serialize(splits.get(i));
-            RawSplit rawSplit = new RawSplit();
-            rawSplit.setClassName(splits.get(i).getClass().getName());
-            rawSplit.setDataLength(splits.get(i).getLength());
-            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-            rawSplit.setLocations(splits.get(i).getLocations());
-            rawSplits[i] = rawSplit;
-          }
-
-        } else {
-          InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
-          rawSplits = new RawSplit[splits.length];
-          DataOutputBuffer buffer = new DataOutputBuffer();
-          for (int i = 0; i < splits.length; i++) {
-            buffer.reset();
-            splits[i].write(buffer);
-            RawSplit rawSplit = new RawSplit();
-            rawSplit.setClassName(splits[i].getClass().getName());
-            rawSplit.setDataLength(splits[i].getLength());
-            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-            rawSplit.setLocations(splits[i].getLocations());
-            rawSplits[i] = rawSplit;
-          }
-        }
-        
+        TaskSplitMetaInfo[] taskSplitMetaInfos =
+          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);        
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {
           // we only allow 0 or 1 reducer in local mode
@@ -159,15 +121,13 @@ class LocalJobRunner implements JobSubmi
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
         
-        for (int i = 0; i < rawSplits.length; i++) {
+        for (int i = 0; i < taskSplitMetaInfos.length; i++) {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
             mapIds.add(mapId);
             MapTask map = new MapTask(file.toString(),  
                                       mapId, i,
-                                      rawSplits[i].getClassName(),
-                                      rawSplits[i].getBytes(), 1, 
-                                      job.getUser());
+                                      taskSplitMetaInfos[i].getSplitIndex(), 1);
             JobConf localConf = new JobConf(job);
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
@@ -207,7 +167,7 @@ class LocalJobRunner implements JobSubmi
             if (!this.isInterrupted()) {
               ReduceTask reduce = new ReduceTask(file.toString(), 
                                                  reduceId, 0, mapIds.size(), 
-                                                 1, job.getUser());
+                                                 1);
               JobConf localConf = new JobConf(job);
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);
@@ -358,7 +318,7 @@ class LocalJobRunner implements JobSubmi
   }
 
   public LocalJobRunner(JobConf conf) throws IOException {
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
   }
@@ -370,8 +330,9 @@ class LocalJobRunner implements JobSubmi
     return new JobID("local", ++jobid);
   }
 
-  public JobStatus submitJob(JobID jobid) throws IOException {
-    return new Job(jobid, this.conf).status;
+  public JobStatus submitJob(JobID jobid, String jobSubmitDir) 
+  throws IOException {
+    return new Job(jobid, jobSubmitDir).status;
   }
 
   public void killJob(JobID id) {
@@ -459,6 +420,24 @@ class LocalJobRunner implements JobSubmi
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
+  
+  /**
+   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() throws IOException {
+    Path stagingRootDir = 
+      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+        "/tmp/hadoop/mapred/staging"));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    String user;
+    if (ugi != null) {
+      user = ugi.getUserName() + rand.nextInt();
+    } else {
+      user = "dummy" + rand.nextInt();
+    }
+    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
+  }
+
 
   @Override
   public JobStatus[] getJobsFromQueue(String queue) throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar  4 03:41:31 2011
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -58,6 +59,10 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
@@ -74,7 +79,7 @@ class MapTask extends Task {
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
   
 
-  private BytesWritable split = new BytesWritable();
+  private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   private String splitClass;
   private final static int APPROX_HEADER_LENGTH = 150;
 
@@ -89,11 +94,10 @@ class MapTask extends Task {
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
-                 int partition, String splitClass, BytesWritable split,
-                 int numSlotsRequired, String username) {
-    super(jobFile, taskId, partition, numSlotsRequired, username);
-    this.splitClass = splitClass;
-    this.split = split;
+                 int partition, TaskSplitIndex splitIndex,
+                 int numSlotsRequired) {
+    super(jobFile, taskId, partition, numSlotsRequired);
+    this.splitMetaInfo = splitIndex;
   }
 
   @Override
@@ -104,13 +108,13 @@ class MapTask extends Task {
   @Override
   public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
-    if (isMapOrReduce()) {
-      Path localSplit = new Path(new Path(getJobFile()).getParent(), 
-                                 "split.dta");
-      LOG.debug("Writing local split to " + localSplit);
-      DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
-      Text.writeString(out, splitClass);
-      split.write(out);
+    if (supportIsolationRunner(conf) && isMapOrReduce()) {
+      // localize the split meta-information
+      Path localSplitMeta = new Path(new Path(getJobFile()).getParent(), 
+                                 "split.info");
+      LOG.debug("Writing local split to " + localSplitMeta);
+      DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
+      splitMetaInfo.write(out);
       out.close();
     }
   }
@@ -125,9 +129,8 @@ class MapTask extends Task {
   public void write(DataOutput out) throws IOException {
     super.write(out);
     if (isMapOrReduce()) {
-      Text.writeString(out, splitClass);
-      split.write(out);
-      split = null;
+      splitMetaInfo.write(out);
+      splitMetaInfo = null;
     }
   }
   
@@ -135,8 +138,7 @@ class MapTask extends Task {
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     if (isMapOrReduce()) {
-      splitClass = Text.readString(in);
-      split.readFields(in);
+      splitMetaInfo.readFields(in);
     }
   }
 
@@ -302,36 +304,51 @@ class MapTask extends Task {
     }
 
     if (useNewApi) {
-      runNewMapper(job, split, umbilical, reporter);
+      runNewMapper(job, splitMetaInfo, umbilical, reporter);
     } else {
-      runOldMapper(job, split, umbilical, reporter);
+      runOldMapper(job, splitMetaInfo, umbilical, reporter);
     }
     done(umbilical, reporter);
   }
-
+  @SuppressWarnings("unchecked")
+  private <T> T getSplitDetails(Path file, long offset)
+   throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<T> cls;
+    try {
+      cls = (Class<T>) conf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className +
+                                          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer<T> deserializer =
+      (Deserializer<T>) factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    T split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    getCounters().findCounter(
+         Task.Counter.SPLIT_RAW_BYTES).increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+  
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
-                    final BytesWritable rawSplit,
+                    final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
-    InputSplit inputSplit = null;
-    // reinstantiate the split
-    try {
-      inputSplit = (InputSplit) 
-        ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
-    } catch (ClassNotFoundException exp) {
-      IOException wrap = new IOException("Split class " + splitClass + 
-                                         " not found");
-      wrap.initCause(exp);
-      throw wrap;
-    }
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    inputSplit.readFields(splitBuffer);
-    
+    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+           splitIndex.getStartOffset());
+
     updateJobWithSplit(job, inputSplit);
     reporter.setInputSplit(inputSplit);
 
@@ -557,7 +574,7 @@ class MapTask extends Task {
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runNewMapper(final JobConf job,
-                    final BytesWritable rawSplit,
+                    final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
@@ -575,15 +592,8 @@ class MapTask extends Task {
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
     org.apache.hadoop.mapreduce.InputSplit split = null;
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
-    SerializationFactory factory = new SerializationFactory(job);
-    Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
-      deserializer = 
-        (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) 
-        factory.getDeserializer(job.getClassByName(splitClass));
-    deserializer.open(splitBuffer);
-    split = deserializer.deserialize(null);
+    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+        splitIndex.getStartOffset());
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>

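getSplitDetails() above implies a simple on-disk layout for each serialized split: the split class name written as a Hadoop Text string, followed by the split body serialized via the configured SerializationFactory, with the file/offset pair carried in TaskSplitIndex. A writer-side sketch under that assumption (the real writer added by this patch is JobSplitWriter, whose source is not shown in this part of the diff):

    // Sketch only: writes one split in the layout getSplitDetails() reads back.
    // Needs the org.apache.hadoop.fs, io and io.serializer imports.
    // The returned offset is what would be recorded as the split's start offset.
    @SuppressWarnings("unchecked")
    static long writeOneSplit(FileSystem fs, Configuration conf,
                              Path splitFile, InputSplit split) throws IOException {
      FSDataOutputStream out = fs.create(splitFile);
      long offset = out.getPos();
      Text.writeString(out, split.getClass().getName());   // class name first
      SerializationFactory factory = new SerializationFactory(conf);
      Serializer<InputSplit> serializer = (Serializer<InputSplit>)
          factory.getSerializer((Class<InputSplit>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);                          // then the split body
      out.close();
      return offset;
    }
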
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:41:31 2011
@@ -154,9 +154,8 @@ class ReduceTask extends Task {
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
-                    int partition, int numMaps, int numSlotsRequired,
-                    String username) {
-    super(jobFile, taskId, partition, numSlotsRequired, username);
+                    int partition, int numMaps, int numSlotsRequired) {
+    super(jobFile, taskId, partition, numSlotsRequired);
     this.numMaps = numMaps;
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:41:31 2011
@@ -80,7 +80,8 @@ abstract public class Task implements Wr
     REDUCE_OUTPUT_RECORDS,
     REDUCE_SKIPPED_GROUPS,
     REDUCE_SKIPPED_RECORDS,
-    SPILLED_RECORDS
+    SPILLED_RECORDS,
+    SPLIT_RAW_BYTES
   }
   
   /**
@@ -119,6 +120,7 @@ abstract public class Task implements Wr
   ////////////////////////////////////////////
 
   private String jobFile;                         // job configuration file
+  private String user;
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
@@ -145,7 +147,6 @@ abstract public class Task implements Wr
   protected TaskAttemptContext taskContext;
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
-  protected String username;
   protected final Counters.Counter spilledRecordsCounter;
   private String pidFile = "";
   protected TaskUmbilicalProtocol umbilical;
@@ -164,8 +165,7 @@ abstract public class Task implements Wr
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition, 
-              int numSlotsRequired, String username) {
-    this.username = username;
+              int numSlotsRequired) {
     this.jobFile = jobFile;
     this.taskId = taskId;
      
@@ -366,9 +366,17 @@ abstract public class Task implements Wr
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
   
+  /**
+   * Get the name of the user running the job/task. TaskTracker needs task's
+   * user name even before its JobConf is localized. So we explicitly serialize
+   * the user name.
+   * 
+   * @return user
+   */
   String getUser() {
-    return username;
+    return user;
   }
+  
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -386,9 +394,9 @@ abstract public class Task implements Wr
       WritableUtils.writeEnum(out, jobRunStateForCleanup);
     }
     out.writeBoolean(jobSetup);
-    Text.writeString(out, username);
     out.writeBoolean(writeSkipRecs);
-    out.writeBoolean(taskCleanup);  
+    out.writeBoolean(taskCleanup); 
+    Text.writeString(out, user);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -408,12 +416,12 @@ abstract public class Task implements Wr
         WritableUtils.readEnum(in, JobStatus.State.class);
     }
     jobSetup = in.readBoolean();
-    username = Text.readString(in);
     writeSkipRecs = in.readBoolean();
     taskCleanup = in.readBoolean();
     if (taskCleanup) {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
+    user = Text.readString(in);
   }
 
   @Override
@@ -895,9 +903,22 @@ abstract public class Task implements Wr
                             + JobStatus.State.FAILED + " or "
                             + JobStatus.State.KILLED);
     }
+    // delete the staging area for the job
+    JobConf conf = new JobConf(jobContext.getConfiguration());
+    if (!supportIsolationRunner(conf)) {
+      String jobTempDir = conf.get("mapreduce.job.dir");
+      Path jobTempDirPath = new Path(jobTempDir);
+      FileSystem fs = jobTempDirPath.getFileSystem(conf);
+      fs.delete(jobTempDirPath, true);
+    }
     done(umbilical, reporter);
   }
 
+  protected boolean supportIsolationRunner(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null || conf
+         .getKeepFailedTaskFiles());
+  }
+  
   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                              TaskReporter reporter
                              ) throws IOException, InterruptedException {
@@ -926,6 +947,7 @@ abstract public class Task implements Wr
         NetUtils.addStaticResolution(name, resolvedName);
       }
     }
+    this.user = this.conf.getUser();
   }
 
   public Configuration getConf() {


