Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 8212 invoked from network); 13 Sep 2007 20:24:35 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 13 Sep 2007 20:24:35 -0000 Received: (qmail 7541 invoked by uid 500); 13 Sep 2007 20:24:28 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 7523 invoked by uid 500); 13 Sep 2007 20:24:28 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 7514 invoked by uid 99); 13 Sep 2007 20:24:28 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Sep 2007 13:24:28 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Sep 2007 20:24:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A08921A9832; Thu, 13 Sep 2007 13:24:13 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r575438 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/ Date: Thu, 13 Sep 2007 20:24:11 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20070913202413.A08921A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Thu Sep 13 13:24:10 2007 New Revision: 575438 URL: http://svn.apache.org/viewvc?rev=575438&view=rev Log: HADOOP-1819. 
Jobtracker cleanups, including binding ports before clearing state directories, so that inadvertently starting a second jobtracker doesn't trash one that's already running. Contributed by Owen. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp lucene/hadoop/trunk/src/webapps/job/jobconf.jsp lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp lucene/hadoop/trunk/src/webapps/job/machines.jsp lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 13 13:24:10 2007 @@ -218,6 +218,12 @@ HADOOP-1718. Add ant targets for measuring code coverage with clover. (simonwillnauer via nigel) + HADOOP-1819. Jobtracker cleanups, including binding ports before + clearing state directories, so that inadvertently starting a + second jobtracker doesn't trash one that's already running.
+ (omalley via cutting) + + Release 0.14.1 - 2007-09-04 BUG FIXES Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Sep 13 13:24:10 2007 @@ -22,8 +22,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.BufferedOutputStream; -import java.io.StringWriter; -import java.io.PrintWriter; import java.io.ByteArrayInputStream; import java.nio.ByteBuffer; @@ -32,9 +30,14 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; import java.util.Collections; import java.util.LinkedList; @@ -44,7 +47,6 @@ import org.apache.commons.logging.*; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -142,7 +144,35 @@ private Listener listener = null; private int numConnections = 0; private Handler[] handlers = null; - + + /** + * A convience method to bind to a given address and report + * better exceptions if the address is not a valid host. 
+ * @param socket the socket to bind + * @param address the address to bind to + * @param backlog the number of connections allowed in the queue + * @throws BindException if the address can't be bound + * @throws UnknownHostException if the address isn't a valid host name + * @throws IOException other random errors from bind + */ + static void bind(ServerSocket socket, InetSocketAddress address, + int backlog) throws IOException { + try { + socket.bind(address, backlog); + } catch (BindException e) { + throw new BindException("Problem binding to " + address); + } catch (SocketException e) { + // If they try to bind to a different host's address, give a better + // error message. + if ("Unresolved address".equals(e.getMessage())) { + throw new UnknownHostException("Invalid hostname for server: " + + address.getHostName()); + } else { + throw e; + } + } + } + /** A call queued for handling. */ private static class Call { private int id; // the client's call id @@ -182,7 +212,7 @@ acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port - acceptChannel.socket().bind(address, backlogLength); + bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Sep 13 13:24:10 2007 @@ -23,34 +23,28 @@ import java.io.IOException; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactory; -import 
org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableUtils; /** * Summarizes the size and current state of the cluster. */ public class ClusterStatus implements Writable { - static { // register a ctor - WritableFactories.setFactory - (ClusterStatus.class, - new WritableFactory() { - public Writable newInstance() { return new ClusterStatus(); } - }); - } - private int task_trackers; private int map_tasks; private int reduce_tasks; private int max_tasks; + private JobTracker.State state; ClusterStatus() {} - ClusterStatus(int trackers, int maps, int reduces, int max) { + ClusterStatus(int trackers, int maps, int reduces, int max, + JobTracker.State state) { task_trackers = trackers; map_tasks = maps; reduce_tasks = reduces; max_tasks = max; + this.state = state; } @@ -81,12 +75,17 @@ public int getMaxTasks() { return max_tasks; } - + + public JobTracker.State getJobTrackerState() { + return state; + } + public void write(DataOutput out) throws IOException { out.writeInt(task_trackers); out.writeInt(map_tasks); out.writeInt(reduce_tasks); out.writeInt(max_tasks); + WritableUtils.writeEnum(out, state); } public void readFields(DataInput in) throws IOException { @@ -94,6 +93,7 @@ map_tasks = in.readInt(); reduce_tasks = in.readInt(); max_tasks = in.readInt(); + state = WritableUtils.readEnum(in, JobTracker.State.class); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Sep 13 13:24:10 2007 @@ -32,6 +32,7 @@ *Changing the versionID to 2L since the getTaskCompletionEvents method has 
*changed. *Changed to 4 since killTask(String,boolean) is added + *Version 4: added jobtracker state to ClusterStatus */ public static final long versionID = 4L; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Sep 13 13:24:10 2007 @@ -19,7 +19,9 @@ import java.io.IOException; +import java.net.BindException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.text.NumberFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -66,6 +68,17 @@ static float TASK_ALLOC_EPSILON; static float PAD_FRACTION; static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3; + public static enum State { INITIALIZING, RUNNING } + State state = State.INITIALIZING; + + /** + * A client tried to submit a job before the Job Tracker was ready. + */ + public static class IllegalStateException extends IOException { + public IllegalStateException(String msg) { + super(msg); + } + } /** * The maximum no. of 'completed' (successful/failed/killed) @@ -85,9 +98,6 @@ private int nextJobId = 1; public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker"); - - private static JobTracker tracker = null; - private static boolean runTracker = true; /** * Start the JobTracker with given configuration. @@ -99,17 +109,18 @@ * @param conf configuration for the JobTracker. 
* @throws IOException */ - public static void startTracker(JobConf conf) throws IOException { - if (tracker != null) - throw new IOException("JobTracker already running."); - runTracker = true; - while (runTracker) { + public static JobTracker startTracker(JobConf conf) throws IOException { + JobTracker result = null; + while (true) { try { - tracker = new JobTracker(conf); + result = new JobTracker(conf); break; - } catch (VersionMismatch v) { - // Can't recover from a version mismatch. Avoid the retry loop and re-throw - throw v; + } catch (VersionMismatch e) { + throw e; + } catch (BindException e) { + throw e; + } catch (UnknownHostException e) { + throw e; } catch (IOException e) { LOG.warn("Error starting tracker: " + StringUtils.stringifyException(e)); @@ -117,25 +128,17 @@ try { Thread.sleep(1000); } catch (InterruptedException e) { - } + } } - if (runTracker) { + if (result != null) { JobEndNotifier.startNotifier(); - tracker.offerService(); } + return result; } - public static JobTracker getTracker() { - return tracker; - } - - public static void stopTracker() throws IOException { - runTracker = false; - if (tracker != null) { - JobEndNotifier.stopNotifier(); - tracker.close(); - tracker = null; - } + public void stopTracker() throws IOException { + JobEndNotifier.stopNotifier(); + close(); } public long getProtocolVersion(String protocol, @@ -426,8 +429,9 @@ private int numReduceTasksCompleted = 0; private int numJobsSubmitted = 0; private int numJobsCompleted = 0; + private JobTracker tracker; - JobTrackerMetrics(JobConf conf) { + JobTrackerMetrics(JobTracker tracker, JobConf conf) { String sessionId = conf.getSessionId(); // Initiate JVM Metrics JvmMetrics.init("JobTracker", sessionId); @@ -435,6 +439,7 @@ MetricsContext context = MetricsUtil.getContext("mapred"); metricsRecord = MetricsUtil.createRecord(context, "jobtracker"); metricsRecord.setTag("sessionId", sessionId); + this.tracker = tracker; context.registerUpdater(this); } @@ -459,14 +464,14 @@ 
numJobsCompleted = 0; } metricsRecord.update(); - + if (tracker != null) { for (JobInProgress jip : tracker.getRunningJobs()) { jip.updateMetrics(); } } } - + synchronized void launchMap() { ++numMapTasksLaunched; } @@ -629,13 +634,6 @@ JobConf jobConf = new JobConf(conf); this.systemDir = jobConf.getSystemDir(); this.fs = FileSystem.get(conf); - fs.delete(systemDir); - if (!fs.mkdirs(systemDir)) { - throw new IOException("Mkdirs failed to create " + systemDir.toString()); - } - - // Same with 'localDir' except it's always on the local disk. - jobConf.deleteLocalFiles(SUBDIR); // Read the hosts/exclude files to restrict access to the jobtracker. this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""), @@ -648,23 +646,26 @@ int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10); this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf); this.interTrackerServer.start(); - Properties p = System.getProperties(); - for (Iterator it = p.keySet().iterator(); it.hasNext();) { - String key = (String) it.next(); - String val = (String) p.getProperty(key); - LOG.info("Property '" + key + "' is " + val); + if (LOG.isDebugEnabled()) { + Properties p = System.getProperties(); + for (Iterator it = p.keySet().iterator(); it.hasNext();) { + String key = (String) it.next(); + String val = (String) p.getProperty(key); + LOG.debug("Property '" + key + "' is " + val); + } } this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030); this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0"); - this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false); - this.infoServer.start(); + infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false); + infoServer.setAttribute("job.tracker", this); + infoServer.start(); this.startTime = System.currentTimeMillis(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); trackerIdentifier = 
dateFormat.format(new Date()); - myMetrics = new JobTrackerMetrics(jobConf); + myMetrics = new JobTrackerMetrics(this, jobConf); this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers"); @@ -683,6 +684,25 @@ this.infoPort = this.infoServer.getPort(); this.conf.setInt("mapred.job.tracker.info.port", this.infoPort); LOG.info("JobTracker webserver: " + this.infoServer.getPort()); + + while (true) { + try { + fs.delete(systemDir); + if (fs.mkdirs(systemDir)) { + break; + } + LOG.error("Mkdirs failed to create " + systemDir); + } catch (IOException ie) { + LOG.info("problem cleaning system directory: " + systemDir, ie); + } + } + + // Same with 'localDir' except it's always on the local disk. + jobConf.deleteLocalFiles(SUBDIR); + synchronized (this) { + state = State.RUNNING; + } + LOG.info("Starting RUNNING"); } public static InetSocketAddress getAddress(Configuration conf) { @@ -1456,13 +1476,21 @@ //////////////////////////////////////////////////// /** + * Make sure the JobTracker is done initializing. + */ + private synchronized void ensureRunning() throws IllegalStateException { + if (state != State.RUNNING) { + throw new IllegalStateException("Job tracker still initializing"); + } + } + + /** * Allocates a new JobId string. */ - public String getNewJobId() { - synchronized (this) { - return "job_" + getTrackerIdentifier() + "_" + + public synchronized String getNewJobId() throws IOException { + ensureRunning(); + return "job_" + getTrackerIdentifier() + "_" + idFormat.format(nextJobId++); - } } /** @@ -1478,6 +1506,7 @@ * the right TaskTracker/Block mapping. 
*/ public synchronized JobStatus submitJob(String jobFile) throws IOException { + ensureRunning(); totalSubmissions++; JobInProgress job = new JobInProgress(jobFile, this, this.conf); synchronized (jobs) { @@ -1526,7 +1555,8 @@ return new ClusterStatus(taskTrackers.size(), totalMaps, totalReduces, - maxCurrentTasks); + maxCurrentTasks, + state); } } @@ -1816,7 +1846,8 @@ } try { - startTracker(new JobConf()); + JobTracker tracker = startTracker(new JobConf()); + tracker.offerService(); } catch (Throwable e) { LOG.fatal(StringUtils.stringifyException(e)); System.exit(-1); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Sep 13 13:24:10 2007 @@ -265,7 +265,7 @@ public LocalJobRunner(JobConf conf) throws IOException { this.fs = FileSystem.get(conf); this.conf = conf; - myMetrics = new JobTrackerMetrics(new JobConf(conf)); + myMetrics = new JobTrackerMetrics(null, new JobConf(conf)); } // JobSubmissionProtocol methods @@ -316,7 +316,8 @@ } public ClusterStatus getClusterStatus() { - return new ClusterStatus(1, map_tasks, reduce_tasks, 1); + return new ClusterStatus(1, map_tasks, reduce_tasks, 1, + JobTracker.State.RUNNING); } public JobStatus[] jobsToComplete() {return null;} Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Sep 13 13:24:10 2007 @@ -20,11 +20,15 @@ import java.io.*; import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * This class creates a single-process Map-Reduce cluster for junit testing. * One thread is created for each server. */ public class MiniMRCluster { + private static final Log LOG = LogFactory.getLog(MiniMRCluster.class); private Thread jobTrackerThread; private JobTrackerRunner jobTracker; @@ -43,19 +47,20 @@ * An inner class that runs a job tracker. */ class JobTrackerRunner implements Runnable { + private JobTracker tracker = null; JobConf jc = null; public boolean isUp() { - return (JobTracker.getTracker() != null); + return (tracker != null); } public int getJobTrackerPort() { - return JobTracker.getAddress(jc).getPort(); + return tracker.getTrackerPort(); } public int getJobTrackerInfoPort() { - return jc.getInt("mapred.job.tracker.info.port", 50030); + return tracker.getInfoPort(); } /** @@ -65,10 +70,10 @@ try { jc = createJobConf(); jc.set("mapred.local.dir","build/test/mapred/local"); - JobTracker.startTracker(jc); + tracker = JobTracker.startTracker(jc); + tracker.offerService(); } catch (Throwable e) { - System.err.println("Job tracker crashed:"); - e.printStackTrace(); + LOG.error("Job tracker crashed", e); } } @@ -77,10 +82,11 @@ */ public void shutdown() { try { - JobTracker.stopTracker(); + if (tracker != null) { + tracker.stopTracker(); + } } catch (Throwable e) { - System.err.println("Unable to shut down job tracker:"); - e.printStackTrace(); + LOG.error("Problem shutting down job tracker", e); } } } @@ -91,16 +97,40 @@ class TaskTrackerRunner implements Runnable { volatile TaskTracker tt; int trackerId; + JobConf conf = createJobConf(); // the localDirs for this taskTracker - String[] localDir; + String[] localDirs; 
volatile boolean isInitialized = false; volatile boolean isDead = false; - int numDir; - TaskTrackerRunner(int trackerId, int numDir) { + int numDir; + + TaskTrackerRunner(int trackerId, int numDir) throws IOException { this.trackerId = trackerId; this.numDir = numDir; - // a maximum of 10 local dirs can be specified in MinMRCluster - localDir = new String[10]; + localDirs = new String[numDir]; + conf = createJobConf(); + conf.setInt("mapred.task.tracker.info.port", 0); + conf.setInt("mapred.task.tracker.report.port", taskTrackerPort); + File localDirBase = + new File(conf.get("mapred.local.dir")).getAbsoluteFile(); + localDirBase.mkdirs(); + StringBuffer localPath = new StringBuffer(); + for(int i=0; i < numDir; ++i) { + File ttDir = new File(localDirBase, + Integer.toString(trackerId) + "_" + 0); + if (!ttDir.mkdirs()) { + if (!ttDir.isDirectory()) { + throw new IOException("Mkdirs failed to create " + ttDir); + } + } + localDirs[i] = ttDir.toString(); + if (i != 0) { + localPath.append(","); + } + localPath.append(localDirs[i]); + } + conf.set("mapred.local.dir", localPath.toString()); + LOG.info("mapred.local.dir is " + localPath); } /** @@ -108,40 +138,13 @@ */ public void run() { try { - JobConf jc = createJobConf(); - jc.setInt("mapred.task.tracker.info.port", 0); - jc.setInt("mapred.task.tracker.report.port", taskTrackerPort); - File localDir = new File(jc.get("mapred.local.dir")); - String mapredDir = ""; - File ttDir = new File(localDir, Integer.toString(trackerId) + "_" + 0); - if (!ttDir.mkdirs()) { - if (!ttDir.isDirectory()) { - throw new IOException("Mkdirs failed to create " + ttDir.toString()); - } - } - this.localDir[0] = ttDir.getAbsolutePath(); - mapredDir = ttDir.getAbsolutePath(); - for (int i = 1; i < numDir; i++){ - ttDir = new File(localDir, Integer.toString(trackerId) + "_" + i); - ttDir.mkdirs(); - if (!ttDir.mkdirs()) { - if (!ttDir.isDirectory()) { - throw new IOException("Mkdirs failed to create " + ttDir.toString()); - } - } - 
this.localDir[i] = ttDir.getAbsolutePath(); - mapredDir = mapredDir + "," + ttDir.getAbsolutePath(); - } - jc.set("mapred.local.dir", mapredDir); - System.out.println("mapred.local.dir is " + mapredDir); - tt = new TaskTracker(jc); + tt = new TaskTracker(conf); isInitialized = true; tt.run(); } catch (Throwable e) { isDead = true; tt = null; - System.err.println("Task tracker crashed:"); - e.printStackTrace(); + LOG.error("task tracker " + trackerId + " crashed", e); } } @@ -152,11 +155,11 @@ * @return the absolute pathname */ public String getLocalDir() { - return localDir[0]; + return localDirs[0]; } public String[] getLocalDirs(){ - return localDir; + return localDirs; } /** * Shut down the server and wait for it to finish. @@ -166,8 +169,8 @@ try { tt.shutdown(); } catch (Throwable e) { - System.err.println("Unable to shut down task tracker:"); - e.printStackTrace(); + LOG.error("task tracker " + trackerId + " could not shut down", + e); } } } @@ -198,10 +201,10 @@ TaskTrackerRunner runner = (TaskTrackerRunner) itr.next(); while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) { if (!runner.isInitialized) { - System.out.println("Waiting for task tracker to start."); + LOG.info("Waiting for task tracker to start."); } else { - System.out.println("Waiting for task tracker " + runner.tt.getName() + - " to be idle."); + LOG.info("Waiting for task tracker " + runner.tt.getName() + + " to be idle."); } try { Thread.sleep(1000); @@ -270,6 +273,19 @@ jobTracker = new JobTrackerRunner(); jobTrackerThread = new Thread(jobTracker); + jobTrackerThread.start(); + while (!jobTracker.isUp()) { + try { // let daemons get started + LOG.info("Waiting for JobTracker to start..."); + Thread.sleep(1000); + } catch(InterruptedException e) { + } + } + + // Set the configuration for the task-trackers + this.jobTrackerPort = jobTracker.getJobTrackerPort(); + this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort(); + // Create the TaskTrackers for (int idx = 0; 
idx < numTaskTrackers; idx++) { TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir); @@ -286,19 +302,6 @@ } } - jobTrackerThread.start(); - while (!jobTracker.isUp()) { - try { // let daemons get started - System.err.println("Waiting for JobTracker to start..."); - Thread.sleep(1000); - } catch(InterruptedException e) { - } - } - - // Set the configuration for the task-trackers - this.jobTrackerPort = jobTracker.getJobTrackerPort(); - this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort(); - if (!taskTrackerFirst) { for (Thread taskTrackerThread : taskTrackerThreadList){ taskTrackerThread.start(); @@ -323,7 +326,7 @@ try { taskTrackerThread.join(); } catch (InterruptedException ex) { - ex.printStackTrace(); + LOG.error("Problem shutting down task tracker", ex); } } jobTracker.shutdown(); @@ -331,7 +334,7 @@ try { jobTrackerThread.join(); } catch (InterruptedException ex) { - ex.printStackTrace(); + LOG.error("Problem waiting for job tracker to finish", ex); } } finally { File configDir = new File("build", "minimr"); @@ -341,11 +344,11 @@ } public static void main(String[] args) throws IOException { - System.out.println("Bringing up Jobtracker and tasktrackers."); + LOG.info("Bringing up Jobtracker and tasktrackers."); MiniMRCluster mr = new MiniMRCluster(4, "local", 1); - System.out.println("JobTracker and TaskTrackers are up."); + LOG.info("JobTracker and TaskTrackers are up."); mr.shutdown(); - System.out.println("JobTracker and TaskTrackers brought down."); + LOG.info("JobTracker and TaskTrackers brought down."); } } Modified: lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp (original) +++ 
lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp Thu Sep 13 13:24:10 2007 @@ -8,11 +8,12 @@ import="org.apache.hadoop.util.*" %> -<%! - JobTracker tracker = JobTracker.getTracker(); +<% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); String trackerName = StringUtils.simpleHostname(tracker.getJobTrackerMachine()); - +%> +<%! private void printBlackListedTrackers(JspWriter out, JobInProgress job) throws IOException { Map trackerErrors = job.getTaskTrackerErrors(); Modified: lucene/hadoop/trunk/src/webapps/job/jobconf.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobconf.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobconf.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobconf.jsp Thu Sep 13 13:24:10 2007 @@ -10,6 +10,7 @@ <% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); String jobId = request.getParameter("jobid"); if (jobId == null) { out.println("

Missing 'jobid' for fetching job configuration!

"); @@ -25,8 +26,6 @@

Job Configuration: JobId - <%= jobId %>


<% - JobTracker tracker = JobTracker.getTracker(); - JobInProgress job = (JobInProgress)tracker.getJob(jobId); if (job == null) { out.print("

Job '" + jobId + "' not found!


\n"); Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu Sep 13 13:24:10 2007 @@ -11,15 +11,16 @@ import="org.apache.hadoop.dfs.JspHelper" %> +<% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); + String trackerName = + StringUtils.simpleHostname(tracker.getJobTrackerMachine()); +%> <%! private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions"; - - JobTracker tracker = JobTracker.getTracker(); - String trackerName = - StringUtils.simpleHostname(tracker.getJobTrackerMachine()); - + private void printTaskSummary(JspWriter out, String jobId, String kind, Modified: lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp Thu Sep 13 13:24:10 2007 @@ -8,12 +8,14 @@ import="org.apache.hadoop.util.*" %> -<%! - JobTracker tracker = JobTracker.getTracker(); +<% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); String trackerName = StringUtils.simpleHostname(tracker.getJobTrackerMachine()); - +%> +<%! 
private void printFailedAttempts(JspWriter out, + JobTracker tracker, String jobId, TaskInProgress tip, TaskStatus.State failState) throws IOException { @@ -75,6 +77,7 @@ } private void printFailures(JspWriter out, + JobTracker tracker, String jobId, String kind, String cause) throws IOException { @@ -122,13 +125,13 @@ if (includeMap) { TaskInProgress[] tips = job.getMapTasks(); for(int i=0; i < tips.length; ++i) { - printFailedAttempts(out, jobId, tips[i], state); + printFailedAttempts(out, tracker, jobId, tips[i], state); } } if (includeReduce) { TaskInProgress[] tips = job.getReduceTasks(); for(int i=0; i < tips.length; ++i) { - printFailedAttempts(out, jobId, tips[i], state); + printFailedAttempts(out, tracker, jobId, tips[i], state); } } out.print("\n"); @@ -148,7 +151,7 @@ failures on <%=trackerName%> <% - printFailures(out, jobId, kind, cause); + printFailures(out, tracker, jobId, kind, cause); %>
Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Thu Sep 13 13:24:10 2007 @@ -11,15 +11,15 @@ %> <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %> <% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); + String trackerName = + StringUtils.simpleHostname(tracker.getJobTrackerMachine()); String jobid = request.getParameter("jobid"); String type = request.getParameter("type"); String pagenum = request.getParameter("pagenum"); int pnum = Integer.parseInt(pagenum); int next_page = pnum+1; int numperpage = 2000; - JobTracker tracker = JobTracker.getTracker(); - String trackerLabel = - StringUtils.simpleHostname(tracker.getJobTrackerMachine()); JobInProgress job = (JobInProgress) tracker.getJob(jobid); JobProfile profile = (job != null) ? (job.getProfile()) : null; JobStatus status = (job != null) ? (job.getStatus()) : null; @@ -37,12 +37,12 @@ - Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%> + Hadoop <%=type%> task list for <%=jobid%> on <%=trackerName%>

Hadoop <%=type%> task list for <%=jobid%> on -<%=trackerLabel%>

+<%=trackerName%> <% if (job == null) { out.print("Job " + jobid + " not found.
\n"); Modified: lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp Thu Sep 13 13:24:10 2007 @@ -8,10 +8,12 @@ import="org.apache.hadoop.mapred.*" import="org.apache.hadoop.util.*" %> -<%! - JobTracker tracker = JobTracker.getTracker(); - String trackerLabel = +<% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); + String trackerName = StringUtils.simpleHostname(tracker.getJobTrackerMachine()); +%> +<%! private static DecimalFormat percentFormat = new DecimalFormat("##0.00"); public void generateJobTable(JspWriter out, String label, Vector jobs, int refresh) throws IOException { @@ -60,7 +62,8 @@ out.print("\n"); } - public void generateSummaryTable(JspWriter out) throws IOException { + public void generateSummaryTable(JspWriter out, + JobTracker tracker) throws IOException { ClusterStatus status = tracker.getClusterStatus(); out.print("\n"+ "" + @@ -76,11 +79,12 @@ -<%= trackerLabel %> Hadoop Map/Reduce Administration +<%= trackerName %> Hadoop Map/Reduce Administration -

<%= trackerLabel %> Hadoop Map/Reduce Administration

+

<%= trackerName %> Hadoop Map/Reduce Administration

+State: <%= tracker.getClusterStatus().getJobTrackerState() %>
Started: <%= new Date(tracker.getStartTime())%>
Version: <%= VersionInfo.getVersion()%>, r<%= VersionInfo.getRevision()%>
@@ -92,7 +96,7 @@

Cluster Summary

<% - generateSummaryTable(out); + generateSummaryTable(out, tracker); %>

Modified: lucene/hadoop/trunk/src/webapps/job/machines.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/machines.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/machines.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/machines.jsp Thu Sep 13 13:24:10 2007 @@ -8,12 +8,14 @@ import="org.apache.hadoop.mapred.*" import="org.apache.hadoop.util.*" %> -<%! - JobTracker tracker = JobTracker.getTracker(); - String trackerLabel = +<% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); + String trackerName = StringUtils.simpleHostname(tracker.getJobTrackerMachine()); - - public void generateTaskTrackerTable(JspWriter out) throws IOException { +%> +<%! + public void generateTaskTrackerTable(JspWriter out, + JobTracker tracker) throws IOException { Collection c = tracker.taskTrackers(); if (c.size() == 0) { @@ -63,14 +65,14 @@ -<%=trackerLabel%> Hadoop Machine List +<%=trackerName%> Hadoop Machine List -

<%=trackerLabel%> Hadoop Machine List

+

<%=trackerName%> Hadoop Machine List

Task Trackers

<% - generateTaskTrackerTable(out); + generateTaskTrackerTable(out, tracker); %>
Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Thu Sep 13 13:24:10 2007 @@ -27,8 +27,9 @@ + "\">Cancel
MapsReduces
"); }%> <% - JobTracker tracker = JobTracker.getTracker(); + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); String jobid = request.getParameter("jobid"); + JobInProgress job = (JobInProgress) tracker.getJob(jobid); String tipid = request.getParameter("tipid"); String taskid = request.getParameter("taskid"); @@ -57,7 +58,6 @@ } } } - JobInProgress job = (JobInProgress) tracker.getJob(jobid); TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(jobid, tipid) : null; %> Modified: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?rev=575438&r1=575437&r2=575438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Thu Sep 13 13:24:10 2007 @@ -12,8 +12,10 @@ import="org.apache.hadoop.util.*" %> <% + JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); + String trackerName = + StringUtils.simpleHostname(tracker.getJobTrackerMachine()); String jobid = request.getParameter("jobid"); - JobTracker tracker = JobTracker.getTracker(); JobInProgress job = (JobInProgress) tracker.getJob(jobid); String tipid = request.getParameter("tipid"); String taskid = request.getParameter("taskid");