hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r690020 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Date Thu, 28 Aug 2008 21:50:50 GMT
Author: omalley
Date: Thu Aug 28 14:50:50 2008
New Revision: 690020

URL: http://svn.apache.org/viewvc?rev=690020&view=rev
Log:
HADOOP-3950. Cause the Mini MR cluster to wait for task trackers to 
register before continuing. (enis via omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690020&r1=690019&r2=690020&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 28 14:50:50 2008
@@ -417,6 +417,9 @@
     HADOOP-4037. Fix the eclipse plugin for versions of kfs and log4j. (nigel
     via omalley)
 
+    HADOOP-3950. Cause the Mini MR cluster to wait for task trackers to 
+    register before continuing. (enis via omalley)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=690020&r1=690019&r2=690020&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Aug 28 14:50:50 2008
@@ -17,17 +17,20 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.StaticMapping;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.fs.FileSystem;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
@@ -50,6 +53,8 @@
   private String namenode;
   private UnixUserGroupInformation ugi = null;
     
+  private JobConf job;
+  
   /**
    * An inner class that runs a job tracker.
    */
@@ -222,8 +227,7 @@
    * @return the absolute pathname of the local dir
    */
   public String getTaskTrackerLocalDir(int taskTracker) {
-    return ((TaskTrackerRunner) 
-            taskTrackerList.get(taskTracker)).getLocalDir();
+    return (taskTrackerList.get(taskTracker)).getLocalDir();
   }
 
   public JobTrackerRunner getJobTrackerRunner() {
@@ -241,8 +245,32 @@
    * Wait until the system is idle.
    */
   public void waitUntilIdle() {
-    for(Iterator itr= taskTrackerList.iterator(); itr.hasNext();) {
-      TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();
+    waitTaskTrackers();
+    
+    JobClient client;
+    try {
+      client = new JobClient(job);
+      while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) {
+        for(TaskTrackerRunner runner : taskTrackerList) {
+          if(runner.isDead) {
+            throw new RuntimeException("TaskTracker is dead");
+          }
+        }
+        Thread.sleep(1000);
+      }
+    }
+    catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+    catch (InterruptedException ex) {
+      throw new RuntimeException(ex);
+    }
+    
+  }
+
+  private void waitTaskTrackers() {
+    for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) {
+      TaskTrackerRunner runner = itr.next();
       while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
         if (!runner.isInitialized) {
           LOG.info("Waiting for task tracker to start.");
@@ -256,7 +284,7 @@
       }
     }
   }
-
+  
   /** 
    * Get the actual rpc port used.
    */
@@ -269,6 +297,9 @@
   }
 
   public JobConf createJobConf(JobConf conf) {
+    if(conf == null) {
+      conf = new JobConf();
+    }
     JobConf result = new JobConf(conf);
     FileSystem.setDefaultUri(result, namenode);
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
@@ -399,7 +430,7 @@
     this.numTaskTrackers = numTaskTrackers;
     this.namenode = namenode;
     this.ugi = ugi;
-
+    
     // Create the JobTracker
     jobTracker = new JobTrackerRunner(conf);
     jobTrackerThread = new Thread(jobTracker);
@@ -439,7 +470,8 @@
     for (Thread taskTrackerThread : taskTrackerThreadList){
       taskTrackerThread.start();
     }
-
+    
+    this.job = createJobConf(conf);
     waitUntilIdle();
   }
     
@@ -448,10 +480,10 @@
    */
   public void shutdown() {
     try {
-      waitUntilIdle();
+      waitTaskTrackers();
       for (int idx = 0; idx < numTaskTrackers; idx++) {
-        TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);
-        Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);
+        TaskTrackerRunner taskTracker = taskTrackerList.get(idx);
+        Thread taskTrackerThread = taskTrackerThreadList.get(idx);
         taskTracker.shutdown();
         taskTrackerThread.interrupt();
         try {



Mime
View raw message