Subject: svn commit: r892893 [1/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src...
Date: Mon, 21 Dec 2009 17:36:48 -0000
From: ddas@apache.org
To: mapreduce-commits@hadoop.apache.org

Author: ddas
Date: Mon Dec 21 17:36:44 2009
New Revision: 892893

URL: http://svn.apache.org/viewvc?rev=892893&view=rev
Log:
MAPREDUCE-181. Changes the job submission process to be secure. Contributed by Devaraj Das.
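[Editor's note, not part of the commit mail: for context on what "secure job submission" means here, below is a minimal client-side sketch of the new handshake, built only from signatures that appear in the diffs in this commit (ClientProtocol.getStagingAreaDir() and the two-argument submitJob()). The SecureSubmitSketch class, the staging-path layout, and the ClientProtocol handle are illustrative assumptions, not code from this patch.]

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

    // Hypothetical helper, not part of the patch: illustrates the submission flow
    // this commit introduces.
    public class SecureSubmitSketch {
      public static JobStatus submit(ClientProtocol client, JobID jobId)
          throws IOException, InterruptedException {
        // The client now stages job.xml, the split file and the split meta info under
        // a per-user staging directory instead of the JobTracker's system directory,
        // which this patch locks down to 0700.
        String stagingRoot = client.getStagingAreaDir();
        Path jobSubmitDir = new Path(stagingRoot, jobId.toString()); // assumed layout
        // ... the job submitter writes the job files under jobSubmitDir here ...
        // The submit RPC then carries the staging directory, so the JobTracker can read
        // the files back as the submitting user and persist a JobInfo record for recovery.
        return client.submitJob(jobId, jobSubmitDir.toString());
      }
    }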
Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/mapreduce/trunk/src/test/mapred-site.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Dec 21 17:36:44 2009
@@ -84,6 +84,9 @@
     can be refreshed in the JobTracker via command line. (Boris Shkolnik
     via ddas)
 
+    MAPREDUCE-181. Changes the job submission process to be secure.
+    (Devaraj Das)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270.
Fix the tasktracker to optionally send an out-of-band Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original) +++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Mon Dec 21 17:36:44 2009 @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.security.SecurityUtil.AccessControlList; @@ -349,9 +350,9 @@ } } TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning); + JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT; Task task = new MapTask( - "", attemptId, 0, "", new BytesWritable(), - super.numSlotsPerMap) { + "", attemptId, 0, split.getSplitIndex(), super.numSlotsPerMap) { @Override public String toString() { return String.format("%s on %s", getTaskID(), tts.getTrackerName()); @@ -362,7 +363,7 @@ // create a fake TIP and keep track of it FakeTaskInProgress mapTip = new FakeTaskInProgress( getJobID(), - getJobConf(), task, true, this); + getJobConf(), task, true, this, split); mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING); if (areAllMapsRunning) { speculativeMapTasks++; @@ -408,7 +409,7 @@ // create a fake TIP and keep track of it FakeTaskInProgress reduceTip = new FakeTaskInProgress( getJobID(), - getJobConf(), task, false, this); + getJobConf(), task, false, this, null); reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING); if (areAllReducesRunning) { speculativeReduceTasks++; @@ -499,8 +500,9 @@ FakeTaskInProgress( JobID jId, JobConf jobConf, Task t, - boolean isMap, FakeJobInProgress job) { - super(jId, "", new Job.RawSplit(), null, jobConf, job, 0, 1); + boolean isMap, FakeJobInProgress job, + JobSplit.TaskSplitMetaInfo split) { + super(jId, "", split, null, jobConf, job, 0, 1); this.isMap = isMap; this.fakeJob = job; activeTasks = new TreeMap(); Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Dec 21 17:36:44 2009 @@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.UtilsForTests.FakeClock; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.net.Node; public class TestFairScheduler extends TestCase { @@ -111,12 +112,14 @@ // create maps numMapTasks = conf.getNumMapTasks(); maps = new TaskInProgress[numMapTasks]; + // empty format + 
JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT; for (int i = 0; i < numMapTasks; i++) { String[] inputLocations = null; if (mapInputLocations != null) inputLocations = mapInputLocations[i]; maps[i] = new FakeTaskInProgress(getJobID(), i, - getJobConf(), this, inputLocations); + getJobConf(), this, inputLocations, split); if (mapInputLocations == null) // Job has no locality info nonLocalMaps.add(maps[i]); } @@ -137,7 +140,8 @@ if (!tip.isRunning() && !tip.isComplete() && getLocalityLevel(tip, tts) < localityLevel) { TaskAttemptID attemptId = getTaskAttemptID(tip); - Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) { + JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT; + Task task = new MapTask("", attemptId, 0, split.getSplitIndex(), 1) { @Override public String toString() { return String.format("%s on %s", getTaskID(), tts.getTrackerName()); @@ -231,8 +235,9 @@ // Constructor for map FakeTaskInProgress(JobID jId, int id, JobConf jobConf, - FakeJobInProgress job, String[] inputLocations) { - super(jId, "", new Job.RawSplit(), null, jobConf, job, id, 1); + FakeJobInProgress job, String[] inputLocations, + JobSplit.TaskSplitMetaInfo split) { + super(jId, "", split, null, jobConf, job, id, 1); this.isMap = true; this.fakeJob = job; this.inputLocations = inputLocations; Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Mon Dec 21 17:36:44 2009 @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; @@ -162,7 +163,8 @@ switch (type) { case MAP: runInputBytes[i] = counters.findCounter("FileSystemCounters", - "HDFS_BYTES_READ").getValue(); + "HDFS_BYTES_READ").getValue() - + counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue(); runInputRecords[i] = (int)counters.findCounter(MAP_INPUT_RECORDS).getValue(); runOutputBytes[i] = Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java (original) +++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java Mon Dec 21 17:36:44 2009 @@ -349,6 +349,6 @@ } SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job); - return jobTracker.submitJob(jobId); + return jobTracker.submitJob(jobId, "dummy-path"); } } Modified: 
hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (original) +++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Mon Dec 21 17:36:44 2009 @@ -28,11 +28,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job.RawSplit; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.net.Node; @@ -48,10 +48,10 @@ // cache private final JobStory jobStory; - RawSplit[] splits; + TaskSplitMetaInfo[] taskSplitMetaInfo; @SuppressWarnings("deprecation") - public SimulatorJobInProgress(JobID jobid, JobTracker jobtracker, + public SimulatorJobInProgress(JobID jobid, String jobSubmitDir, JobTracker jobtracker, JobConf default_conf, JobStory jobStory) { super(); // jobSetupCleanupNeeded set to false in parent cstr, though @@ -63,7 +63,7 @@ this.jobtracker = jobtracker; this.conf = jobStory.getJobConf(); this.priority = conf.getJobPriority(); - Path jobDir = jobtracker.getSystemDirectoryForJob(jobid); + Path jobDir = new Path(jobSubmitDir); this.jobFile = new Path(jobDir, "job.xml"); this.status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.PREP, priority, conf.getUser(), conf.getJobName(), jobFile.toString(), url); @@ -127,16 +127,17 @@ } final String jobFile = "default"; - splits = getRawSplits(jobStory.getInputSplits()); + taskSplitMetaInfo = createSplits(jobStory); if (loggingEnabled) { LOG.debug("(initTasks@SJIP) Created splits for job = " + jobId - + " number of splits = " + splits.length); + + " number of splits = " + taskSplitMetaInfo.length); } - createMapTasks(jobFile, splits); + createMapTasks(jobFile, taskSplitMetaInfo); if (numMapTasks > 0) { - nonRunningMapCache = createCache(splits, maxLevel); + nonRunningMapCache = createCache(taskSplitMetaInfo, + maxLevel); if (loggingEnabled) { LOG.debug("initTasks:numMaps=" + numMapTasks + " Size of nonRunningMapCache=" + nonRunningMapCache.size() @@ -167,25 +168,25 @@ } } - RawSplit[] getRawSplits(InputSplit[] splits) throws IOException { + + TaskSplitMetaInfo[] createSplits(JobStory story) throws IOException { + InputSplit[] splits = story.getInputSplits(); if (splits == null || splits.length != numMapTasks) { throw new IllegalArgumentException("Input split size mismatch: expected=" + numMapTasks + ", actual=" + ((splits == null) ? 
-1 : splits.length)); } - RawSplit rawSplits[] = new RawSplit[splits.length]; - for (int i = 0; i < splits.length; i++) { + TaskSplitMetaInfo[] splitMetaInfo = + new TaskSplitMetaInfo[story.getNumberMaps()]; + int i = 0; + for (InputSplit split : splits) { try { - rawSplits[i] = new RawSplit(); - rawSplits[i].setClassName(splits[i].getClass().getName()); - rawSplits[i].setDataLength(splits[i].getLength()); - rawSplits[i].setLocations(splits[i].getLocations()); + splitMetaInfo[i++] = new TaskSplitMetaInfo(split,0); } catch (InterruptedException ie) { throw new IOException(ie); } } - - return rawSplits; + return splitMetaInfo; } /** @@ -208,7 +209,8 @@ assert (jobid == getJobID()); // Get splits for the TaskAttempt - RawSplit split = splits[taskAttemptID.getTaskID().getId()]; + TaskSplitMetaInfo split = + taskSplitMetaInfo[taskAttemptID.getTaskID().getId()]; int locality = getClosestLocality(taskTracker, split); TaskID taskId = taskAttemptID.getTaskID(); @@ -232,7 +234,7 @@ return taskAttemptInfo; } - private int getClosestLocality(TaskTracker taskTracker, RawSplit split) { + private int getClosestLocality(TaskTracker taskTracker, TaskSplitMetaInfo split) { int locality = 2; Node taskTrackerNode = jobtracker Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original) +++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Mon Dec 21 17:36:44 2009 @@ -173,7 +173,8 @@ } @Override - public synchronized JobStatus submitJob(JobID jobId) throws IOException { + public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir) + throws IOException { boolean loggingEnabled = LOG.isDebugEnabled(); if (loggingEnabled) { LOG.debug("submitJob for jobname = " + jobId); @@ -191,7 +192,7 @@ } validateAndSetClock(jobStory.getSubmissionTime()); - SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, this, + SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, jobSubmitDir, this, this.conf, jobStory); return addJob(jobId, job); Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original) +++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Mon Dec 21 17:36:44 2009 @@ -43,6 +43,7 @@ import org.apache.hadoop.tools.rumen.TaskInfo; import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo; import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo; +import org.apache.hadoop.mapreduce.split.JobSplit.*; // // Mock jobtracker class that check heartbeat() in parameters and // sends responses based on a prepopulated table @@ -76,7 +77,7 @@ } @Override - public JobStatus submitJob(JobID jobId) throws IOException { + public JobStatus submitJob(JobID jobId, String 
jobSubmitDir) throws IOException { JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", ""); return status; @@ -172,8 +173,8 @@ final int numSlotsRequired = 1; org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId); - Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, "dummysplitclass", - null, numSlotsRequired); + Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, new TaskSplitIndex(), + numSlotsRequired); // all byte counters are 0 TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); MapTaskAttemptInfo taskAttemptInfo = @@ -302,6 +303,11 @@ public String getSystemDir() { throw new UnsupportedOperationException(); } + + @Override + public String getStagingAreaDir() { + throw new UnsupportedOperationException(); + } @Override public String getBuildVersion() { Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java (original) +++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java Mon Dec 21 17:36:44 2009 @@ -79,7 +79,7 @@ FakeJobs job = new FakeJobs("job1", 0, numMaps, numReduces); SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job); - jobTracker.submitJob(jobId); + jobTracker.submitJob(jobId, "dummy-path"); } } Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original) +++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Mon Dec 21 17:36:44 2009 @@ -248,6 +248,17 @@ The class responsible for scheduling the tasks. + + + mapreduce.job.split.metainfo.maxsize + 10000000 + The maximum permissible size of the split metainfo file. + The JobTracker won't attempt to read split metainfo files bigger than + the configured value. + No limits if set to -1. 
+ + + mapreduce.jobtracker.taskscheduler.maxrunningtasks.perjob Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Mon Dec 21 17:36:44 2009 @@ -30,10 +30,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JvmTask; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; /** * IsolationRunner is intended to facilitate debugging by re-running a specific @@ -180,19 +179,21 @@ Thread.currentThread().setContextClassLoader(classLoader); conf.setClassLoader(classLoader); - // split.dta file is used only by IsolationRunner. The file can now be in - // any of the configured local disks, so use LocalDirAllocator to find out - // where it is. - Path localSplit = + // split.dta/split.meta files are used only by IsolationRunner. + // The file can now be in any of the configured local disks, + // so use LocalDirAllocator to find out where it is. + Path localMetaSplit = new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead( - TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID() - .toString(), taskId.toString()), conf); - DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit); - String splitClass = Text.readString(splitFile); - BytesWritable split = new BytesWritable(); - split.readFields(splitFile); + TaskTracker.getLocalSplitMetaFile(conf.getUser(), + taskId.getJobID().toString(), taskId + .toString()), conf); + DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit); + TaskSplitIndex splitIndex = new TaskSplitIndex(); + splitIndex.readFields(splitFile); splitFile.close(); - Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, 1); + + Task task = + new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1); task.setConf(conf); task.run(conf, new FakeUmbilical()); return true; Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Dec 21 17:36:44 2009 @@ -476,6 +476,13 @@ } /** + * Get a handle to the Cluster + */ + public Cluster getClusterHandle() { + return cluster; + } + + /** * Submit a job to the MR system. 
* * This returns a handle to the {@link RunningJob} which can be used to track @@ -523,37 +530,6 @@ } } - /** - * Checks if the job directory is clean and has all the required components - * for (re) starting the job - */ - public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) - throws IOException { - FileStatus[] contents = null; - - try { - contents = fs.listStatus(jobDirPath); - } catch(FileNotFoundException fnfe) { - return false; - } - - int matchCount = 0; - if (contents.length >=2) { - for (FileStatus status : contents) { - if ("job.xml".equals(status.getPath().getName())) { - ++matchCount; - } - if ("job.split".equals(status.getPath().getName())) { - ++matchCount; - } - } - if (matchCount == 2) { - return true; - } - } - return false; - } - /** * Get an {@link RunningJob} object to track an ongoing job. Returns * null if the id does not correspond to any known job. Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Dec 21 17:36:44 2009 @@ -17,7 +17,6 @@ */ package org.apache.hadoop.mapred; -import java.io.DataInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -38,16 +37,16 @@ import java.util.Vector; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistory; import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; @@ -66,12 +65,16 @@ import org.apache.hadoop.mapreduce.security.JobTokens; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.mapreduce.split.JobSplit; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; +import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.fs.FSDataOutputStream; @@ -101,7 +104,6 @@ JobStatus status; Path jobFile = null; Path localJobFile = null; - Path localJarFile = null; TaskInProgress maps[] = new TaskInProgress[0]; TaskInProgress reduces[] = new TaskInProgress[0]; @@ -298,6 +300,7 @@ new HashMap(); private Map trackersReservedForReduces = new HashMap(); + private Path jobSubmitDir = null; /** * Create an almost empty JobInProgress, which 
can be used only for tests @@ -352,45 +355,52 @@ * Create a JobInProgress with the given job file, plus a handle * to the tracker. */ - public JobInProgress(JobID jobid, JobTracker jobtracker, - JobConf default_conf, int rCount) throws IOException { + public JobInProgress(JobTracker jobtracker, + JobConf default_conf, int rCount, + JobInfo jobInfo) throws IOException { this.restartCount = rCount; - this.jobId = jobid; + this.jobId = JobID.downgrade(jobInfo.getJobID()); String url = "http://" + jobtracker.getJobTrackerMachine() + ":" - + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid; + + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId; this.jobtracker = jobtracker; this.jobHistory = jobtracker.getJobHistory(); this.startTime = System.currentTimeMillis(); this.localFs = jobtracker.getLocalFileSystem(); - JobConf default_job_conf = new JobConf(default_conf); - this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR - +"/"+jobid + ".xml"); - this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR - +"/"+ jobid + ".jar"); - Path jobDir = jobtracker.getSystemDirectoryForJob(jobId); - fs = jobtracker.getFileSystem(jobDir); - jobFile = new Path(jobDir, "job.xml"); + // use the user supplied token to add user credentials to the conf + jobSubmitDir = jobInfo.getJobSubmitDir(); + String user = jobInfo.getUser().toString(); + conf = new JobConf(); + conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(user, + new String[]{UnixUserGroupInformation.DEFAULT_GROUP}).toString()); + fs = jobSubmitDir.getFileSystem(conf); + + this.localJobFile = + default_conf.getLocalPath(JobTracker.SUBDIR + "/" + this.jobId + ".xml"); + + jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir); fs.copyToLocalFile(jobFile, localJobFile); conf = new JobConf(localJobFile); + if (conf.getUser() == null) { + this.conf.setUser(user); + } + if (!conf.getUser().equals(user)) { + throw new IOException("The username obtained from the conf doesn't " + + "match the username the user authenticated as"); + } this.priority = conf.getJobPriority(); - this.profile = new JobProfile(conf.getUser(), jobid, - jobFile.toString(), url, conf.getJobName(), - conf.getQueueName()); - this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, + this.profile = new JobProfile(conf.getUser(), this.jobId, + jobFile.toString(), url, + conf.getJobName(), conf.getQueueName()); + this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP, profile.getUser(), profile.getJobName(), profile.getJobFile(), profile.getURL().toString()); - this.jobtracker.getInstrumentation().addPrepJob(conf, jobid); + this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId); status.setStartTime(startTime); this.status.setJobPriority(this.priority); - String jarFile = conf.getJar(); - if (jarFile != null) { - fs.copyToLocalFile(new Path(jarFile), localJarFile); - conf.setJar(localJarFile.toString()); - } - this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); this.taskCompletionEvents = new ArrayList @@ -406,7 +416,7 @@ this.jobMetrics.setTag("user", conf.getUser()); this.jobMetrics.setTag("sessionId", conf.getSessionId()); this.jobMetrics.setTag("jobName", conf.getJobName()); - this.jobMetrics.setTag("jobId", jobid.toString()); + this.jobMetrics.setTag("jobId", this.jobId.toString()); hasSpeculativeMaps = conf.getMapSpeculativeExecution(); hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); this.maxLevel = 
jobtracker.getNumTaskCacheLevels(); @@ -473,7 +483,7 @@ } Map> createCache( - Job.RawSplit[] splits, int maxLevel) { + TaskSplitMetaInfo[] splits, int maxLevel) { Map> cache = new IdentityHashMap>(maxLevel); @@ -586,26 +596,25 @@ // // read input splits and create a map per a split // - String jobFile = profile.getJobFile(); - - Job.RawSplit[] splits = createSplits(); - numMapTasks = splits.length; + TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId); + numMapTasks = taskSplitMetaInfo.length; checkTaskLimits(); jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks); jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks); - createMapTasks(jobFile, splits); + createMapTasks(jobFile.toString(), taskSplitMetaInfo); if (numMapTasks > 0) { - nonRunningMapCache = createCache(splits, maxLevel); + nonRunningMapCache = createCache(taskSplitMetaInfo, + maxLevel); } // set the launch time this.launchTime = JobTracker.getClock().getTime(); - createReduceTasks(jobFile); + createReduceTasks(jobFile.toString()); // Calculate the minimum number of maps to be complete before // we should start scheduling reduces @@ -615,7 +624,7 @@ DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * numMapTasks)); - initSetupCleanupTasks(jobFile); + initSetupCleanupTasks(jobFile.toString()); synchronized(jobInitKillStatus){ jobInitKillStatus.initDone = true; @@ -669,16 +678,11 @@ } - Job.RawSplit[] createSplits() throws IOException { - DataInputStream splitFile = - fs.open(new Path(conf.get(JobContext.SPLIT_FILE))); - Job.RawSplit[] splits; - try { - splits = Job.readSplitFile(splitFile); - } finally { - splitFile.close(); - } - return splits; + TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) + throws IOException { + TaskSplitMetaInfo[] allTaskSplitMetaInfo = + SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir); + return allTaskSplitMetaInfo; } /** @@ -695,10 +699,11 @@ } } - synchronized void createMapTasks(String jobFile, Job.RawSplit[] splits) { + synchronized void createMapTasks(String jobFile, + TaskSplitMetaInfo[] splits) { maps = new TaskInProgress[numMapTasks]; for(int i=0; i < numMapTasks; ++i) { - inputLength += splits[i].getDataLength(); + inputLength += splits[i].getInputDataLength(); maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, @@ -720,6 +725,7 @@ } } + synchronized void initSetupCleanupTasks(String jobFile) { if (!jobSetupCleanupNeeded) { // nothing to initialize @@ -730,7 +736,7 @@ // cleanup map tip. This map doesn't use any splits. Just assign an empty // split. - Job.RawSplit emptySplit = new Job.RawSplit(); + TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1); cleanup[0].setJobCleanupTask(); @@ -3216,20 +3222,7 @@ localFs.delete(localJobFile, true); localJobFile = null; } - if (localJarFile != null) { - localFs.delete(localJarFile, true); - localJarFile = null; - } - - // clean up splits - for (int i = 0; i < maps.length; i++) { - maps[i].clearSplit(); - } - // JobClient always creates a new directory with job files - // so we remove that directory to cleanup - // Delete temp dfs dirs created if any, like in case of - // speculative exn of reduces. 
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID()); new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); } catch (IOException e) { @@ -3534,7 +3527,8 @@ */ private void generateJobTokens(Path jobDir) throws IOException{ Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME); - FSDataOutputStream os = fs.create(keysFile); + // we need to create this file using the jobtracker's filesystem + FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile); //create JobTokens file and add key to it JobTokens jt = new JobTokens(); byte [] key; Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java?rev=892893&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInfo.java Mon Dec 21 17:36:44 2009 @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Represents the basic information that is saved per a job when the + * JobTracker receives a submitJob request. The information is saved + * so that the JobTracker can recover incomplete jobs upon restart. + */ +class JobInfo implements Writable { + private org.apache.hadoop.mapreduce.JobID id; + private Text user; + private Path jobSubmitDir; + public JobInfo() {} + + public JobInfo(org.apache.hadoop.mapreduce.JobID id, + Text user, + Path jobSubmitDir) { + this.id = id; + this.user = user; + this.jobSubmitDir = jobSubmitDir; + } + + /** + * Get the job id. + */ + public org.apache.hadoop.mapreduce.JobID getJobID() { + return id; + } + + /** + * Get the configured job's user-name. 
+ */ + public Text getUser() { + return user; + } + + /** + * Get the job submission directory + */ + public Path getJobSubmitDir() { + return this.jobSubmitDir; + } + + public void readFields(DataInput in) throws IOException { + id = new org.apache.hadoop.mapreduce.JobID(); + id.readFields(in); + user = new Text(); + user.readFields(in); + jobSubmitDir = new Path(WritableUtils.readString(in)); + } + + public void write(DataOutput out) throws IOException { + id.write(out); + user.write(out); + WritableUtils.writeString(out, jobSubmitDir.toString()); + } +} \ No newline at end of file Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Dec 21 17:36:44 2009 @@ -64,6 +64,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.http.HttpServer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.RPC.VersionMismatch; @@ -147,6 +148,8 @@ public static enum State { INITIALIZING, RUNNING } State state = State.INITIALIZING; private static final int FS_ACCESS_RETRY_PERIOD = 10000; + + static final String JOB_INFO_FILE = "job-info"; private DNSToSwitchMapping dnsToSwitchMapping; NetworkTopology clusterMap = new NetworkTopology(); @@ -156,9 +159,9 @@ private final List jobInProgressListeners = new CopyOnWriteArrayList(); - // system directories are world-wide readable and owner readable + // system directory is completely owned by the JobTracker final static FsPermission SYSTEM_DIR_PERMISSION = - FsPermission.createImmutable((short) 0733); // rwx-wx-wx + FsPermission.createImmutable((short) 0700); // rwx------ // system files should have 700 permission final static FsPermission SYSTEM_FILE_PERMISSION = @@ -172,6 +175,8 @@ private MRAsyncDiskService asyncDiskService; + private final String defaultStagingBaseDir; + /** * A client tried to submit a job before the Job Tracker was ready. */ @@ -991,36 +996,10 @@ return jobsToRecover; } - /** Check if the given string represents a job-id or not - */ - private boolean isJobNameValid(String str) { - if(str == null) { - return false; - } - String[] parts = str.split("_"); - if(parts.length == 3) { - if(parts[0].equals("job")) { - // other 2 parts should be parseable - return JobTracker.validateIdentifier(parts[1]) - && JobTracker.validateJobNumber(parts[2]); - } - } - return false; - } - - // checks if the job dir has the required files - public void checkAndAddJob(FileStatus status) throws IOException { - String fileName = status.getPath().getName(); - if (isJobNameValid(fileName)) { - if (JobClient.isJobDirValid(status.getPath(), fs)) { - recoveryManager.addJobForRecovery(JobID.forName(fileName)); - shouldRecover = true; // enable actual recovery if num-files > 1 - } else { - LOG.info("Found an incomplete job directory " + fileName + "." 
- + " Deleting it!!"); - fs.delete(status.getPath(), true); - } - } + // add the job + void addJobForRecovery(FileStatus status) throws IOException { + recoveryManager.addJobForRecovery(JobID.forName(status.getPath().getName())); + shouldRecover = true; // enable actual recovery if num-files > 1 } @@ -1127,7 +1106,16 @@ for (JobID jobId : jobsToRecover) { LOG.info("Submitting job "+ jobId); try { - submitJob(jobId, restartCount); + Path jobInfoFile = getSystemFileForJob(jobId); + FSDataInputStream in = fs.open(jobInfoFile); + JobInfo token = new JobInfo(); + token.readFields(in); + in.close(); + UnixUserGroupInformation ugi = new UnixUserGroupInformation( + token.getUser().toString(), + new String[]{UnixUserGroupInformation.DEFAULT_GROUP}); + submitJob(token.getJobID(), restartCount, + ugi, token.getJobSubmitDir().toString(), true); recovered++; } catch (Exception e) { LOG.warn("Could not recover job " + jobId, e); @@ -1311,6 +1299,7 @@ tasktrackerExpiryInterval = 0; myInstrumentation = new JobTrackerMetricsInst(this, new JobConf()); mrOwner = null; + defaultStagingBaseDir = "/Users"; } @@ -1466,6 +1455,18 @@ if(systemDir == null) { systemDir = new Path(getSystemDir()); } + try { + FileStatus systemDirStatus = fs.getFileStatus(systemDir); + if (!systemDirStatus.getOwner().equals(mrOwner.getUserName())) { + throw new AccessControlException("The systemdir " + systemDir + + " is not owned by " + mrOwner.getUserName()); + } + if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) { + LOG.warn("Incorrect permissions on " + systemDir + + ". Setting it to " + SYSTEM_DIR_PERMISSION); + fs.setPermission(systemDir, SYSTEM_DIR_PERMISSION); + } + } catch (FileNotFoundException fnf) {} //ignore // Make sure that the backup data is preserved FileStatus[] systemDirData; try { @@ -1480,7 +1481,7 @@ && systemDirData != null) { for (FileStatus status : systemDirData) { try { - recoveryManager.checkAndAddJob(status); + recoveryManager.addJobForRecovery(status); } catch (Throwable t) { LOG.warn("Failed to add the job " + status.getPath().getName(), t); @@ -1536,6 +1537,8 @@ //initializes the job status store completedJobStatusStore = new CompletedJobStatusStore(conf); + Path homeDir = fs.getHomeDirectory(); + defaultStagingBaseDir = homeDir.getParent().toString(); } private static SimpleDateFormat getDateFormat() { @@ -2871,8 +2874,8 @@ * the JobTracker alone. */ public synchronized org.apache.hadoop.mapreduce.JobStatus submitJob( - org.apache.hadoop.mapreduce.JobID jobId) throws IOException { - return submitJob(JobID.downgrade(jobId)); + org.apache.hadoop.mapreduce.JobID jobId, String jobSubmitDir) throws IOException { + return submitJob(JobID.downgrade(jobId), jobSubmitDir); } /** @@ -2883,45 +2886,49 @@ * of the JobTracker. But JobInProgress adds info that's useful for * the JobTracker alone. * @deprecated Use - * {@link #submitJob(org.apache.hadoop.mapreduce.JobID)} instead + * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String)} instead */ @Deprecated - public synchronized JobStatus submitJob(JobID jobId) throws IOException { - return submitJob(jobId, 0); + public synchronized JobStatus submitJob(JobID jobId, String jobSubmitDir) + throws IOException { + return submitJob(jobId, 0, + UserGroupInformation.getCurrentUGI(), + jobSubmitDir, false); } /** * Submits either a new job or a job from an earlier run. 
*/ - private synchronized JobStatus submitJob(JobID jobId, - int restartCount) throws IOException { + private synchronized JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, + int restartCount, UserGroupInformation ugi, String jobSubmitDir, + boolean recovered) throws IOException { + JobID jobId = JobID.downgrade(jobID); if(jobs.containsKey(jobId)) { //job already running, don't start twice return jobs.get(jobId).getStatus(); } - - JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount); + + //the conversion from String to Text for the UGI's username will + //not be required when we have the UGI to return us the username as + //Text. + JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getUserName()), + new Path(jobSubmitDir)); + JobInProgress job = new JobInProgress(this, this.conf, restartCount, jobInfo); String queue = job.getProfile().getQueueName(); if(!(queueManager.getLeafQueueNames().contains(queue))) { - new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId)); throw new IOException("Queue \"" + queue + "\" does not exist"); } //check if queue is RUNNING if(!queueManager.isRunning(queue)) { - new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId)); throw new IOException("Queue \"" + queue + "\" is not running"); } try { - // check for access - UserGroupInformation ugi = - UserGroupInformation.readFrom(job.getJobConf()); checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi); } catch (IOException ioe) { - LOG.warn("Access denied for user " + job.getJobConf().getUser() - + ". Ignoring job " + jobId, ioe); - new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId)); + LOG.warn("Access denied for user " + job.getJobConf().getUser() + + ". Ignoring job " + jobId, ioe); throw ioe; } @@ -2930,11 +2937,19 @@ try { checkMemoryRequirements(job); } catch (IOException ioe) { - new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId)); throw ioe; } - return addJob(jobId, job); + if (!recovered) { + //Store the information in a file so that the job can be recovered + //later (if at all) + Path jobDir = getSystemDirectoryForJob(jobId); + FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); + FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); + jobInfo.write(out); + out.close(); + } + return addJob(jobId, job); } /** @@ -3600,6 +3615,17 @@ } /** + * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir() + */ + public String getStagingAreaDir() { + Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, + defaultStagingBaseDir)); + String user = UserGroupInformation.getCurrentUGI().getUserName(); + return fs.makeQualified(new Path(stagingRootDir, + user+"/.staging")).toString(); + } + + /** * @see * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir() */ @@ -3614,10 +3640,15 @@ return jobs.get(jobid); } - // Get the job directory in system directory + //Get the job directory in system directory Path getSystemDirectoryForJob(JobID id) { return new Path(getSystemDir(), id.toString()); } + + //Get the job token file in system directory + Path getSystemFileForJob(JobID id) { + return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE); + } /** * Change the run-time priority of the given job. 
@@ -4304,6 +4335,8 @@ //initializes the job status store completedJobStatusStore = new CompletedJobStatusStore(conf); + Path homeDir = fs.getHomeDirectory(); + defaultStagingBaseDir = homeDir.getParent().toString(); } /** Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Dec 21 17:36:44 2009 @@ -23,19 +23,16 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Job.RawSplit; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.QueueInfo; @@ -48,7 +45,9 @@ import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.server.jobtracker.State; -import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; +import org.apache.hadoop.security.UserGroupInformation; /** Implements MapReduce locally, in-process, for debugging. 
*/ public class LocalJobRunner implements ClientProtocol { @@ -60,6 +59,7 @@ private JobConf conf; private int map_tasks = 0; private int reduce_tasks = 0; + final Random rand = new Random(); private JobTrackerInstrumentation myMetrics = null; @@ -68,33 +68,6 @@ public long getProtocolVersion(String protocol, long clientVersion) { return ClientProtocol.versionID; } - - @SuppressWarnings("unchecked") - static RawSplit[] getRawSplits(JobContext jContext, JobConf job) - throws Exception { - JobConf jobConf = jContext.getJobConf(); - org.apache.hadoop.mapreduce.InputFormat input = - ReflectionUtils.newInstance(jContext.getInputFormatClass(), jobConf); - - List splits = input.getSplits(jContext); - RawSplit[] rawSplits = new RawSplit[splits.size()]; - DataOutputBuffer buffer = new DataOutputBuffer(); - SerializationFactory factory = new SerializationFactory(jobConf); - Serializer serializer = - factory.getSerializer(splits.get(0).getClass()); - serializer.open(buffer); - for (int i = 0; i < splits.size(); i++) { - buffer.reset(); - serializer.serialize(splits.get(i)); - RawSplit rawSplit = new RawSplit(); - rawSplit.setClassName(splits.get(i).getClass().getName()); - rawSplit.setDataLength(splits.get(i).getLength()); - rawSplit.setBytes(buffer.getData(), 0, buffer.getLength()); - rawSplit.setLocations(splits.get(i).getLocations()); - rawSplits[i] = rawSplit; - } - return rawSplits; - } private class Job extends Thread implements TaskUmbilicalProtocol { // The job directory on the system: JobClient places job configurations here. @@ -130,8 +103,8 @@ return TaskUmbilicalProtocol.versionID; } - public Job(JobID jobid) throws IOException { - this.systemJobDir = new Path(getSystemDir(), jobid.toString()); + public Job(JobID jobid, String jobSubmitDir) throws IOException { + this.systemJobDir = new Path(jobSubmitDir); this.systemJobFile = new Path(systemJobDir, "job.xml"); this.id = jobid; JobConf conf = new JobConf(systemJobFile); @@ -202,26 +175,10 @@ JobContext jContext = new JobContextImpl(job, jobId); OutputCommitter outputCommitter = job.getOutputCommitter(); try { - // split input into minimum number of splits - RawSplit[] rawSplits; - if (job.getUseNewMapper()) { - rawSplits = getRawSplits(jContext, job); - } else { - InputSplit[] splits = job.getInputFormat().getSplits(job, 1); - rawSplits = new RawSplit[splits.length]; - DataOutputBuffer buffer = new DataOutputBuffer(); - for (int i = 0; i < splits.length; i++) { - buffer.reset(); - splits[i].write(buffer); - RawSplit rawSplit = new RawSplit(); - rawSplit.setClassName(splits[i].getClass().getName()); - rawSplit.setDataLength(splits[i].getLength()); - rawSplit.setBytes(buffer.getData(), 0, buffer.getLength()); - rawSplit.setLocations(splits[i].getLocations()); - rawSplits[i] = rawSplit; - } - } + TaskSplitMetaInfo[] taskSplitMetaInfos = + SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); + int numReduceTasks = job.getNumReduceTasks(); if (numReduceTasks > 1 || numReduceTasks < 0) { // we only allow 0 or 1 reducer in local mode @@ -233,15 +190,14 @@ Map mapOutputFiles = new HashMap(); - for (int i = 0; i < rawSplits.length; i++) { + for (int i = 0; i < taskSplitMetaInfos.length; i++) { if (!this.isInterrupted()) { TaskAttemptID mapId = new TaskAttemptID( new TaskID(jobId, TaskType.MAP, i),0); mapIds.add(mapId); MapTask map = new MapTask(systemJobFile.toString(), mapId, i, - rawSplits[i].getClassName(), - rawSplits[i].getBytes(), 1); + taskSplitMetaInfos[i].getSplitIndex(), 1); JobConf localConf = new JobConf(job); 
TaskRunner.setupChildMapredLocalDirs(map, localConf); @@ -459,9 +415,9 @@ } public org.apache.hadoop.mapreduce.JobStatus submitJob( - org.apache.hadoop.mapreduce.JobID jobid) + org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir) throws IOException { - return new Job(JobID.downgrade(jobid)).status; + return new Job(JobID.downgrade(jobid), jobSubmitDir).status; } public void killJob(org.apache.hadoop.mapreduce.JobID id) { @@ -564,6 +520,22 @@ return fs.makeQualified(sysDir).toString(); } + /** + * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir() + */ + public String getStagingAreaDir() { + Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, + "/tmp/hadoop/mapred/staging")); + UserGroupInformation ugi = UserGroupInformation.getCurrentUGI(); + String user; + if (ugi != null) { + user = ugi.getUserName() + rand.nextInt(); + } else { + user = "dummy" + rand.nextInt(); + } + return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString(); + } + public String getJobHistoryDir() { return null; } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Dec 21 17:36:44 2009 @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -43,6 +44,7 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; @@ -54,6 +56,10 @@ import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.split.JobSplit; +import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.task.MapContextImpl; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -72,7 +78,7 @@ */ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; - private BytesWritable split = new BytesWritable(); + private TaskSplitIndex splitMetaInfo = new TaskSplitIndex(); private String splitClass; private final static int APPROX_HEADER_LENGTH = 150; @@ -81,7 +87,6 @@ private Progress mapPhase; private Progress sortPhase; - { // set phase for this task setPhase(TaskStatus.Phase.MAP); getProgress().setStatus("map"); @@ -92,11 +97,10 @@ } public MapTask(String jobFile, TaskAttemptID taskId, - int partition, String splitClass, BytesWritable split, + int partition, TaskSplitIndex splitIndex, int numSlotsRequired) { super(jobFile, taskId, partition, numSlotsRequired); - 
this.splitClass = splitClass; - this.split = split; + this.splitMetaInfo = splitIndex; } @Override @@ -108,26 +112,26 @@ public void localizeConfiguration(JobConf conf) throws IOException { super.localizeConfiguration(conf); - // split.dta file is used only by IsolationRunner. + // split.dta/split.info files are used only by IsolationRunner. // Write the split file to the local disk if it is a normal map task (not a // job-setup or a job-cleanup task) and if the user wishes to run // IsolationRunner either by setting keep.failed.tasks.files to true or by // using keep.tasks.files.pattern - if (isMapOrReduce() - && (conf.getKeepTaskFilesPattern() != null || conf - .getKeepFailedTaskFiles())) { - Path localSplit = + if (supportIsolationRunner(conf) && isMapOrReduce()) { + // localize the split meta-information + Path localSplitMeta = new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite( - TaskTracker.getLocalSplitFile(conf.getUser(), getJobID() - .toString(), getTaskID().toString()), conf); - LOG.debug("Writing local split to " + localSplit); - DataOutputStream out = FileSystem.getLocal(conf).create(localSplit); - Text.writeString(out, splitClass); - split.write(out); + TaskTracker.getLocalSplitMetaFile(conf.getUser(), + getJobID().toString(), getTaskID() + .toString()), conf); + LOG.debug("Writing local split to " + localSplitMeta); + DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta); + splitMetaInfo.write(out); out.close(); } } + @Override public TaskRunner createRunner(TaskTracker tracker, TaskTracker.TaskInProgress tip) { @@ -138,9 +142,8 @@ public void write(DataOutput out) throws IOException { super.write(out); if (isMapOrReduce()) { - Text.writeString(out, splitClass); - split.write(out); - split = null; + splitMetaInfo.write(out); + splitMetaInfo = null; } } @@ -148,8 +151,7 @@ public void readFields(DataInput in) throws IOException { super.readFields(in); if (isMapOrReduce()) { - splitClass = Text.readString(in); - split.readFields(in); + splitMetaInfo.readFields(in); } } @@ -320,36 +322,52 @@ } if (useNewApi) { - runNewMapper(job, split, umbilical, reporter); + runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { - runOldMapper(job, split, umbilical, reporter); + runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter); } + @SuppressWarnings("unchecked") + private T getSplitDetails(Path file, long offset) + throws IOException { + FileSystem fs = file.getFileSystem(conf); + FSDataInputStream inFile = fs.open(file); + inFile.seek(offset); + String className = Text.readString(inFile); + Class cls; + try { + cls = (Class) conf.getClassByName(className); + } catch (ClassNotFoundException ce) { + IOException wrap = new IOException("Split class " + className + + " not found"); + wrap.initCause(ce); + throw wrap; + } + SerializationFactory factory = new SerializationFactory(conf); + Deserializer deserializer = + (Deserializer) factory.getDeserializer(cls); + deserializer.open(inFile); + T split = deserializer.deserialize(null); + long pos = inFile.getPos(); + getCounters().findCounter( + TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset); + inFile.close(); + return split; + } + @SuppressWarnings("unchecked") private void runOldMapper(final JobConf job, - final BytesWritable rawSplit, + final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException, ClassNotFoundException { - InputSplit inputSplit = null; - // reinstantiate the split - try { - 
inputSplit = (InputSplit) - ReflectionUtils.newInstance(job.getClassByName(splitClass), job); - } catch (ClassNotFoundException exp) { - IOException wrap = new IOException("Split class " + splitClass + - " not found"); - wrap.initCause(exp); - throw wrap; - } - DataInputBuffer splitBuffer = new DataInputBuffer(); - splitBuffer.reset(split.getBytes(), 0, split.getLength()); - inputSplit.readFields(splitBuffer); - + InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), + splitIndex.getStartOffset()); + updateJobWithSplit(job, inputSplit); reporter.setInputSplit(inputSplit); @@ -578,7 +596,7 @@ @SuppressWarnings("unchecked") private void runNewMapper(final JobConf job, - final BytesWritable rawSplit, + final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, @@ -597,15 +615,8 @@ ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; - DataInputBuffer splitBuffer = new DataInputBuffer(); - splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength()); - SerializationFactory factory = new SerializationFactory(job); - Deserializer - deserializer = - (Deserializer) - factory.getDeserializer(job.getClassByName(splitClass)); - deserializer.open(splitBuffer); - split = deserializer.deserialize(null); + split = getSplitDetails(new Path(splitIndex.getSplitLocation()), + splitIndex.getStartOffset()); org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Dec 21 17:36:44 2009 @@ -922,8 +922,22 @@ + JobStatus.State.FAILED + " or " + JobStatus.State.KILLED); } + + // delete the staging area for the job + JobConf conf = new JobConf(jobContext.getConfiguration()); + if (!supportIsolationRunner(conf)) { + String jobTempDir = conf.get("mapreduce.job.dir"); + Path jobTempDirPath = new Path(jobTempDir); + FileSystem fs = jobTempDirPath.getFileSystem(conf); + fs.delete(jobTempDirPath, true); + } done(umbilical, reporter); } + + protected boolean supportIsolationRunner(JobConf conf) { + return (conf.getKeepTaskFilesPattern() != null || conf + .getKeepFailedTaskFiles()); + } protected void runJobSetupTask(TaskUmbilicalProtocol umbilical, TaskReporter reporter Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Dec 21 17:36:44 2009 @@ -31,13 +31,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobInProgress.DataStatistics; import org.apache.hadoop.mapred.SortedRanges.Range; -import 
org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.jobhistory.JobHistory; import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.net.Node; @@ -65,7 +64,7 @@ // Defines the TIP private String jobFile = null; - private Job.RawSplit rawSplit; + private TaskSplitMetaInfo splitInfo; private int numMaps; private int partition; private JobTracker jobtracker; @@ -140,12 +139,12 @@ * Constructor for MapTask */ public TaskInProgress(JobID jobid, String jobFile, - Job.RawSplit rawSplit, + TaskSplitMetaInfo split, JobTracker jobtracker, JobConf conf, JobInProgress job, int partition, int numSlotsRequired) { this.jobFile = jobFile; - this.rawSplit = rawSplit; + this.splitInfo = split; this.jobtracker = jobtracker; this.job = job; this.conf = conf; @@ -316,7 +315,7 @@ * Whether this is a map task */ public boolean isMapTask() { - return rawSplit != null; + return splitInfo != null; } /** @@ -807,7 +806,7 @@ */ public String[] getSplitLocations() { if (isMapTask() && !jobSetup && !jobCleanup) { - return rawSplit.getLocations(); + return splitInfo.getLocations(); } return new String[0]; } @@ -1012,16 +1011,8 @@ if (isMapTask()) { LOG.debug("attempt " + numTaskFailures + " sending skippedRecords " + failedRanges.getIndicesCount()); - String splitClass = null; - BytesWritable split; - if (!jobSetup && !jobCleanup) { - splitClass = rawSplit.getClassName(); - split = rawSplit.getBytes(); - } else { - split = new BytesWritable(); - } - t = new MapTask(jobFile, taskid, partition, splitClass, split, - numSlotsNeeded); + t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(), + numSlotsNeeded); } else { t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded); } @@ -1134,7 +1125,7 @@ if (!isMapTask() || jobSetup || jobCleanup) { return ""; } - String[] splits = rawSplit.getLocations(); + String[] splits = splitInfo.getLocations(); Node[] nodes = new Node[splits.length]; for (int i = 0; i < splits.length; i++) { nodes[i] = jobtracker.getNode(splits[i]); @@ -1164,16 +1155,12 @@ public long getMapInputSize() { if(isMapTask() && !jobSetup && !jobCleanup) { - return rawSplit.getDataLength(); + return splitInfo.getInputDataLength(); } else { return 0; } } - public void clearSplit() { - rawSplit.clearBytes(); - } - /** * Compare most recent task attempts dispatch time to current system time so * that task progress rate will slow down as time proceeds even if no progress Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Dec 21 17:36:44 2009 @@ -86,6 +86,8 @@ import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ConfiguredPolicy; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; @@ -218,6 +220,7 @@ 
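
// With TaskInProgress holding a TaskSplitMetaInfo (hunks above) rather than a
// Job.RawSplit, the only split information that travels with a serialized
// MapTask is a TaskSplitIndex: the path of the split file in the job submit
// directory plus the byte offset of this task's split, not the split bytes and
// class name. A sketch using only accessors visible in this patch, where
// splitInfo is the TaskInProgress field above:
TaskSplitIndex index = splitInfo.getSplitIndex();
String splitFile = index.getSplitLocation();  // where the serialized split lives
long offset = index.getStartOffset();         // seek position used by getSplitDetails()
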
static final String OUTPUT = "output"; private static final String JARSDIR = "jars"; static final String LOCAL_SPLIT_FILE = "split.dta"; + static final String LOCAL_SPLIT_META_FILE = "split.info"; static final String JOBFILE = "job.xml"; static final String JOB_TOKEN_FILE="jobToken"; //localized file @@ -474,11 +477,16 @@ return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR; } - static String getLocalSplitFile(String user, String jobid, String taskid) { + static String getLocalSplitMetaFile(String user, String jobid, String taskid){ return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR - + TaskTracker.LOCAL_SPLIT_FILE; + + TaskTracker.LOCAL_SPLIT_META_FILE; } + static String getLocalSplitFile(String user, String jobid, String taskid) { + return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR + + TaskTracker.LOCAL_SPLIT_FILE; + } + static String getIntermediateOutputDir(String user, String jobid, String taskid) { return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR @@ -898,6 +906,13 @@ launchTaskForJob(tip, new JobConf(rjob.jobConf)); } + private void setUgi(String user, Configuration conf) { + //The dummy-group used here will not be required once we have UGI + //object creation with just the user name. + conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, + user+","+UnixUserGroupInformation.DEFAULT_GROUP); + } + /** * Localize the job on this tasktracker. Specifically *
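
// The setUgi helper added above is what the localization hunks that follow rely
// on: a JobConf seeded with the submitting user's name makes Path.getFileSystem()
// return a handle that acts as that user, so job.xml and job.jar are pulled from
// the user's staging area instead of through systemFS. A sketch, assuming jobFile,
// user and localJobFile as used in localizeJobConfFile below:
JobConf conf = new JobConf(getJobConf());
setUgi(user, conf);                               // user plus default group, see above
FileSystem userFs = jobFile.getFileSystem(conf);  // bound to the submitting user
userFs.copyToLocalFile(jobFile, localJobFile);    // fetch job.xml as that user
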
    @@ -940,6 +955,7 @@ } System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath()); localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath()); + setUgi(userName, localJobConf); // Download the job.jar for this job from the system FS localizeJobJarFile(userName, jobId, localFs, localJobConf); @@ -958,12 +974,17 @@ */ private Path localizeJobConfFile(Path jobFile, String user, JobID jobId) throws IOException { - // Get sizes of JobFile and JarFile + JobConf conf = new JobConf(getJobConf()); + setUgi(user, conf); + + FileSystem userFs = jobFile.getFileSystem(conf); + // Get sizes of JobFile // sizes are -1 if they are not present. FileStatus status = null; long jobFileSize = -1; try { - status = systemFS.getFileStatus(jobFile); + + status = userFs.getFileStatus(jobFile); jobFileSize = status.getLen(); } catch(FileNotFoundException fe) { jobFileSize = -1; @@ -974,7 +995,7 @@ jobFileSize, fConf); // Download job.xml - systemFS.copyToLocalFile(jobFile, localJobFile); + userFs.copyToLocalFile(jobFile, localJobFile); return localJobFile; } @@ -996,8 +1017,9 @@ long jarFileSize = -1; if (jarFile != null) { Path jarFilePath = new Path(jarFile); + FileSystem fs = jarFilePath.getFileSystem(localJobConf); try { - status = systemFS.getFileStatus(jarFilePath); + status = fs.getFileStatus(jarFilePath); jarFileSize = status.getLen(); } catch (FileNotFoundException fe) { jarFileSize = -1; @@ -1009,7 +1031,7 @@ getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf); // Download job.jar - systemFS.copyToLocalFile(jarFilePath, localJarFile); + fs.copyToLocalFile(jarFilePath, localJarFile); localJobConf.setJar(localJarFile.toString()); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java Mon Dec 21 17:36:44 2009 @@ -46,6 +46,7 @@ private Configuration conf; private FileSystem fs = null; private Path sysDir = null; + private Path stagingAreaDir = null; private Path jobHistoryDir = null; static { @@ -76,6 +77,7 @@ ClientProtocol client; String tracker = conf.get("mapred.job.tracker", "local"); if ("local".equals(tracker)) { + conf.setInt("mapreduce.job.maps", 1); client = new LocalJobRunner(conf); } else { client = createRPCProxy(JobTracker.getAddress(conf), conf); @@ -222,6 +224,19 @@ } return sysDir; } + + /** + * Grab the jobtracker's view of the staging directory path where + * job-specific files will be placed. + * + * @return the staging directory where job-specific files are to be placed. + */ + public Path getStagingAreaDir() throws IOException, InterruptedException { + if (stagingAreaDir == null) { + stagingAreaDir = new Path(client.getStagingAreaDir()); + } + return stagingAreaDir; + } /** * Get the job history file path for a given job id. 
The job history file at Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=892893&r1=892892&r2=892893&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Mon Dec 21 17:36:44 2009 @@ -20,8 +20,6 @@ import java.io.BufferedReader; import java.io.BufferedWriter; -import java.io.DataInput; -import java.io.DataOutput; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -30,7 +28,6 @@ import java.io.OutputStreamWriter; import java.net.URL; import java.net.URLConnection; -import java.util.Arrays; import java.net.URI; import javax.security.auth.login.LoginException; @@ -40,12 +37,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.util.ConfigUtil; @@ -956,7 +950,7 @@ ensureState(JobState.DEFINE); setUseNewAPI(); status = new JobSubmitter(cluster.getFileSystem(), - cluster.getClient()).submitJobInternal(this); + cluster.getClient()).submitJobInternal(this, cluster); state = JobState.RUNNING; } @@ -1211,106 +1205,4 @@ } return ugi; } - - /** - * Read a splits file into a list of raw splits. 
- * - * @param in the stream to read from - * @return the complete list of splits - * @throws IOException - */ - public static RawSplit[] readSplitFile(DataInput in) throws IOException { - byte[] header = new byte[JobSubmitter.SPLIT_FILE_HEADER.length]; - in.readFully(header); - if (!Arrays.equals(JobSubmitter.SPLIT_FILE_HEADER, header)) { - throw new IOException("Invalid header on split file"); - } - int vers = WritableUtils.readVInt(in); - if (vers != JobSubmitter.CURRENT_SPLIT_FILE_VERSION) { - throw new IOException("Unsupported split version " + vers); - } - int len = WritableUtils.readVInt(in); - RawSplit[] result = new RawSplit[len]; - for (int i=0; i < len; ++i) { - result[i] = new RawSplit(); - result[i].readFields(in); - } - return result; - } - - public static class RawSplit implements Writable { - private String splitClass; - private BytesWritable bytes = new BytesWritable(); - private String[] locations; - long dataLength; - - public RawSplit() { - } - - protected RawSplit(String splitClass, BytesWritable bytes, - String[] locations, long dataLength) { - this.splitClass = splitClass; - this.bytes = bytes; - this.locations = locations; - this.dataLength = dataLength; - } - - public void setBytes(byte[] data, int offset, int length) { - bytes.set(data, offset, length); - } - - public void setClassName(String className) { - splitClass = className; - } - - public String getClassName() { - return splitClass; - } - - public BytesWritable getBytes() { - return bytes; - } - - public void clearBytes() { - bytes = null; - } - - public void setLocations(String[] locations) { - this.locations = locations; - } - - public String[] getLocations() { - return locations; - } - - public long getDataLength() { - return dataLength; - } - - public void setDataLength(long l) { - dataLength = l; - } - - public void readFields(DataInput in) throws IOException { - splitClass = Text.readString(in); - dataLength = in.readLong(); - bytes.readFields(in); - int len = WritableUtils.readVInt(in); - locations = new String[len]; - for (int i=0; i < len; ++i) { - locations[i] = Text.readString(in); - } - } - - public void write(DataOutput out) throws IOException { - Text.writeString(out, splitClass); - out.writeLong(dataLength); - bytes.write(out); - WritableUtils.writeVInt(out, locations.length); - for (int i = 0; i < locations.length; i++) { - Text.writeString(out, locations[i]); - } - } - } - }
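
The RawSplit wire format removed above (class name, length, raw bytes and locations serialized per split) is superseded by files written once into the job submission directory at submit time: the serialized splits themselves plus a small meta file recording, for each split, its locations, its length and its offset into the split file. MapTask.getSplitDetails() and SplitMetaInfoReader, shown earlier in this mail, are the readers of that layout. A rough end-to-end sketch of the new submission path follows; JobSubmitter and JobSplitWriter internals are in other parts of this commit, so the directory naming and any calls not visible in this mail are assumptions for illustration only:

  // Client side (assumed flow; only getStagingAreaDir, getFileSystem, getClient,
  // submitJobInternal and the two-argument submitJob appear in this part of the diff).
  Cluster cluster = new Cluster(conf);
  Path staging = cluster.getStagingAreaDir();               // per-user ".staging" root
  Path jobSubmitDir = new Path(staging, jobId.toString());  // job-specific dir (name assumed)

  // JobSubmitter stages job.xml, job.jar and the split/split-meta files under
  // jobSubmitDir, then hands only that directory to the cluster:
  JobStatus status = new JobSubmitter(cluster.getFileSystem(), cluster.getClient())
      .submitJobInternal(job, cluster);
  // ...which ultimately reaches the new RPC:
  //   client.submitJob(jobId, jobSubmitDir.toString());

  // Task side: at job commit the staging directory ("mapreduce.job.dir") is
  // deleted unless IsolationRunner files are being kept
  // (supportIsolationRunner(conf) == true), as added in the Task.java hunk above.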