hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r777570 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Date Fri, 22 May 2009 15:23:41 GMT
Author: ddas
Date: Fri May 22 15:23:40 2009
New Revision: 777570

URL: http://svn.apache.org/viewvc?rev=777570&view=rev
Log:
HADOOP-5850. Fixes a problem to do with not being able to run jobs with 0 maps/reduces. Contributed
by Vinod K V.

Added:
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
Removed:
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
    hadoop/core/trunk/src/webapps/job/taskdetails.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 22 15:23:40 2009
@@ -735,6 +735,9 @@
     HADOOP-5210. Solves a problem in the progress report of the reduce task.
     (Ravi Gummadi via ddas)
 
+    HADOOP-5850. Fixes a problem to do with not being able to run jobs with
+    0 maps/reduces. (Vinod K V via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri May 22 15:23:40
2009
@@ -385,7 +385,7 @@
       jobInitKillStatus.initStarted = true;
     }
 
-    LOG.debug("initializing " + this.jobId);
+    LOG.info("Initializing " + jobId);
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
@@ -428,38 +428,15 @@
                                    splits[i], 
                                    jobtracker, conf, this, i);
     }
-    LOG.info("Input size for job "+ jobId + " = " + inputLength);
+    LOG.info("Input size for job " + jobId + " = " + inputLength
+        + ". Number of splits = " + splits.length);
     if (numMapTasks > 0) { 
-      LOG.info("Split info for job:" + jobId + " with " + 
-               splits.length + " splits:");
       nonRunningMapCache = createCache(splits, maxLevel);
     }
         
     // set the launch time
     this.launchTime = System.currentTimeMillis();
 
-    // if no split is returned, job is considered completed and successful
-    if (numMapTasks == 0) {
-      // Finished time need to be setted here to prevent this job to be retired
-      // from the job tracker jobs at the next retire iteration.
-      this.finishTime = this.launchTime;
-      status.setSetupProgress(1.0f);
-      status.setMapProgress(1.0f);
-      status.setReduceProgress(1.0f);
-      status.setCleanupProgress(1.0f);
-      status.setRunState(JobStatus.SUCCEEDED);
-      tasksInited.set(true);
-      JobHistory.JobInfo.logInited(profile.getJobID(), 
-                                    this.launchTime, 0, 0);
-      JobHistory.JobInfo.logFinished(profile.getJobID(), 
-                                     this.finishTime, 0, 0, 0, 0,
-                                     getCounters());
-      // Special case because the Job is not queued
-      JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
-
-      return;
-    }
-
     //
     // Create reduce tasks
     //
@@ -481,9 +458,11 @@
 
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
-    // cleanup map tip. This map is doesn't use split. 
-    // Just assign splits[0]
-    cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+
+    // cleanup map tip. This map doesn't use any splits. Just assign an empty
+    // split.
+    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks);
     cleanup[0].setJobCleanupTask();
 
@@ -494,9 +473,10 @@
 
     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
-    // setup map tip. This map is doesn't use split. 
-    // Just assign splits[0]
-    setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+
+    // setup map tip. This map doesn't use any split. Just assign an empty
+    // split.
+    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks + 1 );
     setup[0].setJobSetupTask();
 
@@ -889,20 +869,11 @@
     if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
-        if (maps.length == 0) {
-          this.status.setMapProgress(1.0f);
-        } else {
           this.status.setMapProgress((float) (this.status.mapProgress() +
                                               progressDelta / maps.length));
-        }
       } else {
-        if (reduces.length == 0) {
-          this.status.setReduceProgress(1.0f);
-        } else {
-          this.status.setReduceProgress
-            ((float) (this.status.reduceProgress() +
-                      (progressDelta / reduces.length)));
-        }
+        this.status.setReduceProgress((float) (this.status.reduceProgress() + 
+                                           (progressDelta / reduces.length)));
       }
     }
   }
@@ -1129,8 +1100,10 @@
         status.getRunState() != JobStatus.PREP) {
       return false;
     }
-    // check if cleanup task has been launched already. 
-    if (launchedCleanup) {
+    // check if cleanup task has been launched already or if setup hasn't
+    // finished yet. The latter check is useful when the number of maps is
+    // zero.
+    if (launchedCleanup || !isSetupFinished()) {
       return false;
     }
     // check if job has failed or killed
@@ -1164,7 +1137,6 @@
       if (!canLaunchSetupTask()) {
         return null;
       }
-      
       String taskTracker = tts.getTrackerName();
       // Update the last-known clusterSize
       this.clusterSize = clusterSize;
@@ -2111,6 +2083,12 @@
     if (this.status.getRunState() == JobStatus.RUNNING ) {
       this.status.setRunState(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
+      if (maps.length == 0) {
+        this.status.setMapProgress(1.0f);
+      }
+      if (reduces.length == 0) {
+        this.status.setReduceProgress(1.0f);
+      }
       this.finishTime = System.currentTimeMillis();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
@@ -2428,6 +2406,14 @@
     }
   }
 
+  boolean isSetupFinished() {
+    if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+        || setup[1].isFailed()) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Fail a task with a given reason, but without a status object.
    * 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri May 22 15:23:40
2009
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -731,7 +732,10 @@
    * Get the split locations 
    */
   public String[] getSplitLocations() {
-    return rawSplit.getLocations();
+    if (isMapTask() && !jobSetup && !jobCleanup) {
+      return rawSplit.getLocations();
+    }
+    return new String[0];
   }
   
   /**
@@ -915,12 +919,18 @@
                              boolean taskCleanup) {
     // create the task
     Task t = null;
-    if (isMapTask()) {
-      LOG.debug("attempt "+  numTaskFailures   +
-          " sending skippedRecords "+failedRanges.getIndicesCount());
-      t = new MapTask(jobFile, taskid, partition, 
-          rawSplit.getClassName(), rawSplit.getBytes());
-    } else {
+    if (isMapTask() && !jobSetup && !jobCleanup) {
+      LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+          + failedRanges.getIndicesCount());
+
+      t =
+          new MapTask(jobFile, taskid, partition, rawSplit.getClassName(),
+              rawSplit.getBytes());
+
+    } else if (jobSetup || jobCleanup) {
+      t = new MapTask(jobFile, taskid, partition, null, new BytesWritable());
+    }
+    else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
     if (jobCleanup) {
@@ -1029,7 +1039,7 @@
    * Gets the Node list of input split locations sorted in rack order.
    */ 
   public String getSplitNodes() {
-    if ( rawSplit == null) {
+    if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
     String[] splits = rawSplit.getLocations();

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java?rev=777570&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java Fri May 22
15:23:40 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * A JUnit test to test Map-Reduce empty jobs.
+ */
+public class TestEmptyJob extends TestCase {
+  private static final Log LOG =
+      LogFactory.getLog(TestEmptyJob.class.getName());
+
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp")).toURI()
+          .toString().replace(' ', '+');
+
+  MiniMRCluster mr = null;
+
+  /**
+   * Simple method running a MapReduce job with no input data. Used to test that
+   * such a job is successful.
+   * 
+   * @param fileSys
+   * @param numMaps
+   * @param numReduces
+   * @return true if the MR job is successful, otherwise false
+   * @throws IOException
+   */
+  private boolean launchEmptyJob(URI fileSys, int numMaps, int numReduces)
+      throws IOException {
+    // create an empty input dir
+    final Path inDir = new Path(TEST_ROOT_DIR, "testing/empty/input");
+    final Path outDir = new Path(TEST_ROOT_DIR, "testing/empty/output");
+    JobConf conf = mr.createJobConf();
+    FileSystem fs = FileSystem.get(fileSys, conf);
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      LOG.warn("Can't create " + inDir);
+      return false;
+    }
+
+    // configure a simple identity map/reduce job
+    FileSystem.setDefaultUri(conf, fileSys);
+    conf.setJobName("empty");
+    // use an InputFormat which returns no split
+    conf.setInputFormat(EmptyInputFormat.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+
+    // run job and wait for completion
+    JobClient jc = new JobClient(conf);
+    RunningJob runningJob = jc.submitJob(conf);
+    while (true) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+      }
+      if (runningJob.isComplete()) {
+        break;
+      }
+    }
+
+    assertTrue(runningJob.isComplete());
+    assertTrue(runningJob.isSuccessful());
+    JobID jobID = runningJob.getID();
+
+    TaskReport[] jobSetupTasks = jc.getSetupTaskReports(jobID);
+    assertTrue("Number of job-setup tips is not 2!", jobSetupTasks.length == 2);
+    assertTrue("Setup progress is " + runningJob.setupProgress()
+        + " and not 1.0", runningJob.setupProgress() == 1.0);
+    assertTrue("Setup task is not finished!", mr.getJobTrackerRunner()
+        .getJobTracker().getJob(jobID).isSetupFinished());
+
+    assertTrue("Number of maps is not zero!", jc.getMapTaskReports(runningJob
+        .getID()).length == 0);
+    assertTrue(
+        "Map progress is " + runningJob.mapProgress() + " and not 1.0!",
+        runningJob.mapProgress() == 1.0);
+
+    assertTrue("Reduce progress is " + runningJob.reduceProgress()
+        + " and not 1.0!", runningJob.reduceProgress() == 1.0);
+    assertTrue("Number of reduces is not " + numReduces, jc
+        .getReduceTaskReports(runningJob.getID()).length == numReduces);
+
+    TaskReport[] jobCleanupTasks = jc.getCleanupTaskReports(jobID);
+    assertTrue("Number of job-cleanup tips is not 2!",
+        jobCleanupTasks.length == 2);
+    assertTrue("Cleanup progress is " + runningJob.cleanupProgress()
+        + " and not 1.0", runningJob.cleanupProgress() == 1.0);
+
+    assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
+    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    assertTrue("Number of part-files is " + list.length + " and not "
+        + numReduces, list.length == numReduces);
+
+    // cleanup
+    fs.delete(outDir, true);
+
+    // return job result
+    LOG.info("job is complete: " + runningJob.isSuccessful());
+    return (runningJob.isSuccessful());
+  }
+
+  /**
+   * Test that a job with no input data (and thus with no input split and no map
+   * task to execute) is successful.
+   * 
+   * @throws IOException
+   */
+  public void testEmptyJob()
+      throws IOException {
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 1;
+      JobConf conf = new JobConf();
+      fileSys = FileSystem.get(conf);
+
+      conf.set("mapred.job.tracker.handler.count", "1");
+      conf.set("mapred.job.tracker", "127.0.0.1:0");
+      conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+      conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+
+      mr =
+          new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
+              null, null, conf);
+
+      assertTrue(launchEmptyJob(fileSys.getUri(), 3, 1));
+      assertTrue(launchEmptyJob(fileSys.getUri(), 0, 0));
+    } finally {
+      if (fileSys != null) {
+        fileSys.close();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Fri
May 22 15:23:40 2009
@@ -37,7 +37,7 @@
   //end of the job (indirectly testing whether all tasktrackers
   //got a KillJobAction).
   private static final Log LOG =
-        LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
+        LogFactory.getLog(TestEmptyJob.class.getName());
   private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "10", "-mt", "1000", "-rt", "10000" };
     ToolRunner.run(conf, new SleepJob(), args);

Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Fri May 22 15:23:40 2009
@@ -278,7 +278,7 @@
 </center>
 
 <%
-      if (ts[0].getIsMap()) {
+      if (ts[0].getIsMap() && !isCleanupOrSetup) {
 %>
 <h3>Input Split Locations</h3>
 <table border=2 cellpadding="5" cellspacing="2">



Mime
View raw message