hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r575438 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/
Date Thu, 13 Sep 2007 20:24:11 GMT
Author: cutting
Date: Thu Sep 13 13:24:10 2007
New Revision: 575438

URL: http://svn.apache.org/viewvc?rev=575438&view=rev
Log:
HADOOP-1819.  Jobtracker cleanups, including binding ports before clearing state directories,
so that inadvertantly starting a second jobtracker doesn't trash one that's already running.
 Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp
    lucene/hadoop/trunk/src/webapps/job/jobconf.jsp
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
    lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
    lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
    lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp
    lucene/hadoop/trunk/src/webapps/job/machines.jsp
    lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
    lucene/hadoop/trunk/src/webapps/job/taskstats.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 13 13:24:10 2007
@@ -218,6 +218,12 @@
     HADOOP-1718.  Add ant targets for measuring code coverage with clover.
     (simonwillnauer via nigel)
 
+    HADOOP-1819.  Jobtracker cleanups, including binding ports before
+    clearing state directories, so that inadvertently starting a
+    second jobtracker doesn't trash one that's already running.
+    (omalley via cutting)
+
+
 Release 0.14.1 - 2007-09-04
 
   BUG FIXES

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Sep 13 13:24:10 2007
@@ -22,8 +22,6 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.BufferedOutputStream;
-import java.io.StringWriter;
-import java.io.PrintWriter;
 import java.io.ByteArrayInputStream;
 
 import java.nio.ByteBuffer;
@@ -32,9 +30,14 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
 
 import java.util.Collections;
 import java.util.LinkedList;
@@ -44,7 +47,6 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -142,7 +144,35 @@
   private Listener listener = null;
   private int numConnections = 0;
   private Handler[] handlers = null;
-  
+
+  /**
+   * A convience method to bind to a given address and report 
+   * better exceptions if the address is not a valid host.
+   * @param socket the socket to bind
+   * @param address the address to bind to
+   * @param backlog the number of connections allowed in the queue
+   * @throws BindException if the address can't be bound
+   * @throws UnknownHostException if the address isn't a valid host name
+   * @throws IOException other random errors from bind
+   */
+  static void bind(ServerSocket socket, InetSocketAddress address, 
+                   int backlog) throws IOException {
+    try {
+      socket.bind(address, backlog);
+    } catch (BindException e) {
+      throw new BindException("Problem binding to " + address);
+    } catch (SocketException e) {
+      // If they try to bind to a different host's address, give a better
+      // error message.
+      if ("Unresolved address".equals(e.getMessage())) {
+        throw new UnknownHostException("Invalid hostname for server: " + 
+                                       address.getHostName());
+      } else {
+        throw e;
+      }
+    }
+  }
+
   /** A call queued for handling. */
   private static class Call {
     private int id;                               // the client's call id
@@ -182,7 +212,7 @@
       acceptChannel.configureBlocking(false);
 
       // Bind the server socket to the local host and port
-      acceptChannel.socket().bind(address, backlogLength);
+      bind(acceptChannel.socket(), address, backlogLength);
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       // create a selector;
       selector= Selector.open();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Sep 13 13:24:10
2007
@@ -23,34 +23,28 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Summarizes the size and current state of the cluster.
  */
 public class ClusterStatus implements Writable {
 
-  static {                                        // register a ctor
-    WritableFactories.setFactory
-      (ClusterStatus.class,
-       new WritableFactory() {
-         public Writable newInstance() { return new ClusterStatus(); }
-       });
-  }
-
   private int task_trackers;
   private int map_tasks;
   private int reduce_tasks;
   private int max_tasks;
+  private JobTracker.State state;
 
   ClusterStatus() {}
   
-  ClusterStatus(int trackers, int maps, int reduces, int max) {
+  ClusterStatus(int trackers, int maps, int reduces, int max,
+                JobTracker.State state) {
     task_trackers = trackers;
     map_tasks = maps;
     reduce_tasks = reduces;
     max_tasks = max;
+    this.state = state;
   }
   
 
@@ -81,12 +75,17 @@
   public int getMaxTasks() {
     return max_tasks;
   }
-  
+
+  public JobTracker.State getJobTrackerState() {
+    return state;
+  }
+
   public void write(DataOutput out) throws IOException {
     out.writeInt(task_trackers);
     out.writeInt(map_tasks);
     out.writeInt(reduce_tasks);
     out.writeInt(max_tasks);
+    WritableUtils.writeEnum(out, state);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -94,6 +93,7 @@
     map_tasks = in.readInt();
     reduce_tasks = in.readInt();
     max_tasks = in.readInt();
+    state = WritableUtils.readEnum(in, JobTracker.State.class);
   }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Sep
13 13:24:10 2007
@@ -32,6 +32,7 @@
    *Changing the versionID to 2L since the getTaskCompletionEvents method has
    *changed.
    *Changed to 4 since killTask(String,boolean) is added
+   *Version 4: added jobtracker state to ClusterStatus
    */
   public static final long versionID = 4L;
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Sep 13 13:24:10
2007
@@ -19,7 +19,9 @@
 
 
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.text.NumberFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -66,6 +68,17 @@
   static float TASK_ALLOC_EPSILON;
   static float PAD_FRACTION;
   static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
+  public static enum State { INITIALIZING, RUNNING }
+  State state = State.INITIALIZING;
+  
+  /**
+   * A client tried to submit a job before the Job Tracker was ready.
+   */
+  public static class IllegalStateException extends IOException {
+    public IllegalStateException(String msg) {
+      super(msg);
+    }
+  }
 
   /**
    * The maximum no. of 'completed' (successful/failed/killed)
@@ -85,9 +98,6 @@
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
-
-  private static JobTracker tracker = null;
-  private static boolean runTracker = true;
     
   /**
    * Start the JobTracker with given configuration.
@@ -99,17 +109,18 @@
    * @param conf configuration for the JobTracker.
    * @throws IOException
    */
-  public static void startTracker(JobConf conf) throws IOException {
-    if (tracker != null)
-      throw new IOException("JobTracker already running.");
-    runTracker = true;
-    while (runTracker) {
+  public static JobTracker startTracker(JobConf conf) throws IOException {
+    JobTracker result = null;
+    while (true) {
       try {
-        tracker = new JobTracker(conf);
+        result = new JobTracker(conf);
         break;
-      } catch (VersionMismatch v) {
-        // Can't recover from a version mismatch. Avoid the retry loop and re-throw
-        throw v;
+      } catch (VersionMismatch e) {
+        throw e;
+      } catch (BindException e) {
+        throw e;
+      } catch (UnknownHostException e) {
+        throw e;
       } catch (IOException e) {
         LOG.warn("Error starting tracker: " + 
                  StringUtils.stringifyException(e));
@@ -117,25 +128,17 @@
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-      }
+      }      
     }
-    if (runTracker) {
+    if (result != null) {
       JobEndNotifier.startNotifier();
-      tracker.offerService();
     }
+    return result;
   }
 
-  public static JobTracker getTracker() {
-    return tracker;
-  }
-
-  public static void stopTracker() throws IOException {
-    runTracker = false;
-    if (tracker != null) {
-      JobEndNotifier.stopNotifier();
-      tracker.close();
-      tracker = null;
-    }
+  public void stopTracker() throws IOException {
+    JobEndNotifier.stopNotifier();
+    close();
   }
     
   public long getProtocolVersion(String protocol, 
@@ -426,8 +429,9 @@
     private int numReduceTasksCompleted = 0;
     private int numJobsSubmitted = 0;
     private int numJobsCompleted = 0;
+    private JobTracker tracker;
       
-    JobTrackerMetrics(JobConf conf) {
+    JobTrackerMetrics(JobTracker tracker, JobConf conf) {
       String sessionId = conf.getSessionId();
       // Initiate JVM Metrics
       JvmMetrics.init("JobTracker", sessionId);
@@ -435,6 +439,7 @@
       MetricsContext context = MetricsUtil.getContext("mapred");
       metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
       metricsRecord.setTag("sessionId", sessionId);
+      this.tracker = tracker;
       context.registerUpdater(this);
     }
       
@@ -459,14 +464,14 @@
         numJobsCompleted = 0;
       }
       metricsRecord.update();
-        
+
       if (tracker != null) {
         for (JobInProgress jip : tracker.getRunningJobs()) {
           jip.updateMetrics();
         }
       }
     }
-      
+
     synchronized void launchMap() {
       ++numMapTasksLaunched;
     }
@@ -629,13 +634,6 @@
     JobConf jobConf = new JobConf(conf);
     this.systemDir = jobConf.getSystemDir();
     this.fs = FileSystem.get(conf);
-    fs.delete(systemDir);
-    if (!fs.mkdirs(systemDir)) {
-      throw new IOException("Mkdirs failed to create " + systemDir.toString());
-    }
-
-    // Same with 'localDir' except it's always on the local disk.
-    jobConf.deleteLocalFiles(SUBDIR);
 
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -648,23 +646,26 @@
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount,
false, conf);
     this.interTrackerServer.start();
-    Properties p = System.getProperties();
-    for (Iterator it = p.keySet().iterator(); it.hasNext();) {
-      String key = (String) it.next();
-      String val = (String) p.getProperty(key);
-      LOG.info("Property '" + key + "' is " + val);
+    if (LOG.isDebugEnabled()) {
+      Properties p = System.getProperties();
+      for (Iterator it = p.keySet().iterator(); it.hasNext();) {
+        String key = (String) it.next();
+        String val = (String) p.getProperty(key);
+        LOG.debug("Property '" + key + "' is " + val);
+      }
     }
 
     this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
     this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0");
-    this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
-    this.infoServer.start();
+    infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
+    infoServer.setAttribute("job.tracker", this);
+    infoServer.start();
 
     this.startTime = System.currentTimeMillis();
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
     trackerIdentifier = dateFormat.format(new Date());
 
-    myMetrics = new JobTrackerMetrics(jobConf);
+    myMetrics = new JobTrackerMetrics(this, jobConf);
     
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                            "expireTrackers");
@@ -683,6 +684,25 @@
     this.infoPort = this.infoServer.getPort();
     this.conf.setInt("mapred.job.tracker.info.port", this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
+
+    while (true) {
+      try {
+        fs.delete(systemDir);
+        if (fs.mkdirs(systemDir)) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+    }
+
+    // Same with 'localDir' except it's always on the local disk.
+    jobConf.deleteLocalFiles(SUBDIR);
+    synchronized (this) {
+      state = State.RUNNING;
+    }
+    LOG.info("Starting RUNNING");
   }
 
   public static InetSocketAddress getAddress(Configuration conf) {
@@ -1456,13 +1476,21 @@
   ////////////////////////////////////////////////////
 
   /**
+   * Make sure the JobTracker is done initializing.
+   */
+  private synchronized void ensureRunning() throws IllegalStateException {
+    if (state != State.RUNNING) {
+      throw new IllegalStateException("Job tracker still initializing");
+    }
+  }
+
+  /**
    * Allocates a new JobId string.
    */
-  public String getNewJobId() {
-    synchronized (this) {
-      return "job_" + getTrackerIdentifier() + "_" + 
+  public synchronized String getNewJobId() throws IOException {
+    ensureRunning();
+    return "job_" + getTrackerIdentifier() + "_" + 
              idFormat.format(nextJobId++);
-    }
   }
 
   /**
@@ -1478,6 +1506,7 @@
    * the right TaskTracker/Block mapping.
    */
   public synchronized JobStatus submitJob(String jobFile) throws IOException {
+    ensureRunning();
     totalSubmissions++;
     JobInProgress job = new JobInProgress(jobFile, this, this.conf);
     synchronized (jobs) {
@@ -1526,7 +1555,8 @@
       return new ClusterStatus(taskTrackers.size(),
                                totalMaps,
                                totalReduces,
-                               maxCurrentTasks);          
+                               maxCurrentTasks,
+                               state);          
     }
   }
     
@@ -1816,7 +1846,8 @@
     }
       
     try {
-      startTracker(new JobConf());
+      JobTracker tracker = startTracker(new JobConf());
+      tracker.offerService();
     } catch (Throwable e) {
       LOG.fatal(StringUtils.stringifyException(e));
       System.exit(-1);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Sep 13 13:24:10
2007
@@ -265,7 +265,7 @@
   public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.conf = conf;
-    myMetrics = new JobTrackerMetrics(new JobConf(conf));
+    myMetrics = new JobTrackerMetrics(null, new JobConf(conf));
   }
 
   // JobSubmissionProtocol methods
@@ -316,7 +316,8 @@
   }
   
   public ClusterStatus getClusterStatus() {
-    return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
+    return new ClusterStatus(1, map_tasks, reduce_tasks, 1, 
+                             JobTracker.State.RUNNING);
   }
 
   public JobStatus[] jobsToComplete() {return null;}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Sep 13 13:24:10
2007
@@ -20,11 +20,15 @@
 import java.io.*;
 import java.util.*;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
  * One thread is created for each server.
  */
 public class MiniMRCluster {
+  private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
     
   private Thread jobTrackerThread;
   private JobTrackerRunner jobTracker;
@@ -43,19 +47,20 @@
    * An inner class that runs a job tracker.
    */
   class JobTrackerRunner implements Runnable {
+    private JobTracker tracker = null;
 
     JobConf jc = null;
         
     public boolean isUp() {
-      return (JobTracker.getTracker() != null);
+      return (tracker != null);
     }
         
     public int getJobTrackerPort() {
-      return JobTracker.getAddress(jc).getPort();
+      return tracker.getTrackerPort();
     }
 
     public int getJobTrackerInfoPort() {
-      return jc.getInt("mapred.job.tracker.info.port", 50030);
+      return tracker.getInfoPort();
     }
         
     /**
@@ -65,10 +70,10 @@
       try {
         jc = createJobConf();
         jc.set("mapred.local.dir","build/test/mapred/local");
-        JobTracker.startTracker(jc);
+        tracker = JobTracker.startTracker(jc);
+        tracker.offerService();
       } catch (Throwable e) {
-        System.err.println("Job tracker crashed:");
-        e.printStackTrace();
+        LOG.error("Job tracker crashed", e);
       }
     }
         
@@ -77,10 +82,11 @@
      */
     public void shutdown() {
       try {
-        JobTracker.stopTracker();
+        if (tracker != null) {
+          tracker.stopTracker();
+        }
       } catch (Throwable e) {
-        System.err.println("Unable to shut down job tracker:");
-        e.printStackTrace();
+        LOG.error("Problem shutting down job tracker", e);
       }
     }
   }
@@ -91,16 +97,40 @@
   class TaskTrackerRunner implements Runnable {
     volatile TaskTracker tt;
     int trackerId;
+    JobConf conf = createJobConf();
     // the localDirs for this taskTracker
-    String[] localDir;
+    String[] localDirs;
     volatile boolean isInitialized = false;
     volatile boolean isDead = false;
-    int numDir;       
-    TaskTrackerRunner(int trackerId, int numDir) {
+    int numDir;
+
+    TaskTrackerRunner(int trackerId, int numDir) throws IOException {
       this.trackerId = trackerId;
       this.numDir = numDir;
-      // a maximum of 10 local dirs can be specified in MinMRCluster
-      localDir = new String[10];
+      localDirs = new String[numDir];
+      conf = createJobConf();
+      conf.setInt("mapred.task.tracker.info.port", 0);
+      conf.setInt("mapred.task.tracker.report.port", taskTrackerPort);
+      File localDirBase = 
+        new File(conf.get("mapred.local.dir")).getAbsoluteFile();
+      localDirBase.mkdirs();
+      StringBuffer localPath = new StringBuffer();
+      for(int i=0; i < numDir; ++i) {
+        File ttDir = new File(localDirBase, 
+                              Integer.toString(trackerId) + "_" + 0);
+        if (!ttDir.mkdirs()) {
+          if (!ttDir.isDirectory()) {
+            throw new IOException("Mkdirs failed to create " + ttDir);
+          }
+        }
+        localDirs[i] = ttDir.toString();
+        if (i != 0) {
+          localPath.append(",");
+        }
+        localPath.append(localDirs[i]);
+      }
+      conf.set("mapred.local.dir", localPath.toString());
+      LOG.info("mapred.local.dir is " +  localPath);
     }
         
     /**
@@ -108,40 +138,13 @@
      */
     public void run() {
       try {
-        JobConf jc = createJobConf();
-        jc.setInt("mapred.task.tracker.info.port", 0);
-        jc.setInt("mapred.task.tracker.report.port", taskTrackerPort);
-        File localDir = new File(jc.get("mapred.local.dir"));
-        String mapredDir = "";
-        File ttDir = new File(localDir, Integer.toString(trackerId) + "_" + 0);
-        if (!ttDir.mkdirs()) {
-          if (!ttDir.isDirectory()) {
-            throw new IOException("Mkdirs failed to create " + ttDir.toString());
-          }
-        }
-        this.localDir[0] = ttDir.getAbsolutePath();
-        mapredDir = ttDir.getAbsolutePath();
-        for (int i = 1; i < numDir; i++){
-          ttDir = new File(localDir, Integer.toString(trackerId) + "_" + i);
-          ttDir.mkdirs();
-          if (!ttDir.mkdirs()) {
-            if (!ttDir.isDirectory()) {
-              throw new IOException("Mkdirs failed to create " + ttDir.toString());
-            }
-          }
-          this.localDir[i] = ttDir.getAbsolutePath();
-          mapredDir = mapredDir + "," + ttDir.getAbsolutePath();
-        }
-        jc.set("mapred.local.dir", mapredDir);
-        System.out.println("mapred.local.dir is " +  mapredDir);
-        tt = new TaskTracker(jc);
+        tt = new TaskTracker(conf);
         isInitialized = true;
         tt.run();
       } catch (Throwable e) {
         isDead = true;
         tt = null;
-        System.err.println("Task tracker crashed:");
-        e.printStackTrace();
+        LOG.error("task tracker " + trackerId + " crashed", e);
       }
     }
         
@@ -152,11 +155,11 @@
      * @return the absolute pathname
      */
     public String getLocalDir() {
-      return localDir[0];
+      return localDirs[0];
     }
        
     public String[] getLocalDirs(){
-      return localDir;
+      return localDirs;
     } 
     /**
      * Shut down the server and wait for it to finish.
@@ -166,8 +169,8 @@
         try {
           tt.shutdown();
         } catch (Throwable e) {
-          System.err.println("Unable to shut down task tracker:");
-          e.printStackTrace();
+          LOG.error("task tracker " + trackerId + " could not shut down",
+                    e);
         }
       }
     }
@@ -198,10 +201,10 @@
       TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();
       while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
         if (!runner.isInitialized) {
-          System.out.println("Waiting for task tracker to start.");
+          LOG.info("Waiting for task tracker to start.");
         } else {
-          System.out.println("Waiting for task tracker " + runner.tt.getName() +
-                             " to be idle.");
+          LOG.info("Waiting for task tracker " + runner.tt.getName() +
+                   " to be idle.");
         }
         try {
           Thread.sleep(1000);
@@ -270,6 +273,19 @@
     jobTracker = new JobTrackerRunner();
     jobTrackerThread = new Thread(jobTracker);
         
+    jobTrackerThread.start();
+    while (!jobTracker.isUp()) {
+      try {                                     // let daemons get started
+        LOG.info("Waiting for JobTracker to start...");
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
+      }
+    }
+        
+    // Set the configuration for the task-trackers
+    this.jobTrackerPort = jobTracker.getJobTrackerPort();
+    this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
+
     // Create the TaskTrackers
     for (int idx = 0; idx < numTaskTrackers; idx++) {
       TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir);
@@ -286,19 +302,6 @@
       }
     }
         
-    jobTrackerThread.start();
-    while (!jobTracker.isUp()) {
-      try {                                     // let daemons get started
-        System.err.println("Waiting for JobTracker to start...");
-        Thread.sleep(1000);
-      } catch(InterruptedException e) {
-      }
-    }
-        
-    // Set the configuration for the task-trackers
-    this.jobTrackerPort = jobTracker.getJobTrackerPort();
-    this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
-
     if (!taskTrackerFirst) {
       for (Thread taskTrackerThread : taskTrackerThreadList){
         taskTrackerThread.start();
@@ -323,7 +326,7 @@
         try {
           taskTrackerThread.join();
         } catch (InterruptedException ex) {
-          ex.printStackTrace();
+          LOG.error("Problem shutting down task tracker", ex);
         }
       }
       jobTracker.shutdown();
@@ -331,7 +334,7 @@
       try {
         jobTrackerThread.join();
       } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        LOG.error("Problem waiting for job tracker to finish", ex);
       }
     } finally {
       File configDir = new File("build", "minimr");
@@ -341,11 +344,11 @@
   }
     
   public static void main(String[] args) throws IOException {
-    System.out.println("Bringing up Jobtracker and tasktrackers.");
+    LOG.info("Bringing up Jobtracker and tasktrackers.");
     MiniMRCluster mr = new MiniMRCluster(4, "local", 1);
-    System.out.println("JobTracker and TaskTrackers are up.");
+    LOG.info("JobTracker and TaskTrackers are up.");
     mr.shutdown();
-    System.out.println("JobTracker and TaskTrackers brought down.");
+    LOG.info("JobTracker and TaskTrackers brought down.");
   }
 }
 

Modified: lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp Thu Sep 13 13:24:10 2007
@@ -8,11 +8,12 @@
   import="org.apache.hadoop.util.*"
 %>
 
-<%!
-  JobTracker tracker = JobTracker.getTracker();
+<%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-          
+%>
+<%!       
   private void printBlackListedTrackers(JspWriter out, 
                              JobInProgress job) throws IOException {
     Map<String, Integer> trackerErrors = job.getTaskTrackerErrors();

Modified: lucene/hadoop/trunk/src/webapps/job/jobconf.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobconf.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobconf.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobconf.jsp Thu Sep 13 13:24:10 2007
@@ -10,6 +10,7 @@
 
 
 <%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   String jobId = request.getParameter("jobid");
   if (jobId == null) {
     out.println("<h2>Missing 'jobid' for fetching job configuration!</h2>");
@@ -25,8 +26,6 @@
 <h2>Job Configuration: JobId - <%= jobId %></h2><br>
 
 <%
-  JobTracker tracker = JobTracker.getTracker();
-  
   JobInProgress job = (JobInProgress)tracker.getJob(jobId);
   if (job == null) {
     out.print("<h4>Job '" + jobId + "' not found!</h4><br>\n");

Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu Sep 13 13:24:10 2007
@@ -11,15 +11,16 @@
   import="org.apache.hadoop.dfs.JspHelper"
 %>
 
+<%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  String trackerName = 
+           StringUtils.simpleHostname(tracker.getJobTrackerMachine());
+%>
 <%!
   
 	private static final String PRIVATE_ACTIONS_KEY 
 		= "webinterface.private.actions";
-
-  JobTracker tracker = JobTracker.getTracker();
-  String trackerName = 
-           StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-  
+ 
   private void printTaskSummary(JspWriter out,
                                 String jobId,
                                 String kind,

Modified: lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp Thu Sep 13 13:24:10 2007
@@ -8,12 +8,14 @@
   import="org.apache.hadoop.util.*"
 %>
 
-<%!
-  JobTracker tracker = JobTracker.getTracker();
+<%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-  
+%>
+<%! 
   private void printFailedAttempts(JspWriter out,
+                                   JobTracker tracker,
                                    String jobId,
                                    TaskInProgress tip,
                                    TaskStatus.State failState) throws IOException {
@@ -75,6 +77,7 @@
   }
              
   private void printFailures(JspWriter out, 
+                             JobTracker tracker,
                              String jobId,
                              String kind, 
                              String cause) throws IOException {
@@ -122,13 +125,13 @@
     if (includeMap) {
       TaskInProgress[] tips = job.getMapTasks();
       for(int i=0; i < tips.length; ++i) {
-        printFailedAttempts(out, jobId, tips[i], state);
+        printFailedAttempts(out, tracker, jobId, tips[i], state);
       }
     }
     if (includeReduce) {
       TaskInProgress[] tips = job.getReduceTasks();
       for(int i=0; i < tips.length; ++i) {
-        printFailedAttempts(out, jobId, tips[i], state);
+        printFailedAttempts(out, tracker, jobId, tips[i], state);
       }
     }
     out.print("</table>\n");
@@ -148,7 +151,7 @@
 failures on <a href="jobtracker.jsp"><%=trackerName%></a></h1>
 
 <% 
-    printFailures(out, jobId, kind, cause); 
+    printFailures(out, tracker, jobId, kind, cause); 
 %>
 
 <hr>

Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Thu Sep 13 13:24:10 2007
@@ -11,15 +11,15 @@
 %>
 <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ;
%>
 <%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  String trackerName = 
+           StringUtils.simpleHostname(tracker.getJobTrackerMachine());
   String jobid = request.getParameter("jobid");
   String type = request.getParameter("type");
   String pagenum = request.getParameter("pagenum");
   int pnum = Integer.parseInt(pagenum);
   int next_page = pnum+1;
   int numperpage = 2000;
-  JobTracker tracker = JobTracker.getTracker();
-  String trackerLabel = 
-           StringUtils.simpleHostname(tracker.getJobTrackerMachine());
   JobInProgress job = (JobInProgress) tracker.getJob(jobid);
   JobProfile profile = (job != null) ? (job.getProfile()) : null;
   JobStatus status = (job != null) ? (job.getStatus()) : null;
@@ -37,12 +37,12 @@
 
 <html>
   <head>
-    <title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%></title>
+    <title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerName%></title>
   </head>
 <body>
 <h1>Hadoop <%=type%> task list for 
 <a href="jobdetails.jsp?jobid=<%=jobid%>"><%=jobid%></a> on 
-<a href="jobtracker.jsp"><%=trackerLabel%></a></h1>
+<a href="jobtracker.jsp"><%=trackerName%></a></h1>
 <%
   if (job == null) {
     out.print("<b>Job " + jobid + " not found.</b><br>\n");

Modified: lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp Thu Sep 13 13:24:10 2007
@@ -8,10 +8,12 @@
   import="org.apache.hadoop.mapred.*"
   import="org.apache.hadoop.util.*"
 %>
-<%!
-  JobTracker tracker = JobTracker.getTracker();
-  String trackerLabel = 
+<%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
+%>
+<%!
   private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
 
   public void generateJobTable(JspWriter out, String label, Vector jobs, int refresh) throws
IOException {
@@ -60,7 +62,8 @@
       out.print("</center>\n");
   }
 
-  public void generateSummaryTable(JspWriter out) throws IOException {
+  public void generateSummaryTable(JspWriter out,
+                                   JobTracker tracker) throws IOException {
     ClusterStatus status = tracker.getClusterStatus();
     out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n"+
               "<tr><th>Maps</th><th>Reduces</th>" + 
@@ -76,11 +79,12 @@
 
 <html>
 
-<title><%= trackerLabel %> Hadoop Map/Reduce Administration</title>
+<title><%= trackerName %> Hadoop Map/Reduce Administration</title>
 
 <body>
-<h1><%= trackerLabel %> Hadoop Map/Reduce Administration</h1>
+<h1><%= trackerName %> Hadoop Map/Reduce Administration</h1>
 
+<b>State:</b> <%= tracker.getClusterStatus().getJobTrackerState() %><br>
 <b>Started:</b> <%= new Date(tracker.getStartTime())%><br>
 <b>Version:</b> <%= VersionInfo.getVersion()%>,
                 r<%= VersionInfo.getRevision()%><br>
@@ -92,7 +96,7 @@
 <h2>Cluster Summary</h2>
 <center>
 <% 
-   generateSummaryTable(out); 
+   generateSummaryTable(out, tracker); 
 %>
 </center>
 <hr>

Modified: lucene/hadoop/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/machines.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/machines.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/machines.jsp Thu Sep 13 13:24:10 2007
@@ -8,12 +8,14 @@
   import="org.apache.hadoop.mapred.*"
   import="org.apache.hadoop.util.*"
 %>
-<%!
-  JobTracker tracker = JobTracker.getTracker();
-  String trackerLabel = 
+<%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-
-  public void generateTaskTrackerTable(JspWriter out) throws IOException {
+%>
+<%!
+  public void generateTaskTrackerTable(JspWriter out,
+                                       JobTracker tracker) throws IOException {
     Collection c = tracker.taskTrackers();
 
     if (c.size() == 0) {
@@ -63,14 +65,14 @@
 
 <html>
 
-<title><%=trackerLabel%> Hadoop Machine List</title>
+<title><%=trackerName%> Hadoop Machine List</title>
 
 <body>
-<h1><a href="jobtracker.jsp"><%=trackerLabel%></a> Hadoop Machine
List</h1>
+<h1><a href="jobtracker.jsp"><%=trackerName%></a> Hadoop Machine
List</h1>
 
 <h2>Task Trackers</h2>
 <%
-  generateTaskTrackerTable(out);
+  generateTaskTrackerTable(out, tracker);
 %>
 
 <hr>

Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Thu Sep 13 13:24:10 2007
@@ -27,8 +27,9 @@
         + "\">Cancel</a></td></tr></table></body></html>");
   }%>
 <%
-    JobTracker tracker = JobTracker.getTracker();
+    JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
     String jobid = request.getParameter("jobid");
+    JobInProgress job = (JobInProgress) tracker.getJob(jobid);
     String tipid = request.getParameter("tipid");
     String taskid = request.getParameter("taskid");
 
@@ -57,7 +58,6 @@
         }
       }
     }
-    JobInProgress job = (JobInProgress) tracker.getJob(jobid);
     TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(jobid, tipid)
         : null;
 %>

Modified: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Thu Sep 13 13:24:10 2007
@@ -12,8 +12,10 @@
   import="org.apache.hadoop.util.*"
 %>
 <%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  String trackerName = 
+           StringUtils.simpleHostname(tracker.getJobTrackerMachine());
   String jobid = request.getParameter("jobid");
-  JobTracker tracker = JobTracker.getTracker();
   JobInProgress job = (JobInProgress) tracker.getJob(jobid);
   String tipid = request.getParameter("tipid");
   String taskid = request.getParameter("taskid");



Mime
View raw message