Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 50969 invoked from network); 8 Dec 2009 15:15:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Dec 2009 15:15:06 -0000 Received: (qmail 87564 invoked by uid 500); 8 Dec 2009 15:15:06 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 87507 invoked by uid 500); 8 Dec 2009 15:15:05 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 87497 invoked by uid 99); 8 Dec 2009 15:15:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Dec 2009 15:15:05 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=AWL,BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Dec 2009 15:15:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CD2CD23888DC; Tue, 8 Dec 2009 15:14:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r888431 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ Date: Tue, 08 Dec 2009 15:14:40 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091208151440.CD2CD23888DC@eris.apache.org> Author: sharad Date: Tue Dec 8 15:14:40 2009 New Revision: 888431 URL: http://svn.apache.org/viewvc?rev=888431&view=rev Log: MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. Contributed by Amar Kamat. Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=888431&r1=888430&r2=888431&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Tue Dec 8 15:14:40 2009 @@ -966,3 +966,6 @@ MAPREDUCE-1075. Fix JobTracker to not throw an NPE for a non-existent queue. (V.V.Chaitanya Krishna via yhemanth) + MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. (Amar Kamat + via sharad) + Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=888431&r1=888430&r2=888431&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Dec 8 15:14:40 2009 @@ -804,17 +804,21 @@ private void removeHostCapacity(String hostName) { synchronized (taskTrackers) { // remove the capacity of trackers on this host + int numTrackersOnHost = 0; for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { int mapSlots = status.getMaxMapSlots(); totalMapTaskCapacity -= mapSlots; int reduceSlots = status.getMaxReduceSlots(); totalReduceTaskCapacity -= reduceSlots; + ++numTrackersOnHost; getInstrumentation().addBlackListedMapSlots( mapSlots); getInstrumentation().addBlackListedReduceSlots( reduceSlots); } - incrBlackListedTrackers(uniqueHostsMap.remove(hostName)); + // remove the host + uniqueHostsMap.remove(hostName); + incrBlackListedTrackers(numTrackersOnHost); } } @@ -2468,12 +2472,14 @@ taskTrackers.remove(trackerName); Integer numTaskTrackersInHost = uniqueHostsMap.get(oldStatus.getHost()); - numTaskTrackersInHost --; - if (numTaskTrackersInHost > 0) { - uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost); - } - else { - uniqueHostsMap.remove(oldStatus.getHost()); + if (numTaskTrackersInHost != null) { + numTaskTrackersInHost --; + if (numTaskTrackersInHost > 0) { + uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost); + } + else { + uniqueHostsMap.remove(oldStatus.getHost()); + } } } } @@ -3841,8 +3847,8 @@ Set trackers = hostnameToTaskTracker.remove(host); if (trackers != null) { for (TaskTracker tracker : trackers) { - LOG.info("Decommission: Losing tracker " + tracker + - " on host " + host); + LOG.info("Decommission: Losing tracker " + + tracker.getTrackerName() + " on host " + host); removeTracker(tracker); } trackersDecommissioned += trackers.size(); Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=888431&r1=888430&r2=888431&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Dec 8 15:14:40 2009 @@ -63,8 +63,10 @@ } @Override public ClusterStatus getClusterStatus(boolean detailed) { - return new ClusterStatus(trackers.length, - 0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0); + return new ClusterStatus( + taskTrackers().size() - getBlacklistedTrackerCount(), + getBlacklistedTrackerCount(), 0, 0, 0, totalSlots/2, totalSlots/2, + JobTracker.State.RUNNING, 0); } public void setNumSlots(int totalSlots) { Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=888431&r1=888430&r2=888431&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java Tue Dec 8 15:14:40 2009 @@ -24,6 +24,7 @@ import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress; import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker; import org.apache.hadoop.mapred.UtilsForTests.FakeClock; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; /** @@ -47,6 +48,7 @@ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0"); conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0"); conf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 1000); + conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1"); jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers); jobTracker.startExpireTrackersThread(); } @@ -91,4 +93,139 @@ job.finishTask(tid[1]); } + + /** + * Test whether the tracker gets blacklisted after its lost. + */ + public void testLostTrackerBeforeBlacklisting() throws Exception { + FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]); + TaskAttemptID[] tid = new TaskAttemptID[3]; + JobConf conf = new JobConf(); + conf.setNumMapTasks(1); + conf.setNumReduceTasks(1); + conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1"); + conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false"); + FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker); + job.initTasks(); + job.setClusterSize(4); + + // Tracker 0 gets the map task + tid[0] = job.findMapTask(trackers[0]); + + job.finishTask(tid[0]); + + // validate the total tracker count + assertEquals("Active tracker count mismatch", + 1, jobTracker.getClusterStatus(false).getTaskTrackers()); + + // lose the tracker + clock.advance(1100); + jobTracker.checkExpiredTrackers(); + assertFalse("Tracker 0 not lost", + jobTracker.getClusterStatus(false).getActiveTrackerNames() + .contains(trackers[0])); + + // validate the total tracker count + assertEquals("Active tracker count mismatch", + 0, jobTracker.getClusterStatus(false).getTaskTrackers()); + + // Tracker 1 establishes contact with JT + FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]); + + // Tracker1 should get assigned the lost map task + tid[1] = job.findMapTask(trackers[1]); + + assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]); + + assertEquals("Task ID of reassigned map task does not match", + tid[0].getTaskID().toString(), tid[1].getTaskID().toString()); + + // finish the map task + job.finishTask(tid[1]); + + // finish the reduce task + tid[2] = job.findReduceTask(trackers[1]); + job.finishTask(tid[2]); + + // check if job is successful + assertEquals("Job not successful", + JobStatus.SUCCEEDED, job.getStatus().getRunState()); + + // check if the tracker is lost + // validate the total tracker count + assertEquals("Active tracker count mismatch", + 1, jobTracker.getClusterStatus(false).getTaskTrackers()); + // validate blacklisted count .. since we lost one blacklisted tracker + assertEquals("Blacklisted tracker count mismatch", + 0, jobTracker.getClusterStatus(false).getBlacklistedTrackers()); + } + + /** + * Test whether the tracker gets lost after its blacklisted. + */ + public void testLostTrackerAfterBlacklisting() throws Exception { + FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]); + clock.advance(600); + TaskAttemptID[] tid = new TaskAttemptID[2]; + JobConf conf = new JobConf(); + conf.setNumMapTasks(1); + conf.setNumReduceTasks(0); + conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1"); + conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false"); + FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker); + job.initTasks(); + job.setClusterSize(4); + + // check if the tracker count is correct + assertEquals("Active tracker count mismatch", + 1, jobTracker.taskTrackers().size()); + + // Tracker 0 gets the map task + tid[0] = job.findMapTask(trackers[0]); + // Fail the task + job.failTask(tid[0]); + + // Tracker 1 establishes contact with JT + FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]); + // check if the tracker count is correct + assertEquals("Active tracker count mismatch", + 2, jobTracker.taskTrackers().size()); + + // Tracker 1 gets the map task + tid[1] = job.findMapTask(trackers[1]); + // Finish the task and also the job + job.finishTask(tid[1]); + + // check if job is successful + assertEquals("Job not successful", + JobStatus.SUCCEEDED, job.getStatus().getRunState()); + + // check if the trackers 1 got blacklisted + assertTrue("Tracker 0 not blacklisted", + jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName() + .equals(trackers[0])); + // check if the tracker count is correct + assertEquals("Active tracker count mismatch", + 2, jobTracker.taskTrackers().size()); + // validate blacklisted count + assertEquals("Blacklisted tracker count mismatch", + 1, jobTracker.getClusterStatus(false).getBlacklistedTrackers()); + + // Advance clock. Tracker 0 should be lost + clock.advance(500); + jobTracker.checkExpiredTrackers(); + + // check if the task tracker is lost + assertFalse("Tracker 0 not lost", + jobTracker.getClusterStatus(false).getActiveTrackerNames() + .contains(trackers[0])); + + // check if the lost tracker has removed from the jobtracker + assertEquals("Active tracker count mismatch", + 1, jobTracker.taskTrackers().size()); + // validate blacklisted count + assertEquals("Blacklisted tracker count mismatch", + 0, jobTracker.getClusterStatus(false).getBlacklistedTrackers()); + + } } \ No newline at end of file Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=888431&r1=888430&r2=888431&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Dec 8 15:14:40 2009 @@ -32,7 +32,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; @@ -377,4 +381,88 @@ stopCluster(); } + + // Mapper that fails once for the first time + static class FailOnceMapper extends MapReduceBase implements + Mapper { + + private boolean shouldFail = false; + public void map(WritableComparable key, Writable value, + OutputCollector out, Reporter reporter) + throws IOException { + + if (shouldFail) { + throw new RuntimeException("failing map"); + } + } + + @Override + public void configure(JobConf conf) { + TaskAttemptID id = TaskAttemptID.forName(conf.get("mapred.task.id")); + shouldFail = id.getId() == 0 && id.getTaskID().getId() == 0; + } + } + + /** + * Check refreshNodes for decommissioning blacklisted nodes. + */ + public void testBlacklistedNodeDecommissioning() throws Exception { + LOG.info("Testing blacklisted node decommissioning"); + + Configuration conf = new Configuration(); + conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1"); + + startCluster(2, 1, 0, conf); + + assertEquals("Trackers not up", 2, + mr.getJobTrackerRunner().getJobTracker().getActiveTrackers().length); + // validate the total tracker count + assertEquals("Active tracker count mismatch", + 2, jt.getClusterStatus(false).getTaskTrackers()); + // validate blacklisted count + assertEquals("Blacklisted tracker count mismatch", + 0, jt.getClusterStatus(false).getBlacklistedTrackers()); + + // run a failing job to blacklist the tracker + JobConf jConf = mr.createJobConf(); + jConf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1"); + jConf.setJobName("test-job-fail-once"); + jConf.setMapperClass(FailOnceMapper.class); + jConf.setReducerClass(IdentityReducer.class); + jConf.setNumMapTasks(1); + jConf.setNumReduceTasks(0); + + RunningJob job = + UtilsForTests.runJob(jConf, new Path("in"), new Path("out")); + job.waitForCompletion(); + + // check if the tracker is lost + // validate the total tracker count + assertEquals("Active tracker count mismatch", + 1, jt.getClusterStatus(false).getTaskTrackers()); + // validate blacklisted count + assertEquals("Blacklisted tracker count mismatch", + 1, jt.getClusterStatus(false).getBlacklistedTrackers()); + + // find the tracker to decommission + String hostToDecommission = + JobInProgress.convertTrackerNameToHostName( + jt.getBlacklistedTrackers()[0].getTaskTrackerName()); + LOG.info("Decommissioning host " + hostToDecommission); + + Set decom = new HashSet(1); + decom.add(hostToDecommission); + jt.decommissionNodes(decom); + + // check the cluster status and tracker size + assertEquals("Tracker is not lost upon host decommissioning", + 1, jt.getClusterStatus(false).getTaskTrackers()); + assertEquals("Blacklisted tracker count incorrect in cluster status after " + + "decommissioning", + 0, jt.getClusterStatus(false).getBlacklistedTrackers()); + assertEquals("Tracker is not lost upon host decommissioning", + 1, jt.taskTrackers().size()); + + stopCluster(); + } }