hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r750852 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Fri, 06 Mar 2009 11:25:11 GMT
Author: yhemanth
Date: Fri Mar  6 11:25:05 2009
New Revision: 750852

URL: http://svn.apache.org/viewvc?rev=750852&view=rev
Log:
HADOOP-5376. Fixes the code handling lost tasktrackers to set the task state to KILLED_UNCLEAN
only for the relevant types of tasks (map and reduce tasks, not job setup/cleanup tasks). Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=750852&r1=750851&r2=750852&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Mar  6 11:25:05 2009
@@ -966,6 +966,10 @@
     HADOOP-5384. Fix a problem that DataNodeCluster creates blocks with
     generationStamp == 1.  (szetszwo)
 
+    HADOOP-5376. Fixes the code handling lost tasktrackers to set the task state
+    to KILLED_UNCLEAN only for relevant types of tasks.
+    (Amareshwari Sriramadasu via yhemanth)
+ 
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750852&r1=750851&r2=750852&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  6 11:25:05 2009
@@ -3264,13 +3264,16 @@
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING ||
               job.getStatus().getRunState() == JobStatus.PREP) {
+            // KILLED_UNCLEAN only if the task is a map or reduce that was RUNNING
+            // on the tracker; job setup/cleanup tasks are always marked KILLED
+            TaskStatus.State killState = (tip.isRunningTask(taskId) && 
+              !tip.isJobSetupTask() && !tip.isJobCleanupTask()) ? 
+              TaskStatus.State.KILLED_UNCLEAN : TaskStatus.State.KILLED;
             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 
                            (tip.isMapTask() ? 
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
-                            tip.isRunningTask(taskId) ? 
-                              TaskStatus.State.KILLED_UNCLEAN : 
-                              TaskStatus.State.KILLED,
+                            killState,
                             trackerName, myInstrumentation);
             jobsWithFailures.add(job);
           }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=750852&r1=750851&r2=750852&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar  6 11:25:05 2009
@@ -591,7 +591,7 @@
       // Check if the user manually KILLED/FAILED this task-attempt...
       Boolean shouldFail = tasksToKill.remove(taskid);
       if (shouldFail != null) {
-        if (isCleanupAttempt(taskid)) {
+        if (isCleanupAttempt(taskid) || jobSetup || jobCleanup) {
           taskState = (shouldFail) ? TaskStatus.State.FAILED :
                                      TaskStatus.State.KILLED;
         } else {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=750852&r1=750851&r2=750852&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  6 11:25:05 2009
@@ -590,6 +590,19 @@
   }
   
   /**
+   * Get the tasktrackerID in MiniMRCluster with the given trackerName, or -1 if none matches.
+   */
+  int getTaskTrackerID(String trackerName) {
+    for (int id=0; id < numTaskTrackers; id++) {
+      if (taskTrackerList.get(id).getTaskTracker().getName().equals(
+          trackerName)) {
+        return id;
+      }
+    }
+    return -1;
+  }
+  
+  /**
    * Shut down the servers.
    */
   public void shutdown() {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=750852&r1=750851&r2=750852&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Mar  6 11:25:05 2009
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import junit.framework.TestCase;
@@ -29,8 +28,18 @@
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 
+/**
+ * Tests various failures in job setup/cleanup, such as a committer
+ * throwing an exception, a command-line kill, and a lost tracker.
+ */
 public class TestSetupAndCleanupFailure extends TestCase {
 
+  final Path inDir = new Path("./input");
+  final Path outDir = new Path("./output");
+  static Path setupSignalFile = new Path("/setup-signal");
+  static Path cleanupSignalFile = new Path("/cleanup-signal");
+  
+  // Committer with setupJob throwing exception
   static class CommitterWithFailSetup extends FileOutputCommitter {
     @Override
     public void setupJob(JobContext context) throws IOException {
@@ -38,79 +47,212 @@
     }
   }
 
+  // Committer with cleanupJob throwing exception
   static class CommitterWithFailCleanup extends FileOutputCommitter {
     @Override
     public void cleanupJob(JobContext context) throws IOException {
       throw new IOException();
     }
   }
+
+  // Committer whose setupJob/cleanupJob wait for a signal file to be created on dfs.
+  static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
+    
+    private void waitForSignalFile(FileSystem fs, Path signalFile) 
+    throws IOException {
+      while (!fs.exists(signalFile)) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+         break;
+        }
+      }
+    }
+    
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile);
+      super.setupJob(context);
+    }
+    
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
+      super.cleanupJob(context);
+    }
+  }
+
+  // Among these tips only one of the tasks will be running;
+  // poll until it is found and return its task attempt id
+  private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) {
+    TaskAttemptID taskid = null;
+    while (taskid == null) {
+      for (TaskInProgress tip :tips) {
+        TaskStatus[] statuses = tip.getTaskStatuses();
+        for (TaskStatus status : statuses) {
+          if (status.getRunState() == TaskStatus.State.RUNNING) {
+            taskid = status.getTaskID();
+            break;
+          }
+        }
+        if (taskid != null) break;
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+    return taskid;
+  }
   
-  public RunningJob launchJob(JobConf conf,
-                              Path inDir,
-                              Path outDir,
-                              String input) 
+  // Tests the failures in setup/cleanup job. Job should cleanly fail.
+  private void testFailCommitter(Class<? extends OutputCommitter> theClass,
+                                 JobConf jobConf) 
   throws IOException {
-    // set up the input file system and write input text.
-    FileSystem inFs = inDir.getFileSystem(conf);
-    FileSystem outFs = outDir.getFileSystem(conf);
-    outFs.delete(outDir, true);
-    if (!inFs.mkdirs(inDir)) {
-      throw new IOException("Mkdirs failed to create " + inDir.toString());
-    }
-    {
-      // write input into input file
-      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
-      file.writeBytes(input);
-      file.close();
-    }
-
-    // configure the mapred Job
-    conf.setMapperClass(IdentityMapper.class);        
-    conf.setReducerClass(IdentityReducer.class);
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
-    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
-                                    "/tmp")).toString().replace(' ', '+');
-    conf.set("test.build.data", TEST_ROOT_DIR);
+    jobConf.setOutputCommitter(theClass);
+    RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
+    // wait for the job to finish.
+    job.waitForCompletion();
+    assertEquals(JobStatus.FAILED, job.getJobState());
+  }
+  
+  // launch job with CommitterWithLongSetupAndCleanup as committer
+  // and wait till the job is inited.
+  private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr) 
+  throws IOException {
+    // launch job with waiting setup/cleanup
+    JobConf jobConf = mr.createJobConf();
+    jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
+    RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jt.getJob(job.getID());
+    while (!jip.inited()) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+    return job;
+  }
+  
+  /**
+   * Tests setup and cleanup attempts getting killed from command-line 
+   * and lost tracker
+   * 
+   * @param mr
+   * @param dfs
+   * @param commandLineKill if true, test with command-line kill
+   *                        else, test with lost tracker
+   * @throws IOException
+   */
+  private void testSetupAndCleanupKill(MiniMRCluster mr, 
+                                       MiniDFSCluster dfs, 
+                                       boolean commandLineKill) 
+  throws IOException {
+    // launch job with waiting setup/cleanup
+    RunningJob job = launchJobWithWaitingSetupAndCleanup(mr);
+    
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jt.getJob(job.getID());
+    // get the running setup task id
+    TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
+    if (commandLineKill) {
+      killTaskFromCommandLine(job, setupID, jt);
+    } else {
+      killTaskWithLostTracker(mr, setupID);
+    }
+    // signal the setup to complete
+    UtilsForTests.writeFile(dfs.getNameNode(), 
+                            dfs.getFileSystem().getConf(), 
+                            setupSignalFile, (short)3);
+    // wait for maps and reduces to complete
+    while (job.reduceProgress() != 1.0f) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {}
+    }
+    // get the running cleanup task id
+    TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
+    if (commandLineKill) {
+      killTaskFromCommandLine(job, cleanupID, jt);
+    } else {
+      killTaskWithLostTracker(mr, cleanupID);
+    }
+    // signal the cleanup to complete
+    UtilsForTests.writeFile(dfs.getNameNode(), 
+                            dfs.getFileSystem().getConf(), 
+                            cleanupSignalFile, (short)3);
+    // wait for the job to finish.
+    job.waitForCompletion();
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+    assertEquals(TaskStatus.State.KILLED, 
+                 jt.getTaskStatus(setupID).getRunState());
+    assertEquals(TaskStatus.State.KILLED, 
+                 jt.getTaskStatus(cleanupID).getRunState());
+  }
+  
+  // kill the task from command-line and
+  // wait till the kill is reported back
+  private void killTaskFromCommandLine(RunningJob job, 
+                                       TaskAttemptID taskid,
+                                       JobTracker jt) 
+  throws IOException {
+    job.killTask(taskid, false);
+    // wait till the kill happens
+    while (jt.getTaskStatus(taskid).getRunState() != 
+           TaskStatus.State.KILLED) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
 
-    // return the RunningJob handle.
-    return new JobClient(conf).submitJob(conf);
+  }
+  // kill the task by losing the tracker
+  private void killTaskWithLostTracker(MiniMRCluster mr, 
+                                       TaskAttemptID taskid) {
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
+    int trackerID = mr.getTaskTrackerID(trackerName);
+    assertTrue(trackerID != -1);
+    mr.stopTaskTracker(trackerID);
   }
   
+  // Tests the failures in setup/cleanup job. Job should cleanly fail.
+  // Also tests the command-line kill for setup/cleanup attempts, and
+  // tests the setup/cleanup attempts getting killed if
+  // they were running on a lost tracker.
   public void testWithDFS() throws IOException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
     try {
-      final int taskTrackers = 2;
-
+      final int taskTrackers = 4;
       Configuration conf = new Configuration();
-      dfs = new MiniDFSCluster(conf, 2, true, null);
+      dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
-      JobConf jobConf = mr.createJobConf();
-      final Path inDir = new Path("./input");
-      final Path outDir = new Path("./output");
-      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
-      RunningJob job = null;
-
-      jobConf.setOutputCommitter(CommitterWithFailSetup.class);
-      job = launchJob(jobConf, inDir, outDir, input);
-      // wait for the job to finish.
-      job.waitForCompletion();
-      assertEquals(JobStatus.FAILED, job.getJobState());
-
-      jobConf.setOutputCommitter(CommitterWithFailCleanup.class);
-      job = launchJob(jobConf, inDir, outDir, input);
-      // wait for the job to finish.
-      job.waitForCompletion();
-      assertEquals(JobStatus.FAILED, job.getJobState());
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
+                             null, null, jtConf);
+      // test setup/cleanup throwing exceptions
+      testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
+      testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
+      // test the command-line kill for setup/cleanup attempts. 
+      testSetupAndCleanupKill(mr, dfs, true);
+      // remove setup/cleanup signal files.
+      fileSys.delete(setupSignalFile , true);
+      fileSys.delete(cleanupSignalFile , true);
+      // test the setup/cleanup attempts getting killed if 
+      // they were running on a lost tracker
+      testSetupAndCleanupKill(mr, dfs, false);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }
     }
   }
+
   public static void main(String[] argv) throws Exception {
     TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure();
     td.testWithDFS();



Mime
View raw message