Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 92488 invoked from network); 2 Jun 2010 10:51:41 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 2 Jun 2010 10:51:41 -0000 Received: (qmail 92692 invoked by uid 500); 2 Jun 2010 10:51:41 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 92603 invoked by uid 500); 2 Jun 2010 10:51:39 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 92594 invoked by uid 99); 2 Jun 2010 10:51:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jun 2010 10:51:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jun 2010 10:51:37 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 232B823888E4; Wed, 2 Jun 2010 10:51:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r950485 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java Date: Wed, 02 Jun 2010 10:51:17 -0000 To: mapreduce-commits@hadoop.apache.org From: vinodkv@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100602105117.232B823888E4@eris.apache.org> Author: vinodkv Date: Wed Jun 2 10:51:16 2010 New Revision: 950485 URL: http://svn.apache.org/viewvc?rev=950485&view=rev Log: MAPREDUCE-913. TaskRunner crashes with NPE resulting in held up slots, UNINITIALIZED tasks and hung TaskTracker. Contributed by Amareshwari Sriramadasu and Sreekanth Ramakrishnan. Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=950485&r1=950484&r2=950485&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jun 2 10:51:16 2010 @@ -1638,3 +1638,7 @@ Release 0.21.0 - Unreleased (Dick King and Amareshwari Sriramadasu via tomwhite) MAPREDUCE-118. Fix Job.getJobID(). (Amareshwari Sriramadasu via sharad) + + MAPREDUCE-913. TaskRunner crashes with NPE resulting in held up slots, + UNINITIALIZED tasks and hung TaskTracker. (Amareshwari Sriramadasu and + Sreekanth Ramakrishnan via vinodkv) Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=950485&r1=950484&r2=950485&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 2 10:51:16 2010 @@ -2093,7 +2093,16 @@ public class TaskTracker reduceLauncher.addToTaskQueue(action); } } - + + // This method is called from unit tests + int getFreeSlots(boolean isMap) { + if (isMap) { + return mapLauncher.numFreeSlots.get(); + } else { + return reduceLauncher.numFreeSlots.get(); + } + } + class TaskLauncher extends Thread { private IntWritable numFreeSlots; private final int maxSlots; @@ -2657,8 +2666,11 @@ public class TaskTracker */ void reportTaskFinished(boolean commitPending) { if (!commitPending) { - taskFinished(); - releaseSlot(); + try { + taskFinished(); + } finally { + releaseSlot(); + } } notifyTTAboutTaskCompletion(); } @@ -2728,7 +2740,15 @@ public class TaskTracker setTaskFailState(true); // call the script here for the failed tasks. if (debugCommand != null) { - runDebugScript(); + try { + runDebugScript(); + } catch (Exception e) { + String msg = + "Debug-script could not be run successfully : " + + StringUtils.stringifyException(e); + LOG.warn(msg); + reportDiagnosticInfo(msg); + } } } taskStatus.setProgress(0.0f); @@ -2749,14 +2769,17 @@ public class TaskTracker if (needCleanup) { removeTaskFromJob(task.getJobID(), this); } - try { - cleanup(needCleanup); - } catch (IOException ie) { - } + cleanup(needCleanup); } - - private void runDebugScript() { + + /** + * Run the debug-script now. Because debug-script can be user code, we use + * {@link TaskController} to execute the debug script. + * + * @throws IOException + */ + private void runDebugScript() throws IOException { String taskStdout =""; String taskStderr =""; String taskSyslog =""; @@ -2774,23 +2797,14 @@ public class TaskTracker taskSyslog = FileUtil .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(), task.isTaskCleanupTask(), TaskLog.LogName.SYSLOG)); - } catch(IOException e){ - LOG.warn("Exception finding task's stdout/err/syslog files"); - } - File workDir = null; - try { - workDir = - new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalTaskDir(task.getUser(), task - .getJobID().toString(), task.getTaskID() - .toString(), task.isTaskCleanupTask()) - + Path.SEPARATOR + MRConstants.WORKDIR, - localJobConf).toString()); - } catch (IOException e) { - LOG.warn("Working Directory of the task " + task.getTaskID() + - " doesnt exist. Caught exception " + - StringUtils.stringifyException(e)); + } catch(Exception e){ + LOG.warn("Exception finding task's stdout/err/syslog files", e); } + File workDir = new File(lDirAlloc.getLocalPathToRead( + TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID() + .toString(), task.getTaskID().toString(), task + .isTaskCleanupTask()) + + Path.SEPARATOR + MRConstants.WORKDIR, localJobConf).toString()); // Build the command File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task .isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT); @@ -2820,21 +2834,10 @@ public class TaskTracker context.stdout = stdout; context.workDir = workDir; context.task = task; - try { - getTaskController().runDebugScript(context); - // add all lines of debug out to diagnostics - try { - int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, - -1); - addDiagnostics(FileUtil.makeShellPath(stdout),num, - "DEBUG OUT"); - } catch(IOException ioe) { - LOG.warn("Exception in add diagnostics!"); - } - } catch (IOException ie) { - LOG.warn("runDebugScript failed with: " + StringUtils. - stringifyException(ie)); - } + getTaskController().runDebugScript(context); + // add the lines of debug out to diagnostics + int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, -1); + addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT"); } /** @@ -2998,7 +3001,7 @@ public class TaskTracker * otherwise the current working directory of the task * i.e. <taskid>/work is cleaned up. */ - void cleanup(boolean needCleanup) throws IOException { + void cleanup(boolean needCleanup) { TaskAttemptID taskId = task.getTaskID(); LOG.debug("Cleaning up " + taskId); Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java?rev=950485&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java Wed Jun 2 10:51:16 2010 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.File; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Regression test for MAPREDUCE-913 + */ +public class TestTaskTrackerSlotManagement { + + private static final Path TEST_DIR = new Path(System.getProperty( + "test.build.data", "/tmp"), "tt_slots"); + private static final String CACHE_FILE_PATH = new Path(TEST_DIR, "test.txt") + .toString(); + + /** + * Test-setup. Create the cache-file. + * + * @throws Exception + */ + @Before + public void setUp() throws Exception { + new File(TEST_DIR.toString()).mkdirs(); + File myFile = new File(CACHE_FILE_PATH); + myFile.createNewFile(); + } + + /** + * Test-cleanup. Remove the cache-file. + * + * @throws Exception + */ + @After + public void tearDown() throws Exception { + File myFile = new File(CACHE_FILE_PATH); + myFile.delete(); + new File(TEST_DIR.toString()).delete(); + } + + /** + * Test case to test addition of free slot when the job fails localization due + * to cache file being modified after the job has started running. + * + * @throws Exception + */ + @Test + public void testFreeingOfTaskSlots() throws Exception { + // Start a cluster with no task tracker. + MiniMRCluster mrCluster = new MiniMRCluster(0, "file:///", 1); + Configuration conf = mrCluster.createJobConf(); + Cluster cluster = new Cluster(conf); + // set the debug script so that TT tries to launch the debug + // script for failed tasks. + conf.set(JobContext.MAP_DEBUG_SCRIPT, "/bin/echo"); + conf.set(JobContext.REDUCE_DEBUG_SCRIPT, "/bin/echo"); + Job j = MapReduceTestUtil.createJob(conf, new Path(TEST_DIR, "in"), + new Path(TEST_DIR, "out"), 0, 0); + // Add the local filed created to the cache files of the job + j.addCacheFile(new URI(CACHE_FILE_PATH)); + j.setMaxMapAttempts(1); + j.setMaxReduceAttempts(1); + // Submit the job and return immediately. + // Job submit now takes care setting the last + // modified time of the cache file. + j.submit(); + // Look up the file and modify the modification time. + File myFile = new File(CACHE_FILE_PATH); + myFile.setLastModified(0L); + // Start up the task tracker after the time has been changed. + mrCluster.startTaskTracker(null, null, 0, 1); + // Now wait for the job to fail. + j.waitForCompletion(false); + Assert.assertFalse("Job successfully completed.", j.isSuccessful()); + + ClusterMetrics metrics = cluster.getClusterStatus(); + // validate number of slots in JobTracker + Assert.assertEquals(0, metrics.getOccupiedMapSlots()); + Assert.assertEquals(0, metrics.getOccupiedReduceSlots()); + + // validate number of slots in TaskTracker + TaskTracker tt = mrCluster.getTaskTrackerRunner(0).getTaskTracker(); + Assert.assertEquals(metrics.getMapSlotCapacity(), tt.getFreeSlots(true)); + Assert.assertEquals(metrics.getReduceSlotCapacity(), tt.getFreeSlots(false)); + + } +}