To: hadoop-commits@lucene.apache.org
From: cutting@apache.org
Date: Thu, 02 Mar 2006 23:05:45 -0000
Subject: svn commit: r382545 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop: dfs/DFSClient.java dfs/FSNamesystem.java mapred/JobInProgress.java mapred/JobStatus.java mapred/JobTracker.java mapred/TaskInProgress.java

Author: cutting
Date: Thu Mar  2 15:05:43 2006
New Revision: 382545

URL: http://svn.apache.org/viewcvs?rev=382545&view=rev
Log:
Fix for HADOOP-16.  Splitting and other job planning is now performed in a
separate thread, so job submission no longer blocks while splits are
computed.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
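The heart of the change is a small producer/consumer handoff: submitJob()
enqueues the new job and returns immediately, while a dedicated init thread
drains the queue and runs the expensive split computation. As a compilable
illustration of just that pattern (all names here are invented, not the
Hadoop API; the committed code is in the JobTracker.java diff below):

    import java.util.LinkedList;

    public class JobInitWorker implements Runnable {
        static final long SLEEP_INTERVAL = 2000;      // cf. JOBINIT_SLEEP_INTERVAL

        private final LinkedList<Runnable> initQueue = new LinkedList<Runnable>();
        private volatile boolean shouldRun = true;

        // Submission path: enqueue the init work and return immediately.
        public void submit(Runnable initWork) {
            synchronized (initQueue) {
                initQueue.add(initWork);
                initQueue.notifyAll();                // wake the init thread
            }
        }

        // Init thread: drain the queue, running each job's (slow) split
        // computation off the submission path.
        public void run() {
            while (shouldRun) {
                Runnable work = null;
                synchronized (initQueue) {
                    if (!initQueue.isEmpty()) {
                        work = initQueue.removeFirst();
                    } else {
                        try {
                            initQueue.wait(SLEEP_INTERVAL);
                        } catch (InterruptedException ie) {
                            return;
                        }
                    }
                }
                try {
                    if (work != null) {
                        work.run();                   // e.g. JobInProgress.initTasks()
                    }
                } catch (Exception e) {
                    e.printStackTrace();              // the real code kills the job
                }
            }
        }

        public void stop() {
            shouldRun = false;
        }
    }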
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Mar  2 15:05:43 2006
@@ -585,7 +585,7 @@
             // Connection failed.  Let's wait a little bit and retry
             try {
                 if (System.currentTimeMillis() - start > 5000) {
-                    LOG.info("Waiting to find target node");
+                    LOG.info("Waiting to find target node: " + target);
                 }
                 Thread.sleep(6000);
             } catch (InterruptedException iex) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Mar  2 15:05:43 2006
@@ -550,6 +550,10 @@
         int endBlock = -1;
         Block blocks[] = dir.getFile(src);
 
+        if (blocks == null) {                     // no blocks
+            return new UTF8[0][];
+        }
+
         //
         // First, figure out where the range falls in
         // the blocklist.
@@ -579,7 +583,7 @@
         if (startBlock < 0 || endBlock < 0) {
             return new UTF8[0][];
         } else {
-            UTF8 hosts[][] = new UTF8[endBlock - startBlock + 1][];
+            UTF8 hosts[][] = new UTF8[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
                 TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);
                 Vector v = new Vector();
@@ -587,7 +591,7 @@
                     DatanodeInfo cur = (DatanodeInfo) it.next();
                     v.add(cur.getHost());
                 }
-                hosts[i] = (UTF8[]) v.toArray(new UTF8[v.size()]);
+                hosts[i-startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]);
             }
             return hosts;
         }
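The second FSNamesystem hunk fixes an off-by-offset bug: the hosts array is
sized for the sub-range [startBlock, endBlock] but was indexed with the
absolute block index. A minimal standalone illustration of that bug class
(plain Java, not Hadoop code):

    import java.util.Arrays;

    public class RangeCopy {
        public static void main(String[] args) {
            String[] blocks = {"b0", "b1", "b2", "b3", "b4"};
            int startBlock = 2, endBlock = 4;

            // Result array covers only the requested sub-range.
            String[] hosts = new String[(endBlock - startBlock) + 1];
            for (int i = startBlock; i <= endBlock; i++) {
                // hosts[i] would throw ArrayIndexOutOfBoundsException for
                // i > 2 and leave hosts[0] null; the offset form is correct.
                hosts[i - startBlock] = blocks[i];
            }
            System.out.println(Arrays.toString(hosts));   // [b2, b3, b4]
        }
    }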
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Mar  2 15:05:43 2006
@@ -44,6 +44,7 @@
     int numReduceTasks = 0;
 
     JobTracker jobtracker = null;
+    TreeMap cachedHints = new TreeMap();
 
     long startTime;
     long finishTime;
@@ -62,7 +63,7 @@
         this.conf = conf;
         this.jobtracker = jobtracker;
         this.profile = new JobProfile(jobid, jobFile, url);
-        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.RUNNING);
+        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
         this.startTime = System.currentTimeMillis();
         this.localJobFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".xml");
@@ -92,9 +93,10 @@
     }
 
     /**
-     * Construct the splits, etc
+     * Construct the splits, etc.  This is invoked from an async
+     * thread so that split-computation doesn't block anyone.
      */
-    void initTasks() throws IOException {
+    public void initTasks() throws IOException {
         if (tasksInited) {
             return;
         }
@@ -153,9 +155,33 @@
             reduces[i] = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);
         }
 
+        //
+        // Obtain some tasktracker-cache information for the map task splits.
+        //
+        for (int i = 0; i < maps.length; i++) {
+            String hints[][] = fs.getFileCacheHints(splits[i].getFile(), splits[i].getStart(), splits[i].getLength());
+            cachedHints.put(maps[i].getTIPId(), hints);
+        }
+
+        this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
         tasksInited = true;
     }
 
+    /**
+     * This is called by TaskInProgress objects.  The JobInProgress
+     * prefetches and caches a lot of these hints.  If the hint is
+     * not available, then we pass it through to the filesystem.
+     */
+    String[][] getFileCacheHints(String tipID, File f, long start, long len) throws IOException {
+        String results[][] = (String[][]) cachedHints.get(tipID);
+        if (results == null) {             // not prefetched; ask the filesystem
+            FileSystem fs = FileSystem.get(conf);
+            results = fs.getFileCacheHints(f, start, len);
+            cachedHints.put(tipID, results);
+        }
+        return results;
+    }
+
     /////////////////////////////////////////////////////
     // Accessors for the JobInProgress
     /////////////////////////////////////////////////////
@@ -252,12 +278,8 @@
      */
     public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
         if (! tasksInited) {
-            try {
-                initTasks();
-            } catch (IOException ie) {
-                ie.printStackTrace();
-                LOG.info("Cannot create task split for " + profile.getJobId());
-            }
+            LOG.info("Cannot create task split for " + profile.getJobId());
+            return null;
         }
 
         Task t = null;
@@ -271,30 +293,62 @@
         // we call obtainNewMapTask() really fast, twice in a row.
         // There's not enough time for the "recentTasks"
         //
+
+        //
+        // Compute avg progress through the map tasks
+        //
+        for (int i = 0; i < maps.length; i++) {
+            totalProgress += maps[i].getProgress();
+        }
+        double avgProgress = totalProgress / maps.length;
+
+        //
+        // See if there is a split over a block that is stored on
+        // the TaskTracker checking in.  That means the block
+        // doesn't have to be transmitted from another node.
+        //
         for (int i = 0; i < maps.length; i++) {
             if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) {
                 if (cacheTarget < 0) {
                     cacheTarget = i;
                     break;
                 }
-            } else if (maps[i].hasTask()) {
-                if (stdTarget < 0) {
-                    stdTarget = i;
-                    break;
+            }
+        }
+
+        //
+        // If there's no cached target, see if there's
+        // a std. task to run.
+        //
+        if (cacheTarget < 0) {
+            for (int i = 0; i < maps.length; i++) {
+                if (maps[i].hasTask()) {
+                    if (stdTarget < 0) {
+                        stdTarget = i;
+                        break;
+                    }
                 }
             }
-            totalProgress += maps[i].getProgress();
         }
-        double avgProgress = totalProgress / maps.length;
 
-        for (int i = 0; i < maps.length; i++) {
-            if (maps[i].hasSpeculativeTask(avgProgress)) {
-                if (specTarget < 0) {
-                    specTarget = i;
+        //
+        // If no cached-target and no std target, see if
+        // there's a speculative task to run.
+        //
+        if (cacheTarget < 0 && stdTarget < 0) {
+            for (int i = 0; i < maps.length; i++) {
+                if (maps[i].hasSpeculativeTask(avgProgress)) {
+                    if (specTarget < 0) {
+                        specTarget = i;
+                        break;
+                    }
                 }
             }
         }
-
+
+        //
+        // Run whatever we found
+        //
         if (cacheTarget >= 0) {
             t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
         } else if (stdTarget >= 0) {
@@ -312,12 +366,8 @@
      */
     public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
         if (! tasksInited) {
-            try {
-                initTasks();
-            } catch (IOException ie) {
-                ie.printStackTrace();
-                LOG.info("Cannot create task split for " + profile.getJobId());
-            }
+            LOG.info("Cannot create task split for " + profile.getJobId());
+            return null;
         }
 
         Task t = null;
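The rewritten obtainNewMapTask() above now scans the map list in three
strict tiers: a split whose block is already local to the requesting
tracker, then any ordinary runnable task, then a speculative re-execution;
a lower tier is consulted only when every higher tier comes up empty. A
compilable sketch of that selection order, with invented names
(TaskCandidate and its predicates are not the Hadoop API):

    import java.util.List;

    interface TaskCandidate {
        boolean hasCacheHit();      // split's block already on this tracker
        boolean hasTask();          // plain runnable task
        boolean hasSpeculative(double avgProgress);
    }

    public class TieredSelector {
        /** Returns the index of the task to run, or -1: cache-local first,
         *  then any standard task, then a speculative re-execution. */
        static int select(List<TaskCandidate> maps, double avgProgress) {
            for (int i = 0; i < maps.size(); i++)
                if (maps.get(i).hasCacheHit()) return i;
            for (int i = 0; i < maps.size(); i++)
                if (maps.get(i).hasTask()) return i;
            for (int i = 0; i < maps.size(); i++)
                if (maps.get(i).hasSpeculative(avgProgress)) return i;
            return -1;
        }
    }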
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Thu Mar  2 15:05:43 2006
@@ -39,6 +39,7 @@
     public static final int RUNNING = 1;
     public static final int SUCCEEDED = 2;
     public static final int FAILED = 3;
+    public static final int PREP = 4;
 
     String jobid;
     float mapProgress;
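With the PREP state added above, submitJob() can return while splits are
still being computed, so a job can exist but not yet be schedulable. A
hypothetical client-side wait loop, assuming a status view that re-fetches
state (JobStatusView and waitUntilScheduled are stand-ins, not part of
Hadoop):

    public class WaitForRunning {
        interface JobStatusView {
            int getRunState();            // re-fetches the current state
        }

        static final int PREP = 4;        // matches JobStatus.PREP above

        // Poll until the init thread has finished the splits and the job
        // has left PREP (RUNNING on success, FAILED if initTasks() threw).
        static void waitUntilScheduled(JobStatusView status) throws InterruptedException {
            while (status.getRunState() == PREP) {
                Thread.sleep(1000);
            }
        }
    }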
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Mar  2 15:05:43 2006
@@ -33,6 +33,7 @@
  * @author Mike Cafarella
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {
+    static long JOBINIT_SLEEP_INTERVAL = 2000;
     static long RETIRE_JOB_INTERVAL;
     static long RETIRE_JOB_CHECK_INTERVAL;
     static float TASK_ALLOC_EPSILON;
@@ -156,14 +157,21 @@
                 }
 
                 synchronized (jobs) {
-                    for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
-                        String jobid = (String) it.next();
-                        JobInProgress job = (JobInProgress) jobs.get(jobid);
-
-                        if (job.getStatus().getRunState() != JobStatus.RUNNING &&
-                            (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
-                            it.remove();
-                            jobsByArrival.remove(job);
+                    synchronized (jobInitQueue) {
+                        synchronized (jobsByArrival) {
+                            for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
+                                String jobid = (String) it.next();
+                                JobInProgress job = (JobInProgress) jobs.get(jobid);
+
+                                if (job.getStatus().getRunState() != JobStatus.RUNNING &&
+                                    job.getStatus().getRunState() != JobStatus.PREP &&
+                                    (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
+                                    it.remove();
+
+                                    jobInitQueue.remove(job);
+                                    jobsByArrival.remove(job);
+                                }
+                            }
                         }
                     }
                 }
@@ -175,6 +183,43 @@
     }
 
     /////////////////////////////////////////////////////////////////
+    // Used to init new jobs that have just been created
+    /////////////////////////////////////////////////////////////////
+    class JobInitThread implements Runnable {
+        boolean shouldRun = true;
+        public JobInitThread() {
+        }
+        public void run() {
+            while (shouldRun) {
+                JobInProgress job = null;
+                synchronized (jobInitQueue) {
+                    if (jobInitQueue.size() > 0) {
+                        job = (JobInProgress) jobInitQueue.elementAt(0);
+                        jobInitQueue.remove(job);
+                    } else {
+                        try {
+                            jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
+                        } catch (InterruptedException iex) {
+                        }
+                    }
+                }
+                try {
+                    if (job != null) {
+                        job.initTasks();
+                    }
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "job init failed", e);
+                    job.kill();
+                }
+            }
+        }
+        public void stopIniter() {
+            shouldRun = false;
+        }
+    }
+
+
+    /////////////////////////////////////////////////////////////////
     // The real JobTracker
     ////////////////////////////////////////////////////////////////
     int port;
@@ -221,8 +266,10 @@
     int totalMaps = 0;
     int totalReduces = 0;
     TreeMap taskTrackers = new TreeMap();
+    Vector jobInitQueue = new Vector();
     ExpireTrackers expireTrackers = new ExpireTrackers();
     RetireJobs retireJobs = new RetireJobs();
+    JobInitThread initJobs = new JobInitThread();
 
     /**
     * It might seem like a bug to maintain a TreeSet of status objects,
@@ -307,6 +354,7 @@
 
         new Thread(this.expireTrackers).start();
         new Thread(this.retireJobs).start();
+        new Thread(this.initJobs).start();
     }
 
     public static InetSocketAddress getAddress(Configuration conf) {
@@ -521,67 +569,69 @@
         // has not yet been removed from the pool, making capacity seem
         // larger than it really is.)
         //
-        if ((numMaps < maxCurrentTasks) &&
-            (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
-
-            int totalNeededMaps = 0;
-            for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
-                JobInProgress job = (JobInProgress) it.next();
-                if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-                    continue;
-                }
+        synchronized (jobsByArrival) {
+            if ((numMaps < maxCurrentTasks) &&
+                (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
+
+                int totalNeededMaps = 0;
+                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
+                    JobInProgress job = (JobInProgress) it.next();
+                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+                        continue;
+                    }
 
-            Task t = job.obtainNewMapTask(taskTracker, tts);
-            if (t != null) {
-                return t;
-            }
+                    Task t = job.obtainNewMapTask(taskTracker, tts);
+                    if (t != null) {
+                        return t;
+                    }
 
-            //
-            // Beyond the highest-priority task, reserve a little
-            // room for failures and speculative executions; don't
-            // schedule tasks to the hilt.
-            //
-            totalNeededMaps += job.desiredMaps();
-            double padding = 0;
-            if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
-                padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
-            }
-            if (totalNeededMaps + padding >= totalCapacity) {
-                break;
+                    //
+                    // Beyond the highest-priority task, reserve a little
+                    // room for failures and speculative executions; don't
+                    // schedule tasks to the hilt.
+                    //
+                    totalNeededMaps += job.desiredMaps();
+                    double padding = 0;
+                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
+                        padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
+                    }
+                    if (totalNeededMaps + padding >= totalCapacity) {
+                        break;
+                    }
                 }
             }
-        }
-
-        //
-        // Same thing, but for reduce tasks
-        //
-        if ((numReduces < maxCurrentTasks) &&
-            (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
 
-            int totalNeededReduces = 0;
-            for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
-                JobInProgress job = (JobInProgress) it.next();
-                if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-                    continue;
-                }
+            //
+            // Same thing, but for reduce tasks
+            //
+            if ((numReduces < maxCurrentTasks) &&
+                (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
+
+                int totalNeededReduces = 0;
+                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
+                    JobInProgress job = (JobInProgress) it.next();
+                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+                        continue;
+                    }
 
-                Task t = job.obtainNewReduceTask(taskTracker, tts);
-                if (t != null) {
-                    return t;
-                }
+                    Task t = job.obtainNewReduceTask(taskTracker, tts);
+                    if (t != null) {
+                        return t;
+                    }
 
-                //
-                // Beyond the highest-priority task, reserve a little
-                // room for failures and speculative executions; don't
-                // schedule tasks to the hilt.
-                //
-                totalNeededReduces += job.desiredReduces();
-                double padding = 0;
-                if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
-                    padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
-                }
-                if (totalNeededReduces + padding >= totalCapacity) {
-                    break;
+                    //
+                    // Beyond the highest-priority task, reserve a little
+                    // room for failures and speculative executions; don't
+                    // schedule tasks to the hilt.
+                    //
+                    totalNeededReduces += job.desiredReduces();
+                    double padding = 0;
+                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
+                        padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
+                    }
+                    if (totalNeededReduces + padding >= totalCapacity) {
+                        break;
+                    }
                 }
            }
        }
@@ -645,9 +695,31 @@
    ////////////////////////////////////////////////////
    // JobSubmissionProtocol
    ////////////////////////////////////////////////////
+    /**
+     * JobTracker.submitJob() kicks off a new job.
+     *
+     * Create a 'JobInProgress' object, which contains both JobProfile
+     * and JobStatus.  Those two sub-objects are sometimes shipped outside
+     * of the JobTracker.  But JobInProgress adds info that's useful for
+     * the JobTracker alone.
+     *
+     * We add the JIP to the jobInitQueue, which is processed
+     * asynchronously to handle split-computation and build up
+     * the right TaskTracker/Block mapping.
+     */
    public synchronized JobStatus submitJob(String jobFile) throws IOException {
        totalSubmissions++;
-        JobInProgress job = createJob(jobFile);
+        JobInProgress job = new JobInProgress(jobFile, this, this.conf);
+        synchronized (jobs) {
+            synchronized (jobsByArrival) {
+                synchronized (jobInitQueue) {
+                    jobs.put(job.getProfile().getJobId(), job);
+                    jobsByArrival.add(job);
+                    jobInitQueue.add(job);
+                    jobInitQueue.notifyAll();
+                }
+            }
+        }
        return job.getStatus();
    }
 
@@ -730,25 +802,6 @@
     */
    String createUniqueId() {
        return "" + Integer.toString(Math.abs(r.nextInt()),36);
-    }
-
-    /**
-     * JobProfile createJob() kicks off a new job.
-     * This function creates a job profile and also decomposes it into
-     * tasks.  The tasks are added to the unassignedTasks structure.
-     * (The precise structure will change as we get more sophisticated about
-     * task allocation.)
-     *
-     * Create a 'JobInProgress' object, which contains both JobProfile
-     * and JobStatus.  Those two sub-objects are sometimes shipped outside
-     * of the JobTracker.  But JobInProgress adds info that's useful for
-     * the JobTracker alone.
-     */
-    JobInProgress createJob(String jobFile) throws IOException {
-        JobInProgress job = new JobInProgress(jobFile, this, this.conf);
-        jobs.put(job.getProfile().getJobId(), job);
-        jobsByArrival.add(job);
-        return job;
    }
 
    ////////////////////////////////////////////////////
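Both paths above that touch the three job collections (the RetireJobs sweep
and submitJob()) take the jobs lock before the two inner locks, so as long
as every path that takes both inner locks holds jobs first, the differing
inner order cannot deadlock; the notifyAll() must also happen while holding
the queue's monitor so the JobInitThread's wait() wakes reliably. A minimal
sketch of the submit-side handoff (names invented, not the Hadoop API):

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;

    public class SubmitHandoff {
        final Map<String, Object> jobs = new HashMap<String, Object>();
        final LinkedList<Object> jobsByArrival = new LinkedList<Object>();
        final LinkedList<Object> jobInitQueue = new LinkedList<Object>();

        public void submit(String jobId, Object job) {
            synchronized (jobs) {                 // outermost lock, always first
                synchronized (jobsByArrival) {
                    synchronized (jobInitQueue) {
                        jobs.put(jobId, job);
                        jobsByArrival.add(job);
                        jobInitQueue.add(job);
                        jobInitQueue.notifyAll(); // wake the init thread
                    }
                }
            }
        }
    }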
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=382545&r1=382544&r2=382545&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Mar  2 15:05:43 2006
@@ -360,8 +360,7 @@
        try {
            if (isMapTask()) {
                if (hints == null) {
-                    FileSystem fs = FileSystem.get(conf);
-                    hints = fs.getFileCacheHints(split.getFile(), split.getStart(), split.getLength());
+                    hints = job.getFileCacheHints(getTIPId(), split.getFile(), split.getStart(), split.getLength());
                }
                if (hints != null) {
                    for (int i = 0; i < hints.length; i++) {