hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r761632 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Fri, 03 Apr 2009 11:58:44 GMT
Author: ddas
Date: Fri Apr  3 11:58:44 2009
New Revision: 761632

URL: http://svn.apache.org/viewvc?rev=761632&view=rev
Log:
HADOOP-5337. JobTracker, upon restart, now waits for the TaskTrackers to join back before
scheduling new tasks. This fixes race conditions associated with greedy scheduling as was
the case earlier. Contributed by Amar Kamat.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
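
For readers who want the gist before wading into the diffs: the patch adds a small gate to the JobTracker's recovery manager, and everything else is plumbing and tests around it. The sketch below is an illustrative condensation only; the names follow the patch, but the surrounding JobTracker state and recovery logic are omitted.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Condensed, illustrative form of the gate added to JobTracker's RecoveryManager.
class RecoveryGateSketch {
  // Trackers seen in the job history during recovery that have not yet
  // reported back to the restarted JobTracker.
  private final Set<String> recoveredTrackers =
      Collections.synchronizedSet(new HashSet<String>());

  // Recorded while replaying history, when a known tracker is not yet registered.
  void markTracker(String trackerName) {
    recoveredTrackers.add(trackerName);
  }

  // Cleared when that tracker heartbeats back in, or when it is declared lost.
  void unMarkTracker(String trackerName) {
    recoveredTrackers.remove(trackerName);
  }

  // New tasks are handed out only once the set has drained.
  boolean shouldSchedule() {
    return recoveredTrackers.isEmpty();
  }
}

In the JobTracker's heartbeat handling, the existing acceptNewTasks && !isBlacklisted check is simply prefixed with recoveryManager.shouldSchedule(), so the scheduling window stays closed until every pre-restart tracker has either rejoined or been marked lost.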

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=761632&r1=761631&r2=761632&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr  3 11:58:44 2009
@@ -1181,6 +1181,10 @@
 
     HADOOP-5605. All the replicas incorrectly got marked as corrupt. (hairong)
 
+    HADOOP-5337. JobTracker, upon restart, now waits for the TaskTrackers to
+    join back before scheduling new tasks. This fixes race conditions associated
+    with greedy scheduling as was the case earlier. (Amar Kamat via ddas) 
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=761632&r1=761631&r2=761632&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Apr  3 11:58:44 2009
@@ -672,6 +672,9 @@
     Set<JobID> jobsToRecover; // set of jobs to be recovered
     
     private int totalEventsRecovered = 0;
+
+    Set<String> recoveredTrackers = 
+      Collections.synchronizedSet(new HashSet<String>());
     
     /** A custom listener that replays the events in the order in which the 
      * events (task attempts) occurred. 
@@ -848,6 +851,18 @@
       return jobsToRecover.size() != 0;
     }
 
+    public boolean shouldSchedule() {
+      return recoveredTrackers.isEmpty();
+    }
+
+    private void markTracker(String trackerName) {
+      recoveredTrackers.add(trackerName);
+    }
+
+    void unMarkTracker(String trackerName) {
+      recoveredTrackers.remove(trackerName);
+    }
+
     Set<JobID> getJobsToRecover() {
       return jobsToRecover;
     }
@@ -984,6 +999,7 @@
       // IV. Register a new tracker
       boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
       if (!isTrackerRegistered) {
+        markTracker(trackerName); // add the tracker to recovery-manager
         addNewTracker(ttStatus);
       }
       
@@ -2318,6 +2334,8 @@
         // started JobTracker
         if (hasRestarted()) {
           addRestartInfo = true;
+          // inform the recovery manager about this tracker joining back
+          recoveryManager.unMarkTracker(trackerName);
         } else {
           // Jobtracker might have restarted but no recovery is needed
           // otherwise this code should not be reached
@@ -2359,7 +2377,7 @@
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
       
     // Check for new tasks to be executed on the tasktracker
-    if (acceptNewTasks && !isBlacklisted) {
+    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -3303,6 +3321,9 @@
       trackerToJobsToCleanup.remove(trackerName);
     }
     
+    // Inform the recovery manager
+    recoveryManager.unMarkTracker(trackerName);
+    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=761632&r1=761631&r2=761632&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Apr  3 11:58:44 2009
@@ -438,7 +438,7 @@
     this.jobTrackerPort = jobTrackerPort;
     this.taskTrackerPort = taskTrackerPort;
     this.jobTrackerInfoPort = 0;
-    this.numTaskTrackers = numTaskTrackers;
+    this.numTaskTrackers = 0;
     this.namenode = namenode;
     this.ugi = ugi;
     this.conf = conf; // this is the conf the mr starts with
@@ -448,27 +448,18 @@
 
     // Create the TaskTrackers
     for (int idx = 0; idx < numTaskTrackers; idx++) {
+      String rack = null;
+      String host = null;
       if (racks != null) {
-        StaticMapping.addNodeToRack(hosts[idx],racks[idx]);
+        rack = racks[idx];
       }
       if (hosts != null) {
-        NetUtils.addStaticResolution(hosts[idx], "localhost");
+        host = hosts[idx];
       }
-      TaskTrackerRunner taskTracker;
-      taskTracker = new TaskTrackerRunner(idx, numDir, 
-          hosts == null ? null : hosts[idx], conf);
       
-      Thread taskTrackerThread = new Thread(taskTracker);
-      taskTrackerList.add(taskTracker);
-      taskTrackerThreadList.add(taskTrackerThread);
+      startTaskTracker(host, rack, idx, numDir);
     }
 
-    // Start the MiniMRCluster
-        
-    for (Thread taskTrackerThread : taskTrackerThreadList){
-      taskTrackerThread.start();
-    }
-    
     this.job = createJobConf(conf);
     waitUntilIdle();
   }
@@ -598,20 +589,44 @@
    * Kill the tasktracker.
    */
   public void stopTaskTracker(int id) {
-    taskTrackerList.get(id).shutdown();
+    TaskTrackerRunner tracker = taskTrackerList.remove(id);
+    tracker.shutdown();
 
-    taskTrackerThreadList.get(id).interrupt();
+    Thread thread = taskTrackerThreadList.remove(id);
+    thread.interrupt();
     
     try {
-      taskTrackerThreadList.get(id).join();
+      thread.join();
       // This will break the wait until idle loop
-      taskTrackerList.get(id).isDead = true;
+      tracker.isDead = true;
+      --numTaskTrackers;
     } catch (InterruptedException ex) {
       LOG.error("Problem waiting for task tracker to finish", ex);
     }
   }
   
   /**
+   * Start the tasktracker.
+   */
+  public void startTaskTracker(String host, String rack, int idx, int numDir) 
+  throws IOException {
+    if (rack != null) {
+      StaticMapping.addNodeToRack(host, rack);
+    }
+    if (host != null) {
+      NetUtils.addStaticResolution(host, "localhost");
+    }
+    TaskTrackerRunner taskTracker;
+    taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+    
+    Thread taskTrackerThread = new Thread(taskTracker);
+    taskTrackerList.add(taskTracker);
+    taskTrackerThreadList.add(taskTrackerThread);
+    taskTrackerThread.start();
+    ++numTaskTrackers;
+  }
+  
+  /**
    * Get the tasktrackerID in MiniMRCluster with given trackerName.
    */
   int getTaskTrackerID(String trackerName) {
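
The MiniMRCluster changes above factor tracker startup into a public startTaskTracker(host, rack, idx, numDir) and make stopTaskTracker(id) actually remove the runner and its thread while keeping numTaskTrackers in sync, which is what lets the new safe-mode test take a tracker down and bring fresh ones up against a restarted JobTracker. A hedged usage sketch follows; the helper name, the choice of index, and the numDir value are illustrative, not part of the patch.

import java.io.IOException;
import org.apache.hadoop.mapred.MiniMRCluster;

class TrackerBounceSketch {
  // Stop tracker 0 on an already-running MiniMRCluster and start a replacement.
  // idx just identifies the new runner (and its local dirs); pick one not in use.
  static void bounceTracker(MiniMRCluster mr, int idx, int numDir) throws IOException {
    mr.stopTaskTracker(0);                        // shuts the runner down and joins its thread
    mr.startTaskTracker(null, null, idx, numDir); // null host/rack skips the static mappings
  }
}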

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=761632&r1=761631&r2=761632&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Fri Apr  3 11:58:44 2009
@@ -102,10 +102,9 @@
     UtilsForTests.waitTillDone(jobClient);
 
     // Check if the tracker got lost
-    assertTrue("Tracker killed while the jobtracker was down did not get lost "
-                + "upon restart", 
-                jobClient.getClusterStatus().getTaskTrackers() 
-                < mr.getNumTaskTrackers());
+    assertEquals("Tracker killed while the jobtracker was down did not get lost "
+                 + "upon restart", 
+                 jobClient.getClusterStatus().getTaskTrackers(), 1);
     
     //  Check if the tasks on the lost tracker got killed
     int failedMaps = 

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=761632&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Fri Apr  3 11:58:44 2009
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+
+/** 
+ * This test checks jobtracker in safe mode. In safe mode the jobtracker upon 
+ * restart doesn't schedule any new tasks and waits for the (old) trackers to 
+ * join back.
+ */
+public class TestJobTrackerSafeMode extends TestCase {
+  final Path testDir = 
+    new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+  final int numDir = 1;
+  final int numTrackers = 2;
+  
+  private static final Log LOG = 
+    LogFactory.getLog(TestJobTrackerSafeMode.class);
+  
+  private JobConf configureJob(JobConf conf, int maps, int reduces,
+                               String mapSignal, String redSignal) 
+  throws IOException {
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
+        maps, reduces, "test-jobtracker-safemode", 
+        mapSignal, redSignal);
+    return conf;
+  }
+  
+  /**
+   * Tests the jobtracker's safemode. The test is as follows : 
+   *   - starts a cluster with 2 trackers
+   *   - submits a job with large (40) maps to make sure that all the trackers 
+   *     are logged to the job history
+   *   - wait for the job to be 50% done
+   *   - stop the jobtracker
+   *   - wait for the trackers to be done with all the tasks
+   *   - kill a task tracker
+   *   - start the jobtracker
+   *   - start 2 more trackers
+   *   - now check that while all the trackers are detected (or lost) the 
+   *     scheduling window is closed
+   *   - check that after all the trackers are recovered, scheduling is opened 
+   */
+  private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr) 
+  throws IOException {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();
+    
+    // Configure the jobs
+    JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);
+      
+    fileSys.delete(shareDir, true);
+    
+    // Submit a master job   
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+    
+    // wait for the job to be inited
+    mr.initializeJob(id);
+    
+    // Make sure that the master job is 50% completed
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
+           < 0.5f) {
+      LOG.info("Waiting for the job to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // Kill the jobtracker
+    mr.stopJobTracker();
+
+    // Enable recovery on restart
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+    
+    // Signal the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+    
+    // Signal the reducers to complete
+    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
+                              redSignalFile);
+    
+    // wait for the tasks to complete at the tracker
+    Set<String> trackers = new HashSet<String>();
+    for (int i = 0 ; i < numTracker; ++i) {
+      TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
+      trackers.add(t.getName());
+      int runningCount = t.getRunningTaskStatuses().size();
+      while (runningCount != 0) {
+        LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
+        UtilsForTests.waitFor(100);
+        runningCount = 0;
+        for (TaskStatus status : t.getRunningTaskStatuses()) {
+          if (status.getIsMap() 
+              && (status.getRunState() == TaskStatus.State.UNASSIGNED 
+                  || status.getRunState() == TaskStatus.State.RUNNING)) {
+            ++runningCount;
+          }
+        }
+      }
+    }
+
+    LOG.info("Trackers have stabilized");
+    
+    // Kill a tasktracker
+    int trackerToKill = --numTracker;
+    TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();
+    
+    trackers.remove(t.getName()); // remove this from the set to check
+    
+    Set<String> lostTrackers = new HashSet<String>();
+    lostTrackers.add(t.getName());
+    
+    // get the attempt-id's to ignore
+    // stop the tracker
+    LOG.info("Stopping tracker : " + t.getName());
+    mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
+    mr.stopTaskTracker(trackerToKill);
+
+    // Restart the jobtracker
+    mr.startJobTracker();
+
+    // Wait for the JT to be ready
+    UtilsForTests.waitForJobTracker(jobClient);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // Start a tracker
+    LOG.info("Start a new tracker");
+    mr.startTaskTracker(null, null, ++numTracker, numDir);
+    
+    // Start a tracker
+    LOG.info("Start a new tracker");
+    mr.startTaskTracker(null, null, ++numTracker, numDir);
+
+    // Check if the jobs are still running
+    
+    // Wait for the tracker to be lost
+    boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
+    while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
+      assertFalse("JobTracker has opened up scheduling before all the" 
+                  + " trackers were recovered", shouldSchedule);
+      UtilsForTests.waitFor(100);
+      
+      // snapshot jobtracker's scheduling status
+      shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
+    }
+
+    assertTrue("JobTracker hasn't opened up scheduling even after all the" 
+               + " trackers were recovered", 
+               jobtracker.recoveryManager.shouldSchedule());
+    
+    assertEquals("Recovery manager is in inconsistent state", 
+                 0, jobtracker.recoveryManager.recoveredTrackers.size());
+    
+    // wait for the job to be complete
+    UtilsForTests.waitTillDone(jobClient);
+  }
+
+  private boolean checkTrackers(JobTracker jobtracker, Set<String> present, 
+                                Set<String> absent) {
+    long jobtrackerRecoveryFinishTime = 
+      jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
+    for (String trackerName : present) {
+      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
+      // check if the status is present and also the tracker has contacted back
+      // after restart
+      if (status == null 
+          || status.getLastSeen() < jobtrackerRecoveryFinishTime) {
+        return false;
+      }
+    }
+    for (String trackerName : absent) {
+      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
+      // check if the status is still present
+      if ( status != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Test {@link JobTracker}'s safe mode.
+   */
+  public void testJobTrackerSafeMode() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, null, null);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      
+      // clean up
+      fileSys.delete(testDir, true);
+      
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // Write the input file
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                              new Path(inDir + "/file"), (short)1);
+
+      dfs.startDataNodes(conf, 1, true, null, null, null, null);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
+                 + (dfs.getFileSystem()).getUri().getPort();
+
+      // Make sure that jobhistory leads to a proper job restart
+      // So keep the blocksize and the buffer size small
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.job.history.block.size", "512");
+      jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
+      
+      mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);
+      
+      // Test Lost tracker case
+      testSafeMode(dfs, mr);
+    } finally {
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception e) {}
+      }
+      if (dfs != null) {
+        try {
+          dfs.shutdown();
+        } catch (Exception e) {}
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    new TestJobTrackerSafeMode().testJobTrackerSafeMode();
+  }
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=761632&r1=761631&r2=761632&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java Fri Apr  3 11:58:44 2009
@@ -86,8 +86,7 @@
     UtilsForTests.waitTillDone(jobClient);
 
     // Check if the tasks on the lost tracker got killed and re-executed
-    assertTrue(jobClient.getClusterStatus().getTaskTrackers() 
-                < mr.getNumTaskTrackers());
+    assertEquals(jobClient.getClusterStatus().getTaskTrackers(), 1);
     assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
                          getTip(taskid.getTaskID());


