hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r794591 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
Date Thu, 16 Jul 2009 08:30:40 GMT
Author: ddas
Date: Thu Jul 16 08:30:39 2009
New Revision: 794591

URL: http://svn.apache.org/viewvc?rev=794591&view=rev
Log:
MAPREDUCE-626. Improves the execution time of TestLostTracker. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=794591&r1=794590&r2=794591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 16 08:30:39 2009
@@ -108,6 +108,9 @@
     MAPREDUCE-630. Improves execution time of TestKillCompletedJob.
     (Jothi Padmanabhan via ddas)
 
+    MAPREDUCE-626. Improves the execution time of TestLostTracker.
+    (Jothi Padmanabhan via ddas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=794591&r1=794590&r2=794591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Jul 16 08:30:39 2009
@@ -344,60 +344,7 @@
           // for a task tracker.
           //
           Thread.sleep(tasktrackerExpiryInterval / 3);
-
-          //
-          // Loop through all expired items in the queue
-          //
-          // Need to lock the JobTracker here since we are
-          // manipulating it's data-structures via
-          // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
-          // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt
-          // Also need to lock JobTracker before locking 'taskTracker' &
-          // 'trackerExpiryQueue' to prevent deadlock:
-          // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)} 
-          synchronized (JobTracker.this) {
-            synchronized (taskTrackers) {
-              synchronized (trackerExpiryQueue) {
-                long now = clock.getTime();
-                TaskTrackerStatus leastRecent = null;
-                while ((trackerExpiryQueue.size() > 0) &&
-                       (leastRecent = trackerExpiryQueue.first()) != null &&
-                       ((now - leastRecent.getLastSeen()) > tasktrackerExpiryInterval)) {
-                        
-                  // Remove profile from head of queue
-                  trackerExpiryQueue.remove(leastRecent);
-                  String trackerName = leastRecent.getTrackerName();
-                        
-                  // Figure out if last-seen time should be updated, or if tracker is dead
-                  TaskTracker current = getTaskTracker(trackerName);
-                  TaskTrackerStatus newProfile = 
-                    (current == null ) ? null : current.getStatus();
-                  // Items might leave the taskTracker set through other means; the
-                  // status stored in 'taskTrackers' might be null, which means the
-                  // tracker has already been destroyed.
-                  if (newProfile != null) {
-                    if ((now - newProfile.getLastSeen()) > tasktrackerExpiryInterval) {
-                      // Remove completely after marking the tasks as 'KILLED'
-                      lostTaskTracker(current);
-                      // tracker is lost, and if it is blacklisted, remove 
-                      // it from the count of blacklisted trackers in the cluster
-                      if (isBlacklisted(trackerName)) {
-                        faultyTrackers.numBlacklistedTrackers -= 1;
-                      }
-                      updateTaskTrackerStatus(trackerName, null);
-                      statistics.taskTrackerRemoved(trackerName);
-                      // remove the mapping from the hosts list
-                      String hostname = newProfile.getHost();
-                      hostnameToTaskTracker.get(hostname).remove(trackerName);
-                    } else {
-                      // Update time by inserting latest profile
-                      trackerExpiryQueue.add(newProfile);
-                    }
-                  }
-                }
-              }
-            }
-          }
+          checkExpiredTrackers();
         } catch (InterruptedException iex) {
           break;
         } catch (Exception t) {
@@ -406,7 +353,65 @@
         }
       }
     }
-        
+  }
+  
+  void checkExpiredTrackers() {
+    //
+    // Loop through all expired items in the queue
+    //
+    // Need to lock the JobTracker here since we are
+    // manipulating it's data-structures via
+    // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
+    // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt
+    // Also need to lock JobTracker before locking 'taskTracker' &
+    // 'trackerExpiryQueue' to prevent deadlock:
+    // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)} 
+    synchronized (JobTracker.this) {
+      synchronized (taskTrackers) {
+        synchronized (trackerExpiryQueue) {
+          long now = clock.getTime();
+          TaskTrackerStatus leastRecent = null;
+          while ((trackerExpiryQueue.size() > 0) &&
+              (leastRecent = trackerExpiryQueue.first()) != null &&
+              ((now - leastRecent.getLastSeen()) > 
+                  tasktrackerExpiryInterval)) {
+
+            // Remove profile from head of queue
+            trackerExpiryQueue.remove(leastRecent);
+            String trackerName = leastRecent.getTrackerName();
+
+            // Figure out if last-seen time should be updated, or if 
+            // tracker is dead
+            TaskTracker current = getTaskTracker(trackerName);
+            TaskTrackerStatus newProfile = 
+              (current == null ) ? null : current.getStatus();
+            // Items might leave the taskTracker set through other means; the
+            // status stored in 'taskTrackers' might be null, which means the
+            // tracker has already been destroyed.
+            if (newProfile != null) {
+              if ((now - newProfile.getLastSeen()) >
+                  tasktrackerExpiryInterval) {
+                // Remove completely after marking the tasks as 'KILLED'
+                lostTaskTracker(current);
+                // tracker is lost, and if it is blacklisted, remove 
+                // it from the count of blacklisted trackers in the cluster
+                if (isBlacklisted(trackerName)) {
+                  faultyTrackers.numBlacklistedTrackers -= 1;
+                }
+                updateTaskTrackerStatus(trackerName, null);
+                statistics.taskTrackerRemoved(trackerName);
+                // remove the mapping from the hosts list
+                String hostname = newProfile.getHost();
+                hostnameToTaskTracker.get(hostname).remove(trackerName);
+              } else {
+                // Update time by inserting latest profile
+                trackerExpiryQueue.add(newProfile);
+              }
+            }
+          }
+        }
+      }
+    }
   }
 
   ///////////////////////////////////////////////////////
@@ -2035,6 +2040,11 @@
     return NetUtils.createSocketAddr(jobTrackerStr);
   }
 
+  void  startExpireTrackersThread() {
+    this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers");
+    this.expireTrackersThread.start();
+  }
+
   /**
    * Run forever
    */
@@ -2065,9 +2075,8 @@
     // disallowed trackers
     refreshHosts();
     
-    this.expireTrackersThread = new Thread(this.expireTrackers,
-                                          "expireTrackers");
-    this.expireTrackersThread.start();
+    startExpireTrackersThread();
+
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
@@ -2103,15 +2112,9 @@
       LOG.info("Stopping interTrackerServer");
       this.interTrackerServer.stop();
     }
-    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
-      LOG.info("Stopping expireTrackers");
-      this.expireTrackersThread.interrupt();
-      try {
-        this.expireTrackersThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
+
+    stopExpireTrackersThread();
+
     if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
       LOG.info("Stopping retirer");
       this.retireJobsThread.interrupt();
@@ -2146,6 +2149,19 @@
     LOG.info("stopped all jobtracker services");
     return;
   }
+
+  void stopExpireTrackersThread() {
+    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
+      LOG.info("Stopping expireTrackers");
+      this.expireTrackersThread.interrupt();
+      try {
+        this.expireTrackersThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+  }
+
     
   ///////////////////////////////////////////////////////
   // Maintain lookup tables; called by JobInProgress

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=794591&r1=794590&r2=794591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Jul 16 08:30:39 2009
@@ -73,11 +73,12 @@
     @Override
     public synchronized void initTasks() throws IOException {
       maps = new TaskInProgress[numMapTasks];
+      JobClient.RawSplit[] splits = new JobClient.RawSplit[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
-        JobClient.RawSplit split = new JobClient.RawSplit();
-        split.setLocations(new String[0]);
+        splits[i] = new JobClient.RawSplit();
+        splits[i].setLocations(new String[0]);
         maps[i] = new TaskInProgress(getJobID(), "test", 
-            split, jobtracker, getJobConf(), this, i, 1);
+            splits[i], jobtracker, getJobConf(), this, i, 1);
         nonLocalMaps.add(maps[i]);
       }
       reduces = new TaskInProgress[numReduceTasks];
@@ -88,6 +89,7 @@
         nonRunningReduces.add(reduces[i]);
       }
       tasksInited.set(true);
+      nonRunningMapCache = createCache(splits, maxLevel);
     }
     
     private TaskAttemptID findTask(String trackerName, String trackerHost,
@@ -175,7 +177,7 @@
               : Phase.REDUCE, new Counters());
       updateTaskStatus(tip, status);
     }
-    
+
     public void cleanUpMetrics() {
     }
     

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=794591&r1=794590&r2=794591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java Thu Jul 16 08:30:39 2009
@@ -17,163 +17,77 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import java.io.IOException;
 
 import junit.framework.TestCase;
-import java.io.*;
 
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+
+/**
+ * A test to verify JobTracker's resilience to lost task trackers. 
+ * 
+ */
+@SuppressWarnings("deprecation")
 public class TestLostTracker extends TestCase {
-  final Path testDir = new Path("/jt-lost-tt");
-  final Path inDir = new Path(testDir, "input");
-  final Path shareDir = new Path(testDir, "share");
-  final Path outputDir = new Path(testDir, "output");
-  
-  private JobConf configureJob(JobConf conf, int maps, int reduces,
-                               String mapSignal, String redSignal) 
-  throws IOException {
-    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
-        maps, reduces, "test-lost-tt", 
-        mapSignal, redSignal);
-    return conf;
-  }
+
+  FakeJobInProgress job;
+  static FakeJobTracker jobTracker;
+ 
+  static FakeClock clock;
   
-  public void testLostTracker(MiniDFSCluster dfs,
-                              MiniMRCluster mr) 
-  throws IOException {
-    FileSystem fileSys = dfs.getFileSystem();
-    JobConf jobConf = mr.createJobConf();
-    int numMaps = 10;
-    int numReds = 1;
-    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
-    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
-    
-    // Configure the job
-    JobConf job = configureJob(jobConf, numMaps, numReds, 
-                               mapSignalFile, redSignalFile);
-      
-    fileSys.delete(shareDir, true);
-    
-    // Submit the job   
-    JobClient jobClient = new JobClient(job);
-    RunningJob rJob = jobClient.submitJob(job);
-    JobID id = rJob.getID();
-    
-    // wait for the job to be inited
-    mr.initializeJob(id);
-    
-    // Make sure that the master job is 50% completed
-    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
-           < 0.5f) {
-      UtilsForTests.waitFor(10);
-    }
-
-    // get a completed task on 1st tracker 
-    TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
-                              getNonRunningTasks().get(0).getTaskID();
-
-    // Kill the 1st tasktracker
-    mr.stopTaskTracker(0);
-
-    // Signal all the maps to complete
-    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
-    
-    // Signal the reducers to complete
-    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
-                              redSignalFile);
-    // wait till the job is done
-    UtilsForTests.waitTillDone(jobClient);
-
-    // Check if the tasks on the lost tracker got killed and re-executed
-    assertEquals(jobClient.getClusterStatus().getTaskTrackers(), 1);
-    assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
-    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
-                         getTip(taskid.getTaskID());
-    assertTrue(tip.isComplete());
-    assertEquals(tip.numKilledTasks(), 1);
-    
-    // check if the task statuses for the tasks are sane
-    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-    for (TaskInProgress taskInProgress : jt.getJob(id).getMapTasks()) {
-      testTaskStatuses(taskInProgress.getTaskStatuses());
-    }
-    
-    // validate the history file
-    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
-    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
+  static String trackers[] = new String[] {"tracker_tracker1:1000",
+      "tracker_tracker2:1000"};
+
+  @Override
+  protected void setUp() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    conf.setLong("mapred.tasktracker.expiry.interval", 1000);
+    jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers);
+    jobTracker.startExpireTrackersThread();
   }
   
-  private void testTaskStatuses(TaskStatus[] tasks) {
-    for (TaskStatus status : tasks) {
-      assertTrue("Invalid start time " + status.getStartTime(), 
-                 status.getStartTime() > 0);
-      assertTrue("Invalid finish time " + status.getFinishTime(), 
-                 status.getFinishTime() > 0);
-      assertTrue("Start time (" + status.getStartTime() + ") is greater than " 
-                 + "the finish time (" + status.getFinishTime() + ")", 
-                 status.getStartTime() <= status.getFinishTime());
-      assertNotNull("Task phase information is null", status.getPhase());
-      assertNotNull("Task run-state information is null", status.getRunState());
-      assertNotNull("TaskTracker information is null", status.getTaskTracker());
-    }
+  @Override
+  protected void tearDown() throws Exception {
+    jobTracker.stopExpireTrackersThread();
   }
 
   public void testLostTracker() throws IOException {
-    String namenode = null;
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-
-    try {
-      Configuration conf = new Configuration();
-      conf.setBoolean("dfs.replication.considerLoad", false);
-      dfs = new MiniDFSCluster(conf, 1, true, null, null);
-      dfs.waitActive();
-      fileSys = dfs.getFileSystem();
-      
-      // clean up
-      fileSys.delete(testDir, true);
-      
-      if (!fileSys.mkdirs(inDir)) {
-        throw new IOException("Mkdirs failed to create " + inDir.toString());
-      }
-
-      // Write the input file
-      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
-                              new Path(inDir + "/file"), (short)1);
-
-      dfs.startDataNodes(conf, 1, true, null, null, null, null);
-      dfs.waitActive();
-
-      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
-                 + (dfs.getFileSystem()).getUri().getPort();
-
-      JobConf jtConf = new JobConf();
-      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
-      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
-      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
-      jtConf.setInt("mapred.reduce.copy.backoff", 4);
-      
-      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
-      
-      // Test Lost tracker case
-      testLostTracker(dfs, mr);
-    } finally {
-      if (mr != null) {
-        try {
-          mr.shutdown();
-        } catch (Exception e) {}
-      }
-      if (dfs != null) {
-        try {
-          dfs.shutdown();
-        } catch (Exception e) {}
-      }
-    }
-  }
+    // Tracker 0 contacts JT
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+
+    TaskAttemptID[] tid = new TaskAttemptID[2];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
 
-  public static void main(String[] args) throws IOException {
-    new TestLostTracker().testLostTracker();
+    job.finishTask(tid[0]);
+
+    // Advance clock. Tracker 0 would have got lost
+    clock.advance(8 * 1000);
+
+    jobTracker.checkExpiredTrackers();
+    
+    // Tracker 1 establishes contact with JT 
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    
+    // Tracker1 should get assigned the lost map task
+    tid[1] =  job.findMapTask(trackers[1]);
+
+    assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
+    
+    assertEquals("Task ID of reassigned map task does not match",
+        tid[0].getTaskID().toString(), tid[1].getTaskID().toString());
+    
+    job.finishTask(tid[1]);
+    
   }
-}
+}
\ No newline at end of file



Mime
View raw message