hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r743816 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskInProgress.java src/test/org/apache/hadoop/mapred/MiniMRCluster.java src/test/org/apache/hadoop/mapred/TestLostTracker.java
Date Thu, 12 Feb 2009 17:48:43 GMT
Author: ddas
Date: Thu Feb 12 17:48:42 2009
New Revision: 743816

URL: http://svn.apache.org/viewvc?rev=743816&view=rev
Log:
HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and killed tasks correctly.
Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=743816&r1=743815&r2=743816&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Feb 12 17:48:42 2009
@@ -841,6 +841,9 @@
     HADOOP-5166. Fix JobTracker restart to work when ACLs are configured
     for the JobTracker. (Amar Kamat via yhemanth).
 
+    HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and
+    killed tasks correctly. (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=743816&r1=743815&r2=743816&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Feb 12 17:48:42 2009
@@ -94,6 +94,8 @@
   // Map from task Id -> TaskTracker Id, contains tasks that are
   // currently runnings
   private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  // All attempt Ids of this TIP
+  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
   private JobConf conf;
   private Map<TaskAttemptID,List<String>> taskDiagnosticData =
     new TreeMap<TaskAttemptID,List<String>>();
@@ -602,9 +604,7 @@
       }
     }
 
-    // Note that there can be failures of tasks that are hosted on a machine 
-    // that has not yet registered with restarted jobtracker
-    boolean isPresent = this.activeTasks.remove(taskid) != null;
+    this.activeTasks.remove(taskid);
     
     // Since we do not fail completed reduces (whose outputs go to hdfs), we 
     // should note this failure only for completed maps, only if this taskid;
@@ -618,8 +618,10 @@
       resetSuccessfulTaskid();
     }
 
+    // Note that there can be failures of tasks that are hosted on a machine 
+    // that has not yet registered with restarted jobtracker
     // recalculate the counts only if its a genuine failure
-    if (isPresent) {
+    if (tasks.contains(taskid)) {
       if (taskState == TaskStatus.State.FAILED) {
         numTaskFailures++;
         machinesWhereFailed.add(trackerHostName);
@@ -924,6 +926,7 @@
     }
 
     activeTasks.put(taskid, taskTracker);
+    tasks.add(taskid);
 
     // Ask JobTracker to note that the task exists
     jobtracker.createTaskEntry(taskid, taskTracker, this);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=743816&r1=743815&r2=743816&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Feb 12 17:48:42 2009
@@ -235,6 +235,9 @@
     return jobTracker;
   }
   
+  TaskTrackerRunner getTaskTrackerRunner(int id) {
+    return taskTrackerList.get(id);
+  }
   /**
    * Get the number of task trackers in the cluster
    */

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=743816&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java Thu Feb 12 17:48:42 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+public class TestLostTracker extends TestCase {
+  final Path testDir = new Path("/jt-lost-tt");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+  
+  private JobConf configureJob(JobConf conf, int maps, int reduces,
+                               String mapSignal, String redSignal) 
+  throws IOException {
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
+        maps, reduces, "test-lost-tt", 
+        mapSignal, redSignal);
+    return conf;
+  }
+  
+  public void testLostTracker(MiniDFSCluster dfs,
+                              MiniMRCluster mr) 
+  throws IOException {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    int numMaps = 10;
+    int numReds = 1;
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+    
+    // Configure the job
+    JobConf job = configureJob(jobConf, numMaps, numReds, 
+                               mapSignalFile, redSignalFile);
+      
+    fileSys.delete(shareDir, true);
+    
+    // Submit the job   
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+    
+    // wait for the job to be inited
+    mr.initializeJob(id);
+    
+    // Make sure that the master job is 50% completed
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
+           < 0.5f) {
+      UtilsForTests.waitFor(10);
+    }
+
+    // get a completed task on 1st tracker 
+    TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
+                              getNonRunningTasks().get(0).getTaskID();
+
+    // Kill the 1st tasktracker
+    mr.stopTaskTracker(0);
+
+    // Signal all the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+    
+    // Signal the reducers to complete
+    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
+                              redSignalFile);
+    // wait till the job is done
+    UtilsForTests.waitTillDone(jobClient);
+
+    // Check if the tasks on the lost tracker got killed and re-executed
+    assertTrue(jobClient.getClusterStatus().getTaskTrackers() 
+                < mr.getNumTaskTrackers());
+    assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
+    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                         getTip(taskid.getTaskID());
+    assertTrue(tip.isComplete());
+    assertEquals(tip.numKilledTasks(), 1);
+    
+  }
+  
+  public void testLostTracker() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, null, null);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      
+      // clean up
+      fileSys.delete(testDir, true);
+      
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // Write the input file
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                              new Path(inDir + "/file"), (short)1);
+
+      dfs.startDataNodes(conf, 1, true, null, null, null, null);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
+                 + (dfs.getFileSystem()).getUri().getPort();
+
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      
+      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
+      
+      // Test Lost tracker case
+      testLostTracker(dfs, mr);
+    } finally {
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception e) {}
+      }
+      if (dfs != null) {
+        try {
+          dfs.shutdown();
+        } catch (Exception e) {}
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    new TestLostTracker().testLostTracker();
+  }
+}



Mime
View raw message