hadoop-common-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r755965 [2/2] - in /hadoop/core/branches/HADOOP-3628: ./ ivy/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/http/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/...
Date: Thu, 19 Mar 2009 12:15:53 GMT
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 19 12:15:51 2009
@@ -81,11 +81,12 @@
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -94,13 +95,19 @@
  * in a networked environment.  It contacts the JobTracker
  * for Task assignments and reporting results.
  *
+ *
+ * The TaskTracker has a complex lifecycle in that it
+ * can be "recycled"; after {@link #closeTaskTracker()} is called,
+ * it can be reset using {@link #initialize()}. This happens
+ * within the {@link Service} lifecycle.
  *******************************************************/
-public class TaskTracker 
+public class TaskTracker extends Service
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
-  static final long WAIT_FOR_DONE = 3 * 1000;
-  private int httpPort;
+  /** Time to wait for a finished task to be reported as done: {@value} */
+  private static final long WAIT_FOR_DONE = 3 * 1000;
+  int httpPort;
 
-  static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+  enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -119,7 +126,10 @@
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  volatile boolean running = true;
+  /**
+   * Flag used to synchronize running state across threads.
+   */
+  private volatile boolean running = false;
 
   private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
@@ -134,7 +144,7 @@
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
 
-  /*
+  /**
    * This is the last 'status' report sent by this tracker to the JobTracker.
    * 
    * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
@@ -144,14 +154,15 @@
    */
   TaskTrackerStatus status = null;
   
-  // The system-directory on HDFS where job files are stored 
+  /** The system-directory on HDFS where job files are stored */
   Path systemDirectory = null;
   
-  // The filesystem where job files are stored
+  /** The filesystem where job files are stored */
   FileSystem systemFS = null;
   
-  private final HttpServer server;
+  private HttpServer server;
     
+  /** Flag used to synchronize shutdown across threads. */
   volatile boolean shuttingDown = false;
     
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -344,33 +355,7 @@
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                TaskInProgress tip;
-                KillTaskAction killAction = (KillTaskAction) action;
-                synchronized (TaskTracker.this) {
-                  tip = tasks.get(killAction.getTaskID());
-                }
-                LOG.info("Received KillTaskAction for task: " + 
-                         killAction.getTaskID());
-                purgeTask(tip, false);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
-        }
-      }, "taskCleanup");
+  private TaskCleanupThread taskCleanupThread;
     
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
@@ -481,11 +466,24 @@
   }
     
   /**
-   * Do the real constructor work here.  It's in a separate method
+   * Initialize the connection.
+   * This method will block until a job tracker is found.
+   * It's in a separate method
    * so we can call it again and "recycle" the object after calling
    * close().
    */
   synchronized void initialize() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing Task Tracker: " + this);
+    }
+
+    //allow this operation in only two service states: started and live
+    verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+    //flip the running switch for our inner threads
+    running = true;
+    
     // use configured nameserver & interface to get local hostname
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
@@ -572,10 +570,17 @@
     DistributedCache.purgeCache(this.fConf);
     cleanupStorage();
 
+    //mark as just started; this is used in heartbeats
+    this.justStarted = true;
+    int connectTimeout =
+        fConf.getInt("mapred.task.tracker.connect.timeout", 60000);
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
-                       jobTrackAddr, this.fConf);
+                       jobTrackAddr, fConf, connectTimeout);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+    }
     this.justInited = true;
     this.running = true;    
     // start the thread that will fetch map task completion events
@@ -611,7 +616,9 @@
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    this.fConf.deleteLocalFiles();
+    if (fConf != null) {
+      fConf.deleteLocalFiles();
+    }
   }
 
   // Object on wait which MapEventsFetcherThread is going to wait.
@@ -890,25 +897,117 @@
     }
   }
     
-  public synchronized void shutdown() throws IOException {
-    shuttingDown = true;
-    close();
-    if (this.server != null) {
-      try {
-        LOG.info("Shutting down StatusHttpServer");
-        this.server.stop();
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected synchronized void innerStart() throws IOException {
+    JobConf conf = fConf;
+    maxCurrentMapTasks = conf.getInt(
+            "mapred.tasktracker.map.tasks.maximum", 2);
+    maxCurrentReduceTasks = conf.getInt(
+            "mapred.tasktracker.reduce.tasks.maximum", 2);
+    this.jobTrackAddr = JobTracker.getAddress(conf);
+    String infoAddr =
+            NetUtils.getServerAddress(conf,
+                    "tasktracker.http.bindAddress",
+                    "tasktracker.http.port",
+                    "mapred.task.tracker.http.address");
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+    String httpBindAddress = infoSocAddr.getHostName();
+    int port = infoSocAddr.getPort();
+    this.server = new HttpServer("task", httpBindAddress, port,
+            port == 0, conf);
+    workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
+    server.setThreads(1, workerThreads);
+    // let the jsp pages get to the task tracker, config, and other relevant
+    // objects
+    FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    server.setAttribute("task.tracker", this);
+    server.setAttribute("local.file.system", local);
+    server.setAttribute("conf", conf);
+    server.setAttribute("log", LOG);
+    server.setAttribute("localDirAllocator", localDirAllocator);
+    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+    server.addInternalServlet("mapOutput", "/mapOutput",
+            MapOutputServlet.class);
+    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+    server.start();
+    this.httpPort = server.getPort();
+    initialize();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException       for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (server == null || !server.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + httpPort));
+    }
+    if (taskReportServer == null) {
+      status.addThrowable(
+              new IOException("TaskTracker Report Server is not running on "
+              + taskReportAddress));
+    }
+  }
+  
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      shuttingDown = true;
+      closeTaskTracker();
+      if (this.server != null) {
+        try {
+          LOG.info("Shutting down StatusHttpServer");
+          this.server.stop();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down TaskTracker", e);
+        } catch (Exception e) {
+          LOG.warn("Exception shutting down TaskTracker", e);
+        }
       }
+      stopCleanupThreads();
     }
   }
+
+  /**
+   * A shutdown request triggers termination
+   * @throws IOException when errors happen during termination
+   */
+  public synchronized void shutdown() throws IOException {
+    close();
+  }
+
   /**
    * Close down the TaskTracker and all its components.  We must also shutdown
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws IOException when errors happen during shutdown
    */
-  public synchronized void close() throws IOException {
+  public synchronized void closeTaskTracker() throws IOException {
+    if (!running) {
+      //this operation is a no-op when not already running
+      return;
+    }
+    running = false;
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -920,27 +1019,37 @@
       tip.jobHasFinished(false);
     }
     
-    this.running = false;
-        
     // Clear local storage
     cleanupStorage();
         
     // Shutdown the fetcher thread
-    this.mapEventsFetcher.interrupt();
+    if (mapEventsFetcher != null) {
+      mapEventsFetcher.interrupt();
+    }
     
     //stop the launchers
-    this.mapLauncher.interrupt();
-    this.reduceLauncher.interrupt();
-    
-    jvmManager.stop();
+    if (mapLauncher != null) {
+      mapLauncher.cleanTaskQueue();
+      mapLauncher.interrupt();
+    }
+    if (reduceLauncher != null) {
+      reduceLauncher.cleanTaskQueue();
+      reduceLauncher.interrupt();
+    }
     
+    if (jvmManager != null) {
+      jvmManager.stop();
+    }
+      
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
 
     // wait for the fetcher thread to exit
     for (boolean done = false; !done; ) {
       try {
-        this.mapEventsFetcher.join();
+        if (mapEventsFetcher != null) {
+          mapEventsFetcher.join();
+        }
         done = true;
       } catch (InterruptedException e) {
       }
@@ -953,52 +1062,54 @@
   }
 
   /**
-   * Start with the local machine name, and the default JobTracker
+   * Create and start a task tracker.
+   * Subclasses must not call this constructor, as it may
+   * call their initialisation/startup methods before the constructor
+   * is complete.
+   * It is here for backwards compatibility.
+   * @param conf configuration
+   * @throws IOException for problems on startup
    */
   public TaskTracker(JobConf conf) throws IOException {
+    this(conf, true);
+  }
+
+  /**
+   * Subclasses should call this constructor with start=false to
+   * avoid race conditions between construction and worker threads.
+   * @param conf configuration
+   * @param start true to start the daemon during construction
+   * @throws IOException for problems on startup
+   */
+  protected TaskTracker(JobConf conf, boolean start) throws IOException {
+    super(conf);
     fConf = conf;
-    maxCurrentMapTasks = conf.getInt(
-                  "mapred.tasktracker.map.tasks.maximum", 2);
-    maxCurrentReduceTasks = conf.getInt(
-                  "mapred.tasktracker.reduce.tasks.maximum", 2);
-    this.jobTrackAddr = JobTracker.getAddress(conf);
-    String infoAddr = 
-      NetUtils.getServerAddress(conf,
-                                "tasktracker.http.bindAddress", 
-                                "tasktracker.http.port",
-                                "mapred.task.tracker.http.address");
-    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
-    String httpBindAddress = infoSocAddr.getHostName();
-    int httpPort = infoSocAddr.getPort();
-    this.server = new HttpServer("task", httpBindAddress, httpPort,
-        httpPort == 0, conf);
-    workerThreads = conf.getInt("tasktracker.http.threads", 40);
-    this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
-    server.setThreads(1, workerThreads);
-    // let the jsp pages get to the task tracker, config, and other relevant
-    // objects
-    FileSystem local = FileSystem.getLocal(conf);
-    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
-    server.setAttribute("task.tracker", this);
-    server.setAttribute("local.file.system", local);
-    server.setAttribute("conf", conf);
-    server.setAttribute("log", LOG);
-    server.setAttribute("localDirAllocator", localDirAllocator);
-    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
-    server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
-    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
-    server.start();
-    this.httpPort = server.getPort();
-    initialize();
+    //for backwards compatibility, the task tracker starts up unless told not
+    //to. Subclasses should be very cautious about having their superclass
+    //do that, as overridden methods can be invoked before the class is
+    //fully configured.
+    if (start) {
+      deploy(this);
+    }
   }
 
   private void startCleanupThreads() throws IOException {
-    taskCleanupThread.setDaemon(true);
+    taskCleanupThread = new TaskCleanupThread();
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
   
   /**
+   * Tell the cleanup threads that they should end themselves.
+   */
+  private void stopCleanupThreads() {
+    if (taskCleanupThread != null) {
+      taskCleanupThread.terminate();
+      taskCleanupThread = null;
+    }
+  }
+  
+  /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
    */
@@ -1045,6 +1156,7 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
+    boolean restartingService = true;
 
     while (running && !shuttingDown) {
       try {
@@ -1060,6 +1172,7 @@
         // 1. Verify the buildVersion
         // 2. Get the system directory & filesystem
         if(justInited) {
+          LOG.debug("Checking build version with JobTracker");
           String jobTrackerBV = jobClient.getBuildVersion();
           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
             String msg = "Shutting down. Incompatible buildVersion." +
@@ -1069,7 +1182,7 @@
             try {
               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
             } catch(Exception e ) {
-              LOG.info("Problem reporting to jobtracker: " + e);
+              LOG.info("Problem reporting to jobtracker: " + e, e);
             }
             return State.DENIED;
           }
@@ -1080,6 +1193,9 @@
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("System directory is " + systemDirectory);
+          }
         }
         
         // Send the heartbeat and process the jobtracker's directives
@@ -1132,6 +1248,15 @@
           return State.STALE;
         }
             
+        //At this point the job tracker is present and compatible,
+        //so the service is coming up.
+        //It is time to declare it as such
+        if (restartingService) {
+          //declare the service as live.
+          enterLiveState();
+          restartingService = false;
+        }
+            
         // resetting heartbeat interval from the response.
         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
@@ -1157,15 +1282,16 @@
             
         //we've cleaned up, resume normal operation
         if (!acceptNewTasks && isIdle()) {
+          LOG.info("ready to accept new tasks again");
           acceptNewTasks=true;
         }
       } catch (InterruptedException ie) {
         LOG.info("Interrupted. Closing down.");
         return State.INTERRUPTED;
       } catch (DiskErrorException de) {
-        String msg = "Exiting task tracker for disk error:\n" +
+        String msg = "Exiting task tracker for disk error:\n" +
           StringUtils.stringifyException(de);
-        LOG.error(msg);
+        LOG.error(msg, de);
         synchronized (this) {
           jobClient.reportTaskTrackerError(taskTrackerName, 
                                            "DiskErrorException", msg);
@@ -1177,10 +1303,9 @@
           LOG.info("Tasktracker disallowed by JobTracker.");
           return State.DENIED;
         }
-      } catch (Exception except) {
-        String msg = "Caught exception: " + 
-          StringUtils.stringifyException(except);
-        LOG.error(msg);
+      } catch (IOException except) {
+        String msg = "Caught exception: " + except;
+        LOG.error(msg, except);
       }
     }
 
@@ -1491,6 +1616,7 @@
       localMinSpaceKill = minSpaceKill;  
     }
     if (!enoughFreeSpace(localMinSpaceKill)) {
+      LOG.info("Tasktracker running out of space - not accepting new tasks");
       acceptNewTasks=false; 
       //we give up! do not accept new tasks until
       //all the ones running have finished and they're all cleared up
@@ -1752,7 +1878,7 @@
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
-      LOG.warn(msg);
+      LOG.warn(msg, e);
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
@@ -1812,17 +1938,22 @@
               if (!shuttingDown) {
                 LOG.info("Lost connection to JobTracker [" +
                          jobTrackAddr + "].  Retrying...", ex);
+                //enter the started state; we are no longer live
+                enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
                 try {
                   Thread.sleep(5000);
                 } catch (InterruptedException ie) {
+                  LOG.info("Interrupted while waiting for the job tracker", ie);
                 }
               }
             }
           }
         } finally {
-          close();
+          closeTaskTracker();
+        }
+        if (shuttingDown) {
+          return;
         }
-        if (shuttingDown) { return; }
         LOG.warn("Reinitializing local state");
         initialize();
       }
@@ -1830,8 +1961,7 @@
         shutdown();
       }
     } catch (IOException iex) {
-      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
-                StringUtils.stringifyException(iex));
+      LOG.error("Got fatal exception while reinitializing TaskTracker: " + iex, iex);
       return;
     }
   }
@@ -2502,6 +2632,20 @@
           }
           String taskDir = getLocalTaskDir(task.getJobID().toString(),
                              taskId.toString(), task.isTaskCleanupTask());
+          CleanupQueue cleaner = directoryCleanupThread;
+          boolean noCleanupThread = cleaner == null;
+          if (noCleanupThread) {
+            LOG.info("Cannot clean up: no directory cleanup thread");
+          }
+          if (taskDir == null) {
+            throw new IOException("taskDir==null");
+          }
+          if (localJobConf == null) {
+            throw new IOException("localJobConf==null");
+          }
+          if (defaultJobConf == null) {
+            throw new IOException("defaultJobConf==null");
+          }
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2785,7 +2929,17 @@
   String getName() {
     return taskTrackerName;
   }
-    
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+  }
+
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                           boolean sendCounters) {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -2902,7 +3056,9 @@
       // enable the server to track time spent waiting on locks
       ReflectionUtils.setContentionTracing
         (conf.getBoolean("tasktracker.contention.tracking", false));
-      new TaskTracker(conf).run();
+      TaskTracker tracker = new TaskTracker(conf, false);
+      Service.deploy(tracker);
+      tracker.run();
     } catch (Throwable e) {
       LOG.error("Can not start task tracker because "+
                 StringUtils.stringifyException(e));
@@ -3220,8 +3376,70 @@
       try {
         purgeTask(tip, wasFailure); // Marking it as failed/killed.
       } catch (IOException ioe) {
-        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
       }
     }
   }
+
+  /**
+   * Cleanup queue that can process actions to kill a job or task
+   */
+  private class TaskCleanupThread extends Daemon {
+
+    /**
+     * flag to halt work
+     */
+    private volatile boolean live = true;
+
+
+    /**
+     * Construct a daemon thread.
+     */
+    private TaskCleanupThread() {
+      setName("Task Tracker Task Cleanup Thread");
+    }
+
+    /**
+     * End the daemon. This is done by setting the live flag to false and
+     * interrupting ourselves.
+     */
+    public void terminate() {
+      live = false;
+      interrupt();
+    }
+
+    /**
+     * process task kill actions until told to stop being live.
+     */
+    public void run() {
+      LOG.debug("Task cleanup thread started");
+      while (live) {
+        try {
+          TaskTrackerAction action = tasksToCleanup.take();
+          if (action instanceof KillJobAction) {
+            purgeJob((KillJobAction) action);
+          } else if (action instanceof KillTaskAction) {
+            TaskInProgress tip;
+            KillTaskAction killAction = (KillTaskAction) action;
+            synchronized (TaskTracker.this) {
+              tip = tasks.get(killAction.getTaskID());
+            }
+            LOG.info("Received KillTaskAction for task: " +
+                    killAction.getTaskID());
+            purgeTask(tip, false);
+          } else {
+            LOG.error("Non-delete action given to cleanup thread: "
+                    + action);
+          }
+        } catch (InterruptedException except) {
+          //interrupted; terminate() may have cleared the live flag
+        } catch (Throwable except) {
+          LOG.warn("Exception in Cleanup thread: " + except,
+                  except);
+        }
+      }
+      LOG.debug("Task cleanup thread ending");
+    }
+  }
+
 }

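A minimal sketch of the two-phase startup that main() now uses, assuming the Service API introduced on this branch (Service.deploy() plus the protected two-argument constructor); the subclass shown here is illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskTracker;
    import org.apache.hadoop.util.Service;

    public class MyTaskTracker extends TaskTracker {
      protected MyTaskTracker(JobConf conf) throws IOException {
        // start=false defers startup, so overridden methods cannot be
        // invoked before this constructor completes
        super(conf, false);
      }

      public static void main(String[] args) throws IOException {
        MyTaskTracker tracker = new MyTaskTracker(new JobConf());
        // deploy() drives innerStart(): HTTP server setup + initialize()
        Service.deploy(tracker);
        // heartbeat loop; the tracker enters the LIVE state once the
        // JobTracker handshake succeeds, and returns on shutdown
        tracker.run();
      }
    }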
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Mar 19 12:15:51 2009
@@ -107,6 +107,15 @@
     return actionType;
   }
 
+  /**
+   * {@inheritDoc}
+   * @return the action type.
+   */
+  @Override
+  public String toString() {
+    return "TaskTrackerAction: " + actionType;
+  }
+
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeEnum(out, actionType);
   }

Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Mar 19 12:15:51 2009
@@ -301,6 +301,18 @@
   }  
   
   /**
+   * The string value summarizes the basic status of the task tracker.
+   * @return a string value for diagnostics
+   */
+  @Override
+  public String toString() {
+    return trackerName
+            + " at http://" + host + ":" + httpPort + "/"
+            + " current task count: " + taskReports.size()
+            + " failed task count: " + failures;
+  }
+  
+  /**
    * Return the {@link ResourceStatus} object configured with this
    * status.
    * 

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Mar 19 12:15:51 2009
@@ -79,7 +79,7 @@
 
     FileOutputFormat.setOutputPath(conf, outDir);
 
-    JobClient.runJob(conf);
+    runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(outDir,
@@ -100,4 +100,14 @@
 
   }
 
+  /**
+   * Run a job, getting the timeout from a system property
+   * @param conf job configuration to use
+   * @throws IOException for any problem, including job failure
+   */
+  private void runJob(JobConf conf) throws IOException {
+    long timeout = Long.getLong("test.jobclient.timeout", 0);
+    JobClient.runJob(conf, timeout);
+  }
+
 }
\ No newline at end of file

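The runJob() helper above pulls its timeout from a system property, so a test run can bound job duration from the JVM command line. A standalone sketch of just that lookup (the class name and the -D invocation are illustrative, not from this patch):

    public class TimeoutLookup {
      public static void main(String[] args) {
        // Long.getLong reads the named system property and falls back to
        // the given default when unset; 0 means "no timeout" in runJob()
        long timeout = Long.getLong("test.jobclient.timeout", 0);
        System.out.println("job timeout = " + timeout + " ms");
      }
    }

Invoked, for example, as: java -Dtest.jobclient.timeout=600000 TimeoutLookup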
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Mar 19 12:15:51 2009
@@ -64,6 +64,15 @@
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
+  private MiniDFSCluster cluster;
+
+  /**
+   * terminate any non-null cluster
+   */
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    MiniDFSCluster.close(cluster);
+  }
 
   /** class MyFile contains enough information to recreate the contents of
    * a single file.
@@ -269,8 +278,6 @@
   /** copy files from dfs file system to dfs file system */
   public void testCopyFromDfsToDfs() throws Exception {
     String namenode = null;
-    MiniDFSCluster cluster = null;
-    try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
@@ -291,15 +298,10 @@
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
   
   /** copy files from local file system to dfs file system */
   public void testCopyFromLocalToDfs() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 1, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
@@ -319,15 +321,10 @@
         deldir(hdfs, "/logs");
         deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
 
   /** copy files from dfs file system to local file system */
   public void testCopyFromDfsToLocal() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
       Configuration conf = new Configuration();
       final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
       cluster = new MiniDFSCluster(conf, 1, true, null);
@@ -348,16 +345,11 @@
         deldir(hdfs, "/logs");
         deldir(hdfs, "/srcdat");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
 
   public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(conf, 2, true, null);
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = hdfs.getUri().toString();
       if (namenode.startsWith("hdfs://")) {
@@ -408,9 +400,7 @@
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
+
   }
 
   public void testCopyDuplication() throws Exception {
@@ -486,11 +476,9 @@
 
   public void testPreserveOption() throws Exception {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = null;
-    try {
-      cluster = new MiniDFSCluster(conf, 2, true, null);
-      String nnUri = FileSystem.getDefaultUri(conf).toString();
-      FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+    cluster = new MiniDFSCluster(conf, 2, true, null);
+    String nnUri = FileSystem.getDefaultUri(conf).toString();
+    FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
 
       {//test preserving user
         MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
@@ -551,19 +539,15 @@
         deldir(fs, "/destdat");
         deldir(fs, "/srcdat");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
     }
-  }
 
   public void testMapCount() throws Exception {
     String namenode = null;
-    MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     try {
       Configuration conf = new Configuration();
-      dfs = new MiniDFSCluster(conf, 3, true, null);
-      FileSystem fs = dfs.getFileSystem();
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      FileSystem fs = cluster.getFileSystem();
       final FsShell shell = new FsShell(conf);
       namenode = fs.getUri().toString();
       mr = new MiniMRCluster(3, namenode, 1);
@@ -604,8 +588,7 @@
       assertTrue("Unexpected map count, logs.length=" + logs.length,
           logs.length == 2);
     } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown(); }
+      MiniMRCluster.close(mr);
     }
   }
 
@@ -714,9 +697,7 @@
   }
 
   public void testHftpAccessControl() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
+      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
       final UnixUserGroupInformation USER_UGI = createUGI("user", false); 
 
       //start cluster by DFS_UGI
@@ -750,9 +731,6 @@
         fs.setPermission(srcrootpath, new FsPermission((short)0));
         assertEquals(-3, ToolRunner.run(distcp, args));
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
 
   /** test -delete */

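The TestCopyFiles changes above share one pattern: the MiniDFSCluster becomes a field, and tearDown() releases it through the new null-safe MiniDFSCluster.close(). A minimal sketch of that pattern in the same JUnit 3 style (class and test names are illustrative):

    import junit.framework.TestCase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class TestWithCluster extends TestCase {
      private MiniDFSCluster cluster;

      protected void tearDown() throws Exception {
        super.tearDown();
        // harmless when the test never started a cluster
        MiniDFSCluster.close(cluster);
      }

      public void testMkdirs() throws Exception {
        Configuration conf = new Configuration();
        cluster = new MiniDFSCluster(conf, 2, true, null);
        FileSystem fs = cluster.getFileSystem();
        assertTrue(fs.mkdirs(new Path("/srcdat")));
      }
    }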
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 19 12:15:51 2009
@@ -65,25 +65,25 @@
   private static final long MEGA = 1024 * 1024;
   private static final int SEEKS_PER_FILE = 4;
 
-  private static String ROOT = System.getProperty("test.build.data","fs_test");
-  private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
-  private static Path WRITE_DIR = new Path(ROOT, "fs_write");
-  private static Path READ_DIR = new Path(ROOT, "fs_read");
-  private static Path DATA_DIR = new Path(ROOT, "fs_data");
+  private static final String ROOT = System.getProperty("test.build.data","fs_test");
+  private static final Path CONTROL_DIR = new Path(ROOT, "fs_control");
+  private static final Path WRITE_DIR = new Path(ROOT, "fs_write");
+  private static final Path READ_DIR = new Path(ROOT, "fs_read");
+  private static final Path DATA_DIR = new Path(ROOT, "fs_data");
 
   public void testFs() throws Exception {
-    testFs(10 * MEGA, 100, 0);
+    createTestFs(10 * MEGA, 100, 0);
   }
 
-  public static void testFs(long megaBytes, int numFiles, long seed)
+  private static void createTestFs(long megaBytes, int numFiles, long seed)
     throws Exception {
 
     FileSystem fs = FileSystem.get(conf);
 
-    if (seed == 0)
+    if (seed == 0) {
       seed = new Random().nextLong();
-
-    LOG.info("seed = "+seed);
+      LOG.info("seed = " + seed);
+    }
 
     createControlFile(fs, megaBytes, numFiles, seed);
     writeTest(fs, false);
@@ -553,7 +553,7 @@
         }
       }
     } finally {
-      if (cluster != null) cluster.shutdown(); 
+      MiniDFSCluster.close(cluster);
     }
   }
   
@@ -563,30 +563,44 @@
     fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
   }
 
-  public void testFsClose() throws Exception {
-    {
-      Configuration conf = new Configuration();
-      new Path("file:///").getFileSystem(conf);
-      UnixUserGroupInformation.login(conf, true);
-      FileSystem.closeAll();
-    }
+  public void testCloseFileFS() throws Exception {
+    Configuration conf = new Configuration();
+    new Path("file:///").getFileSystem(conf);
+    UnixUserGroupInformation.login(conf, true);
+    FileSystem.closeAll();
+  }
 
-    {
+  public void testCloseHftpFS() throws Exception {
       Configuration conf = new Configuration();
       new Path("hftp://localhost:12345/").getFileSystem(conf);
       UnixUserGroupInformation.login(conf, true);
       FileSystem.closeAll();
-    }
+  }
 
-    {
+  public void testCloseHftpFSAltLogin() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
+    UnixUserGroupInformation.login(fs.getConf(), true);
+    FileSystem.closeAll();
+  }
+
+
+  public void testCloseHDFS() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+      URI uri = cluster.getFileSystem().getUri();
+      FileSystem fs = FileSystem.get(uri, new Configuration());
+      checkPath(cluster, fs);
       Configuration conf = new Configuration();
-      FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
-      UnixUserGroupInformation.login(fs.getConf(), true);
+      new Path(uri.toString()).getFileSystem(conf);
+      UnixUserGroupInformation.login(conf, true);
       FileSystem.closeAll();
+    } finally {
+      MiniDFSCluster.close(cluster);
     }
   }
 
-
   public void testCacheKeysAreCaseInsensitive()
     throws Exception
   {

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Mar 19 12:15:51 2009
@@ -25,12 +25,14 @@
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.io.RandomAccessFile;
+import java.io.Closeable;
 
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -43,13 +45,18 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * This class creates a single-process DFS cluster for junit testing.
  * The data directories for non-simulated DFS are under the testing directory.
  * For simulated data nodes, no underlying fs storage is used.
  */
-public class MiniDFSCluster {
+public class MiniDFSCluster implements Closeable {
+  private static final int WAIT_SLEEP_TIME_MILLIS = 500;
+  private static final int STARTUP_TIMEOUT_MILLIS = 30000;
 
   public class DataNodeProperties {
     DataNode datanode;
@@ -63,6 +70,7 @@
     }
   }
 
+  private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
   private Configuration conf;
   private NameNode nameNode;
   private int numDataNodes;
@@ -281,17 +289,18 @@
   }
 
   /**
-   * wait for the cluster to get out of 
+   * wait for the cluster to get out of
    * safemode.
    */
   public void waitClusterUp() {
     if (numDataNodes > 0) {
-      while (!isClusterUp()) {
-        try {
-          System.err.println("Waiting for the Mini HDFS Cluster to start...");
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
+      try {
+        while (!isClusterUp()) {
+          LOG.warn("Waiting for the Mini HDFS Cluster to start...");
+          Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
         }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during startup", e);
       }
     }
   }
@@ -323,7 +332,6 @@
                              boolean manageDfsDirs, StartupOption operation, 
                              String[] racks, String[] hosts,
                              long[] simulatedCapacities) throws IOException {
-
     int curDatanodesNum = dataNodes.size();
     // for mincluster's the default initialDelay for BRs is 0
     if (conf.get("dfs.blockreport.initialDelay") == null) {
@@ -350,7 +358,7 @@
     }
     //Generate some hostnames if required
     if (racks != null && hosts == null) {
-      System.out.println("Generating host names for datanodes");
+      LOG.info("Generating host names for datanodes");
       hosts = new String[numDataNodes];
       for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
         hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
@@ -393,16 +401,16 @@
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }
-      System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
+      LOG.info("Starting DataNode " + i + " with dfs.data.dir: "
                          + dnConf.get("dfs.data.dir"));
       if (hosts != null) {
         dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
-        System.out.println("Starting DataNode " + i + " with hostname set to: " 
+        LOG.info("Starting DataNode " + i + " with hostname set to: "
                            + dnConf.get("slave.host.name"));
       }
       if (racks != null) {
         String name = hosts[i - curDatanodesNum];
-        System.out.println("Adding node with hostname : " + name + " to rack "+
+        LOG.info("Adding node with hostname : " + name + " to rack "+
                             racks[i-curDatanodesNum]);
         StaticMapping.addNodeToRack(name,
                                     racks[i-curDatanodesNum]);
@@ -417,7 +425,7 @@
       String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
       if (racks != null) {
         int port = dn.getSelfAddr().getPort();
-        System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
+        LOG.info("Adding node with IP:port : " + ipAddr + ":" + port+
                             " to rack " + racks[i-curDatanodesNum]);
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                   racks[i-curDatanodesNum]);
@@ -548,8 +556,8 @@
   /**
    * Shut down the servers that are up.
    */
-  public void shutdown() {
-    System.out.println("Shutting down the Mini HDFS Cluster");
+  public synchronized void shutdown() {
+    LOG.info("Shutting down the Mini HDFS Cluster");
     shutdownDataNodes();
     if (nameNode != null) {
       nameNode.stop();
@@ -557,20 +565,57 @@
       nameNode = null;
     }
   }
-  
+
+  /**
+   * Shuts down the cluster.  
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  public void close() throws IOException {
+    shutdown();
+  }
+
+  /**
+   * Static operation to shut down a cluster;
+   * harmless if the cluster argument is null
+   *
+   * @param cluster cluster to shut down, or null for no cluster
+   */
+  public static void close(Closeable cluster) {
+    Service.close(cluster);
+  }
+
   /**
    * Shutdown all DataNodes started by this class.  The NameNode
    * is left running so that new DataNodes may be started.
    */
   public void shutdownDataNodes() {
     for (int i = dataNodes.size()-1; i >= 0; i--) {
-      System.out.println("Shutting down DataNode " + i);
+      LOG.info("Shutting down DataNode " + i);
       DataNode dn = dataNodes.remove(i).datanode;
       dn.shutdown();
       numDataNodes--;
     }
   }
 
+  /**
+   * Returns a string representation of the cluster.
+   *
+   * @return a string representation of the cluster
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Cluster up:").append(isClusterUp());
+    builder.append("\nName Node:").append(getNameNode());
+    builder.append("\nData node count:").append(dataNodes.size());
+    for (DataNodeProperties dnp : dataNodes) {
+      builder.append("\n Datanode: ").append(dnp.datanode);
+      builder.append("\n  state: ").append(dnp.datanode.getServiceState());
+    }
+    return builder.toString();
+  }
+
   /*
    * Corrupt a block on all datanode
    */
@@ -595,12 +640,15 @@
       if (blockFile.exists()) {
         // Corrupt replica by writing random bytes into replica
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        FileChannel channel = raFile.getChannel();
-        String badString = "BADBAD";
-        int rand = random.nextInt((int)channel.size()/2);
-        raFile.seek(rand);
-        raFile.write(badString.getBytes());
-        raFile.close();
+        try {
+          FileChannel channel = raFile.getChannel();
+          String badString = "BADBAD";
+          int rand = random.nextInt((int) channel.size() / 2);
+          raFile.seek(rand);
+          raFile.write(badString.getBytes());
+        } finally {
+          raFile.close();
+        }
       }
       corrupted = true;
     }
@@ -616,7 +664,7 @@
     }
     DataNodeProperties dnprop = dataNodes.remove(i);
     DataNode dn = dnprop.datanode;
-    System.out.println("MiniDFSCluster Stopping DataNode " + 
+    LOG.info("MiniDFSCluster Stopping DataNode " +
                        dn.dnRegistration.getName() +
                        " from a total of " + (dataNodes.size() + 1) + 
                        " datanodes.");
@@ -655,8 +703,10 @@
     }
   }
 
-  /*
+  /**
    * Shutdown a datanode by name.
+   * @param name datanode name
+   * @return the stopped datanode's properties, or null if none matched
    */
   public synchronized DataNodeProperties stopDataNode(String name) {
     int i;
@@ -670,7 +720,7 @@
   }
   
   /**
-   * Returns true if the NameNode is running and is out of Safe Mode.
+   * @return true if the NameNode is running and is out of Safe Mode.
    */
   public boolean isClusterUp() {
     if (nameNode == null) {
@@ -685,7 +735,7 @@
   }
   
   /**
-   * Returns true if there is at least one DataNode running.
+   * @return true if there is at least one DataNode running.
    */
   public boolean isDataNodeUp() {
     if (dataNodes == null || dataNodes.size() == 0) {
@@ -696,13 +746,15 @@
   
   /**
    * Get a client handle to the DFS cluster.
+   * @return a new filesystem, which must be closed when no longer needed.
+   * @throws IOException if the filesystem cannot be created
    */
   public FileSystem getFileSystem() throws IOException {
     return FileSystem.get(conf);
   }
 
   /**
-   * Get the directories where the namenode stores its image.
+   * @return the directories where the namenode stores its state.
    */
   public Collection<File> getNameDirs() {
     return FSNamesystem.getNamespaceDirs(conf);
@@ -725,17 +777,27 @@
     InetSocketAddress addr = new InetSocketAddress("localhost",
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
+    try {
+      DatanodeInfo[] dnInfos;
 
-    // make sure all datanodes are alive
-    while(client.datanodeReport(DatanodeReportType.LIVE).length
-        != numDataNodes) {
-      try {
-        Thread.sleep(500);
-      } catch (Exception e) {
+      // make sure all datanodes are alive
+      long timeout = System.currentTimeMillis() + STARTUP_TIMEOUT_MILLIS;
+      while ((dnInfos = client.datanodeReport(DatanodeReportType.LIVE)).length
+          != numDataNodes) {
+        try {
+          Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
+          if (System.currentTimeMillis() > timeout) {
+            throw new IOException("Timeout waiting for the datanodes. " +
+                    "Expected " + numDataNodes + " but got " + dnInfos.length);
+          }
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted while waiting for the datanodes", e);
+        }
+        }
       }
+    } finally {
+      client.close();
     }
 
-    client.close();
   }
   
   public void formatDataNodeDirs() throws IOException {
@@ -824,7 +886,7 @@
   }
 
   /**
-   * Returns the current set of datanodes
+   * @return the current set of datanodes
    */
   DataNode[] listDataNodes() {
     DataNode[] list = new DataNode[dataNodes.size()];

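The waitActive() change above replaces an unbounded polling loop with a bounded one. A hedged sketch of the idiom in isolation, with the condition and constants as placeholders rather than the patch's exact code:

    import java.io.IOException;

    public class BoundedWait {
      interface Condition { boolean holds(); }

      // stand-ins for WAIT_SLEEP_TIME_MILLIS / STARTUP_TIMEOUT_MILLIS
      static final int SLEEP_MILLIS = 500;
      static final int TIMEOUT_MILLIS = 30000;

      static void waitFor(Condition condition) throws IOException {
        long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
        while (!condition.holds()) {
          // fail fast instead of spinning forever
          if (System.currentTimeMillis() > deadline) {
            throw new IOException("Timeout waiting for condition");
          }
          try {
            Thread.sleep(SLEEP_MILLIS);
          } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting", e);
          }
        }
      }
    }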
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Thu Mar 19 12:15:51 2009
@@ -58,7 +58,7 @@
     config = new Configuration();
     config.set("dfs.name.dir", new File(hdfsDir, "name1").getPath());
     FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
-    config.set("dfs.http.address", NAME_NODE_HTTP_HOST + "0");
+    config.set("dfs.http.address", "0.0.0.0:0");
     NameNode.format(config);
 
     String[] args = new String[] {};

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java Thu Mar 19 12:15:51 2009
@@ -19,7 +19,6 @@
 
 import junit.framework.TestCase;
 import java.io.*;
-import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
 
@@ -71,7 +70,7 @@
     Configuration conf = fileSys.getConf();
     ClientProtocol namenode = DFSClient.createNamenode(conf);
       
-    waitForBlockReplication(name.toString(), namenode, 
+    waitForBlockReplication(name.toString(), namenode,
                             Math.min(numDatanodes, repl), -1);
     
     LocatedBlocks locations = namenode.getBlockLocations(name.toString(),0,
@@ -169,9 +168,9 @@
     // Now get block details and check if the block is corrupt
     blocks = dfsClient.namenode.
               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    LOG.info("Waiting until block is marked as corrupt...");
     while (blocks.get(0).isCorrupt() != true) {
       try {
-        LOG.info("Waiting until block is marked as corrupt...");
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
       }
@@ -249,16 +248,14 @@
       boolean replOk = true;
       LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, 
                                                         Long.MAX_VALUE);
-      
-      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
-           iter.hasNext();) {
-        LocatedBlock block = iter.next();
+
+      for (LocatedBlock block : blocks.getLocatedBlocks()) {
         int actual = block.getLocations().length;
-        if ( actual < expected ) {
+        if (actual < expected) {
           if (true || iters > 0) {
             LOG.info("Not enough replicas for " + block.getBlock() +
-                               " yet. Expecting " + expected + ", got " + 
-                               actual + ".");
+                    " yet. Expecting " + expected + ", got " +
+                    actual + ".");
           }
           replOk = false;
           break;
@@ -385,10 +382,8 @@
       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
       
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }  
+      MiniDFSCluster.close(cluster);
+    }
   }
   
   /**

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Thu Mar 19 12:15:51 2009
@@ -137,6 +137,11 @@
     
     editLog.close();
 
+    //check that the namesystem is still healthy
+    assertNotNull("FSNamesystem.getFSNamesystem() is null",
+            FSNamesystem.getFSNamesystem());
+    assertNotNull("FSNamesystem.getFSNamesystem().dir is null",
+            FSNamesystem.getFSNamesystem().dir);
     // Verify that we can read in all the transactions that we have written.
     // If there were any corruptions, it is likely that the reading in
     // of these transactions will throw an exception.

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Thu Mar 19 12:15:51 2009
@@ -93,7 +93,7 @@
       config = props;
     }
 
-    public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+    private ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
                                      int numDir) throws Exception {
       super(numTaskTrackers, namenode, numDir);
     }
@@ -121,14 +121,10 @@
    * @throws Exception if the cluster could not be stopped
    */
   protected void stopCluster() throws Exception {
-    if (mrCluster != null) {
-      mrCluster.shutdown();
-      mrCluster = null;
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
+    MiniMRCluster.close(mrCluster);
+    mrCluster = null;
+    MiniDFSCluster.close(dfsCluster);
+    dfsCluster = null;
   }
 
   /**

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java Thu Mar 19 12:15:51 2009
@@ -163,22 +163,8 @@
    * @throws Exception
    */
   protected void tearDown() throws Exception {
-    try {
-      if (mrCluster != null) {
-        mrCluster.shutdown();
-      }
-    }
-    catch (Exception ex) {
-      System.out.println(ex);
-    }
-    try {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-      }
-    }
-    catch (Exception ex) {
-      System.out.println(ex);
-    }
+    MiniMRCluster.close(mrCluster);
+    MiniDFSCluster.close(dfsCluster);
     super.tearDown();
   }
 

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Mar 19 12:15:51 2009
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -31,12 +32,13 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.Service;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
  * One thread is created for each server.
  */
-public class MiniMRCluster {
+public class MiniMRCluster implements Closeable {
   private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
     
   private Thread jobTrackerThread;
@@ -52,9 +54,13 @@
     
   private String namenode;
   private UnixUserGroupInformation ugi = null;
+  private static final int TRACKER_STABILIZATION_TIMEOUT = 30000;
+
   private JobConf conf;
     
   private JobConf job;
+  /** time for a tracker to shut down : {@value} */
+  private static final long TRACKER_SHUTDOWN_TIMEOUT = 30000;
   
   /**
    * An inner class that runs a job tracker.
@@ -101,7 +107,7 @@
         tracker = JobTracker.startTracker(jc);
         tracker.offerService();
       } catch (Throwable e) {
-        LOG.error("Job tracker crashed", e);
+        LOG.error("Job tracker crashed: " + e, e);
         isActive = false;
       }
     }
@@ -110,13 +116,12 @@
      * Shutdown the job tracker and wait for it to finish.
      */
     public void shutdown() {
-      try {
-        if (tracker != null) {
-          tracker.stopTracker();
-        }
-      } catch (Throwable e) {
-        LOG.error("Problem shutting down job tracker", e);
+      JobTracker jobTracker;
+      synchronized (this) {
+        jobTracker = tracker;
+        tracker = null;
       }
+      Service.close(jobTracker);
       isActive = false;
     }
   }
@@ -177,7 +182,7 @@
       } catch (Throwable e) {
         isDead = true;
         tt = null;
-        LOG.error("task tracker " + trackerId + " crashed", e);
+        LOG.error("task tracker " + trackerId + " crashed : "+e, e);
       }
     }
         
@@ -420,7 +425,7 @@
      
      //Generate rack names if required
      if (racks == null) {
-       System.out.println("Generating rack names for tasktrackers");
+       LOG.info("Generating rack names for tasktrackers");
        racks = new String[numTaskTrackers];
        for (int i=0; i < racks.length; ++i) {
          racks[i] = NetworkTopology.DEFAULT_RACK;
@@ -429,7 +434,7 @@
      
     //Generate some hostnames if required
     if (hosts == null) {
-      System.out.println("Generating host names for tasktrackers");
+      LOG.info("Generating host names for tasktrackers");
       hosts = new String[numTaskTrackers];
       for (int i = 0; i < numTaskTrackers; i++) {
         hosts[i] = "host" + i + ".foo.com";
@@ -470,6 +475,24 @@
     }
     
     this.job = createJobConf(conf);
+    // Wait till the MR cluster stabilizes
+    long timeout = System.currentTimeMillis() +
+            TRACKER_STABILIZATION_TIMEOUT;
+    while (jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted during startup");
+      }
+      if (System.currentTimeMillis() > timeout) {
+        String message = "Timed out waiting for the task trackers to stabilize. "
+                + "Expected tracker count: " + numTaskTrackers
+                + " - actual count: "
+                + jobTracker.tracker.getNumResolvedTaskTrackers();
+        LOG.error(message);
+        throw new IOException(message);
+      }
+    }
     waitUntilIdle();
   }
     
@@ -583,12 +606,11 @@
    * Kill the jobtracker.
    */
   public void stopJobTracker() {
-    //jobTracker.exit(-1);
     jobTracker.shutdown();
 
     jobTrackerThread.interrupt();
     try {
-      jobTrackerThread.join();
+      jobTrackerThread.join(TRACKER_SHUTDOWN_TIMEOUT);
     } catch (InterruptedException ex) {
       LOG.error("Problem waiting for job tracker to finish", ex);
     }
@@ -649,6 +671,25 @@
     }
   }
     
+  /**
+   * Static operation to shut down a cluster; harmless if the cluster argument
+   * is null
+   *
+   * @param cluster cluster to shut down, or null for no cluster
+   */
+  public static void close(Closeable cluster) {
+    Service.close(cluster);
+  }
+
+  /**
+   * Shuts down the cluster.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  public void close() throws IOException {
+    shutdown();
+  }
+    
   public static void main(String[] args) throws IOException {
     LOG.info("Bringing up Jobtracker and tasktrackers.");
     MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1);

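The stabilization loop added to the MiniMRCluster constructor is the poll-with-deadline idiom: sleep in short intervals until a condition holds, and fail loudly once a wall-clock budget is spent, rather than letting the test suite hang forever. A standalone sketch of the idiom (the DeadlinePoller and Condition names are illustrative, not part of the patch):

    import java.io.IOException;

    public final class DeadlinePoller {

      /** A condition to wait for; illustrative, not part of the patch. */
      public interface Condition {
        boolean isMet();
      }

      private DeadlinePoller() {
      }

      /**
       * Poll the condition every intervalMillis until it holds,
       * throwing an IOException once timeoutMillis has elapsed.
       */
      public static void await(Condition condition,
                               long timeoutMillis,
                               long intervalMillis) throws IOException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.isMet()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IOException("Timed out after " + timeoutMillis
                + " ms waiting for the condition to hold");
          }
          try {
            Thread.sleep(intervalMillis);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting");
          }
        }
      }
    }

The constructor's loop is this idiom inlined, the condition being that every expected task tracker has registered and resolved against the job tracker.
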
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Thu Mar 19 12:15:51 2009
@@ -35,6 +35,9 @@
  */
 public class TestMRServerPorts extends TestCase {
   TestHDFSServerPorts hdfs = new TestHDFSServerPorts();
+  private static final String STARTED_UNEXPECTEDLY
+          = "The tracker should not have started";
+  private static final String FAILED_TO_START = "The tracker did not start";
 
   // Runs the JT in a separate thread
   private static class JTRunner extends Thread {
@@ -85,7 +88,6 @@
         return false;
       throw e;
     }
-    jt.fs.close();
     jt.stopTracker();
     return true;
   }
@@ -122,21 +124,21 @@
       conf2.set("mapred.job.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       boolean started = canStartJobTracker(conf2);
-      assertFalse(started); // should fail
+      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
 
       // bind http server to the same port as name-node
       conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
       conf2.set("mapred.job.tracker.http.address",
         hdfs.getConfig().get("dfs.http.address"));
       started = canStartJobTracker(conf2);
-      assertFalse(started); // should fail again
+      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
 
       // both ports are different from the name-node ones
       conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
       conf2.set("mapred.job.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       started = canStartJobTracker(conf2);
-      assertTrue(started); // should start now
+      assertTrue(FAILED_TO_START, started); // should start now
 
     } finally {
       hdfs.stopNameNode(nn);
@@ -163,7 +165,7 @@
       conf2.set("mapred.task.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       boolean started = canStartTaskTracker(conf2);
-      assertFalse(started); // should fail
+      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
 
       // bind http server to the same port as name-node
       conf2.set("mapred.task.tracker.report.address",
@@ -171,7 +173,7 @@
       conf2.set("mapred.task.tracker.http.address",
         hdfs.getConfig().get("dfs.http.address"));
       started = canStartTaskTracker(conf2);
-      assertFalse(started); // should fail again
+      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
 
       // both ports are different from the name-node ones
       conf2.set("mapred.task.tracker.report.address",
@@ -179,7 +181,7 @@
       conf2.set("mapred.task.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       started = canStartTaskTracker(conf2);
-      assertTrue(started); // should start now
+      assertTrue(FAILED_TO_START, started); // should start now
     } finally {
       if (jt != null) {
         jt.fs.close();

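Giving each assertion a named message constant, as above, means a failed port-binding check reports what went wrong instead of a bare AssertionFailedError; and since the same constants serve both the job tracker and task tracker tests, the wording is kept tracker-neutral. A tiny JUnit 3 sketch of the pattern (the class and constant here are illustrative):

    import junit.framework.TestCase;

    public class NamedAssertionSketch extends TestCase {
      private static final String STARTED_UNEXPECTEDLY =
          "The server should not have started on an occupied port";

      public void testPortClash() {
        // stand-in for a real attempt to bind an occupied port
        boolean started = false;
        assertFalse(STARTED_UNEXPECTEDLY, started);
      }
    }
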
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Thu Mar 19 12:15:51 2009
@@ -46,7 +46,12 @@
   private MiniMRCluster mr;
   private MiniDFSCluster dfs;
   private FileSystem fileSys;
-  
+  private static final String BAILING_OUT = "Bailing out";
+  private static final String TEST_SCRIPT_BAILING_OUT
+          = "Test Script\n"+ BAILING_OUT;
+  private static final int SCRIPT_SLEEP_TIMEOUT = 60000;
+  private static final int SCRIPT_SLEEP_INTERVAL = 1000;
+
   /**
    * Fail map class 
    */
@@ -55,7 +60,7 @@
      public void map (LongWritable key, Text value, 
                      OutputCollector<Text, IntWritable> output, 
                      Reporter reporter) throws IOException {
-       System.err.println("Bailing out");
+       System.err.println(BAILING_OUT);
        throw new IOException();
      }
   }
@@ -165,7 +170,17 @@
     // construct the task id of first map task of failmap
     TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
     // wait for the job to finish.
-    while (!job.isComplete()) ;
+    long timeout = System.currentTimeMillis() + SCRIPT_SLEEP_TIMEOUT;
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(SCRIPT_SLEEP_INTERVAL);
+      } catch (InterruptedException e) {
+        fail("Interrupted");
+      }
+      if (System.currentTimeMillis() > timeout) {
+        fail("Timeout waiting for the job to complete");
+      }
+    }
     
     // return the output of debugout log.
     return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId);
@@ -204,7 +219,9 @@
                                outDir,debugDir, debugScript, input);
       
       // Assert the output of debug script.
-      assertEquals("Test Script\nBailing out", result);
+      if (!result.contains(TEST_SCRIPT_BAILING_OUT)) {
+        fail("Did not find " + TEST_SCRIPT_BAILING_OUT + " in\n" + result);
+      }
 
     } finally {  
       // close file system and shut down dfs and mapred cluster

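The assertion change above trades assertEquals for a substring check, so the test tolerates any extra lines the task runtime may prepend or append to the debug script's output; only the marker text itself must appear. A small JUnit 3 sketch of that looser matching style (assertContains is an illustrative helper, not an existing JUnit method):

    import junit.framework.TestCase;

    public class OutputMatchSketch extends TestCase {

      /** Fail unless expected occurs somewhere inside actual. */
      private static void assertContains(String expected, String actual) {
        if (actual == null || !actual.contains(expected)) {
          fail("Did not find \"" + expected + "\" in\n" + actual);
        }
      }

      public void testMarkerSurvivesExtraOutput() {
        String log = "startup noise\nTest Script\nBailing out\ntrailing noise";
        assertContains("Test Script\nBailing out", log);
      }
    }
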
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Thu Mar 19 12:15:51 2009
@@ -112,13 +112,11 @@
        */
       TestRackAwareTaskPlacement.launchJobAndTestCounters(
     		  testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
-      mr.shutdown();
     } finally {
       fileSys.delete(inDir, true);
       fileSys.delete(outputPath, true);
-      if (dfs != null) { 
-        dfs.shutdown(); 
-      }
+      MiniMRCluster.close(mr);
+      MiniDFSCluster.close(dfs);
     }
   }
 }

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Mar 19 12:15:51 2009
@@ -151,14 +151,11 @@
       launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
                                0, 3);
       mr.shutdown();
+      mr = null;
       
     } finally {
-      if (dfs != null) { 
-        dfs.shutdown(); 
-      }
-      if (mr != null) { 
-        mr.shutdown();
-      }
+      MiniDFSCluster.close(dfs);
+      MiniMRCluster.close(mr);
     }
   }
   static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath, 

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Thu Mar 19 12:15:51 2009
@@ -247,8 +247,13 @@
       // they were running on a lost tracker
       testSetupAndCleanupKill(mr, dfs, false);
     } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown();
+      try {
+        if (dfs != null) { dfs.shutdown(); }
+        if (mr != null) {
+          mr.shutdown();
+        }
+      } catch (OutOfMemoryError e) {
+        // ignored: the test is already in serious trouble; logging may make it worse
       }
     }
   }

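Catching OutOfMemoryError in the teardown above is deliberate: once the JVM is out of memory, even building a log message can allocate and throw again, so the safest move is to swallow the error and let teardown complete. A minimal sketch of that defensive teardown shape (the class and field names are illustrative stand-ins for the mini clusters):

    import java.io.Closeable;
    import java.io.IOException;

    public class DefensiveTeardownSketch {
      private Closeable dfsCluster;  // stand-in for MiniDFSCluster
      private Closeable mrCluster;   // stand-in for MiniMRCluster

      protected void tearDown() {
        try {
          if (dfsCluster != null) {
            dfsCluster.close();
          }
          if (mrCluster != null) {
            mrCluster.close();
          }
        } catch (IOException e) {
          // shutdown failures are secondary to the test result
        } catch (OutOfMemoryError e) {
          // no logging here: allocation may fail again and hide
          // the original problem; just let teardown complete
        }
      }
    }
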
Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Thu Mar 19 12:15:51 2009
@@ -81,8 +81,8 @@
       runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
       mr.waitUntilIdle();
     } finally {
-      mr.shutdown();
-      dfs.shutdown();
+      MiniMRCluster.close(mr);
+      MiniDFSCluster.close(dfs);
     }
   }
 


