Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 16914 invoked from network); 4 Mar 2011 04:32:05 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 04:32:05 -0000 Received: (qmail 34625 invoked by uid 500); 4 Mar 2011 04:32:05 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 34595 invoked by uid 500); 4 Mar 2011 04:32:05 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 34585 invoked by uid 99); 4 Mar 2011 04:32:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:32:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:31:55 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A89C62388C1C; Fri, 4 Mar 2011 04:31:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077582 - in /hadoop/common/branches/branch-0.20-security-patches/src/test/system: aop/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/test/system/ java/shared/ java/shared/org/ java/shared/org/apa... 
Date: Fri, 04 Mar 2011 04:31:32 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304043132.A89C62388C1C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 04:31:31 2011 New Revision: 1077582 URL: http://svn.apache.org/viewvc?rev=1077582&view=rev Log: commit 8d4bb100904e189de9c215c52012303e9a18a1b1 Author: Iyappan Srinivasan Date: Fri Jul 23 06:27:13 2010 +0000 MAPREDUCE-1871 https://issues.apache.org/jira/secure/attachment/12450270/1871-ydist-security-patch.txt Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java 
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj?rev=1077582&r1=1077581&r2=1077582&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj Fri Mar 4 04:31:31 2011 @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.test. import org.apache.hadoop.mapreduce.test.system.JobInfo; import org.apache.hadoop.mapreduce.test.system.TTInfo; import org.apache.hadoop.mapreduce.test.system.TaskInfo; +import org.apache.hadoop.mapred.StatisticsCollectionHandler; /** * Aspect which injects the basic protocol functionality which is to be @@ -91,5 +92,25 @@ public aspect JTProtocolAspect { public String JTProtocol.getJobSummaryInfo(JobID jobId) throws IOException { return null; } + + public int JTProtocol.getTaskTrackerLevelStatistics(TaskTrackerStatus + ttStatus, String timePeriod, String totalTasksOrSucceededTasks) + throws IOException { + return 0; + } + + public int JTProtocol.getInfoFromAllClients(String timePeriod, + String totalTasksOrSucceededTasks) throws IOException { + return 0; + } + public StatisticsCollectionHandler JTProtocol. 
+ getInfoFromAllClientsForAllTaskType() throws Exception { + return null; + } + + public int JTProtocol.getTaskTrackerHeartbeatInterval() + throws Exception { + return -1; + } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj?rev=1077582&r1=1077581&r2=1077582&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj Fri Mar 4 04:31:31 2011 @@ -21,6 +21,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.util.List; import java.util.ArrayList; +import java.util.LinkedList; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -31,6 +32,9 @@ import org.apache.hadoop.mapred.Counters import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapred.TaskTrackerStatus; +import org.apache.hadoop.mapred.StatisticsCollector; +import org.apache.hadoop.mapred.StatisticsCollectionHandler; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; import org.apache.hadoop.mapreduce.test.system.JTProtocol; import org.apache.hadoop.mapreduce.test.system.JobInfo; @@ -340,4 +344,177 @@ public privileged aspect JobTrackerAspec jobSummary.append(tracker.getClusterMetrics().getReduceSlotCapacity()); return jobSummary.toString(); } + + /** + * This gets the value of one task tracker window in the tasktracker page. 
+ * + * @param TaskTrackerStatus, + * timePeriod and totalTasksOrSucceededTasks, which are requried to + * identify the window + * @return The number of tasks info in a particular window in + * tasktracker page. + */ + public int JobTracker.getTaskTrackerLevelStatistics( + TaskTrackerStatus ttStatus, String timePeriod, + String totalTasksOrSucceededTasks) throws IOException { + + LOG.info("ttStatus host :" + ttStatus.getHost()); + if (timePeriod.matches("since_start")) { + StatisticsCollector.TimeWindow window = getStatistics(). + collector.DEFAULT_COLLECT_WINDOWS[0]; + return(getNumberOfTasks(window, ttStatus , + totalTasksOrSucceededTasks)); + } else if (timePeriod.matches("last_day")) { + StatisticsCollector.TimeWindow window = getStatistics(). + collector.DEFAULT_COLLECT_WINDOWS[1]; + return(getNumberOfTasks(window, ttStatus, + totalTasksOrSucceededTasks)); + } else if (timePeriod.matches("last_hour")) { + StatisticsCollector.TimeWindow window = getStatistics(). + collector.DEFAULT_COLLECT_WINDOWS[2]; + return(getNumberOfTasks(window, ttStatus , + totalTasksOrSucceededTasks)); + } + return -1; + } + + /** + * Get Information for Time Period and TaskType box + * from all tasktrackers + * + * @param + * timePeriod and totalTasksOrSucceededTasks, which are requried to + * identify the window + * @return The total number of tasks info for a particular column in + * tasktracker page. + */ + public int JobTracker.getInfoFromAllClients(String timePeriod, + String totalTasksOrSucceededTasks) throws IOException { + + int totalTasksCount = 0; + int totalTasksRanForJob = 0; + for (TaskTracker tt : taskTrackers.values()) { + TaskTrackerStatus ttStatus = tt.getStatus(); + String tasktrackerName = ttStatus.getHost(); + List taskTrackerValues = new LinkedList(); + JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics(). 
+ getTaskTrackerStat(ttStatus.getTrackerName()); + int totalTasks = getTaskTrackerLevelStatistics( + ttStatus, timePeriod, totalTasksOrSucceededTasks); + totalTasksCount += totalTasks; + } + return totalTasksCount; + } + + private int JobTracker.getNumberOfTasks(StatisticsCollector.TimeWindow + window, TaskTrackerStatus ttStatus, String totalTasksOrSucceededTasks ) { + JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics(). + getTaskTrackerStat(ttStatus.getTrackerName()); + if (totalTasksOrSucceededTasks.matches("total_tasks")) { + return ttStat.totalTasksStat.getValues(). + get(window).getValue(); + } else if (totalTasksOrSucceededTasks.matches("succeeded_tasks")) { + return ttStat.succeededTasksStat.getValues(). + get(window).getValue(); + } + return -1; + } + + /** + * This gets the value of all task trackers windows in the tasktracker page. + * + * @param none, + * @return StatisticsCollectionHandler class which holds the number + * of all jobs ran from all tasktrackers, in the sequence given below + * "since_start - total_tasks" + * "since_start - succeeded_tasks" + * "last_hour - total_tasks" + * "last_hour - succeeded_tasks" + * "last_day - total_tasks" + * "last_day - succeeded_tasks" + */ + public StatisticsCollectionHandler JobTracker. + getInfoFromAllClientsForAllTaskType() throws Exception { + + //The outer list will have a list of each tasktracker list. + //The inner list will have a list of all number of tasks in + //one tasktracker. + List> ttInfoList = new LinkedList>(); + + // Go through each tasktracker and get all the number of tasks + // six window's values of that tasktracker.Each window points to + // specific value for that tasktracker. 
+ //"since_start - total_tasks" + //"since_start - succeeded_tasks" + //"last_hour - total_tasks" + //"last_hour - succeeded_tasks" + //"last_day - total_tasks" + //"last_day - succeeded_tasks" + + for (TaskTracker tt : taskTrackers.values()) { + TaskTrackerStatus ttStatus = tt.getStatus(); + String tasktrackerName = ttStatus.getHost(); + List taskTrackerValues = new LinkedList(); + JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics(). + getTaskTrackerStat(ttStatus.getTrackerName()); + + int value; + int totalCount = 0; + for (int i = 0; i < 3; i++) { + StatisticsCollector.TimeWindow window = getStatistics(). + collector.DEFAULT_COLLECT_WINDOWS[i]; + value=0; + value = ttStat.totalTasksStat.getValues(). + get(window).getValue(); + taskTrackerValues.add(value); + value=0; + value = ttStat.succeededTasksStat.getValues(). + get(window).getValue(); + taskTrackerValues.add(value); + } + ttInfoList.add(taskTrackerValues); + } + + //The info is collected in the order described above by going + //through each tasktracker list + int totalInfoValues = 0; + StatisticsCollectionHandler statisticsCollectionHandler = + new StatisticsCollectionHandler(); + for (int i = 0; i < 6; i++) { + totalInfoValues = 0; + for (int j = 0; j < ttInfoList.size(); j++) { + List list = ttInfoList.get(j); + totalInfoValues += list.get(i); + } + switch (i) { + case 0: statisticsCollectionHandler. + setSinceStartTotalTasks(totalInfoValues); + break; + case 1: statisticsCollectionHandler. + setSinceStartSucceededTasks(totalInfoValues); + break; + case 2: statisticsCollectionHandler. + setLastHourTotalTasks(totalInfoValues); + break; + case 3: statisticsCollectionHandler. + setLastHourSucceededTasks(totalInfoValues); + break; + case 4: statisticsCollectionHandler. + setLastDayTotalTasks(totalInfoValues); + break; + case 5: statisticsCollectionHandler. 
+ setLastDaySucceededTasks(totalInfoValues); + break; + } + } + return statisticsCollectionHandler; + } + + /* + * Get the TaskTracker heartbeat interval, as returned by + * getNextHeartbeatInterval() + */ + public int JobTracker.getTaskTrackerHeartbeatInterval() + throws Exception { + return (getNextHeartbeatInterval()); + } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj?rev=1077582&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj Fri Mar 4 04:31:31 2011 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * This aspect shortens the task statistics time windows + * of all task trackers:
+ * Last Day time window will be changed from 24 hours to 2 minutes
+ * Last Hour time window will be changed from 1 hour to 1 minute
+ */ + +public privileged aspect StatisticsCollectorAspect { + + //last day is changed to 120 seconds instead of 24 hours, + //with a 10-second refresh rate + static final TimeWindow + LAST_DAY_ASPECT = new TimeWindow("Last Day", 2 * 60, 10); + + //last hour is changed to 60 seconds instead of 1 hour, + //with a 10-second refresh rate + static final TimeWindow + LAST_HOUR_ASPECT = new TimeWindow("Last Hour", 60, 10); + + private static final Log LOG = LogFactory + .getLog(StatisticsCollectorAspect.class); + + pointcut createStatExecution(String name, TimeWindow[] window) : + call(* StatisticsCollector.createStat(String, TimeWindow[])) + && args(name, window); + + //Before createStat runs, overwrite the last day (index 1) and last hour + //(index 2) entries of the window array with the shortened windows. + before(String name, TimeWindow[] window) : createStatExecution(name, window) { + window[1] = LAST_DAY_ASPECT; + window[2] = LAST_HOUR_ASPECT; + } + +} Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java?rev=1077582&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java Fri Mar 4 04:31:31 2011 @@ -0,0 +1,631 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; +import org.apache.hadoop.mapreduce.test.system.JTProtocol; +import org.apache.hadoop.mapreduce.test.system.JobInfo; +import org.apache.hadoop.mapreduce.test.system.TaskInfo; +import org.apache.hadoop.mapreduce.test.system.MRCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.UtilsForTests; +import org.apache.hadoop.mapred.JobClient.NetworkedJob; +import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.Path; +import testjar.GenerateTaskChildProcess; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +/** + * Verify the Task Tracker Info functionality. + */ + +public class TestTaskTrackerInfoSuccessfulFailedJobs { + + private static MRCluster cluster = null; + private static JobClient client = null; + static final Log LOG = LogFactory. 
+ getLog(TestTaskTrackerInfoSuccessfulFailedJobs.class); + private static Configuration conf = null; + private static JTProtocol remoteJTClient = null; + + StatisticsCollectionHandler statisticsCollectionHandler = null; + int taskTrackerHeartBeatInterval = 0; + + public TestTaskTrackerInfoSuccessfulFailedJobs() throws Exception { + } + + @BeforeClass + public static void setUp() throws Exception { + cluster = MRCluster.createCluster(new Configuration()); + cluster.setUp(); + conf = new Configuration(cluster.getConf()); + conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); + remoteJTClient = cluster.getJTClient().getProxy(); + } + + @AfterClass + public static void tearDown() throws Exception { + cluster.tearDown(); + } + + @Test + /** + * This tests Task tracker summary information for + * since start - total tasks, successful tasks + * Last Hour - total tasks, successful tasks + * Last Day - total tasks, successful tasks + * It is checked for multiple job submissions. + * @param none + * @return void + */ + public void testTaskTrackerInfoAll() throws Exception { + + //This boolean will decide whether to run job again + boolean continueLoop = true; + + //counter for job Loop + int countLoop = 0; + + String jobTrackerUserName = remoteJTClient.getDaemonUser(); + + LOG.info("jobTrackerUserName is :" + jobTrackerUserName); + + //This counter will check for count of a loop, + //which might become infinite. + int count = 0; + + SleepJob job = new SleepJob(); + job.setConf(conf); + int totalMapTasks = 5; + int totalReduceTasks = 1; + conf = job.setupJobConf(totalMapTasks, totalReduceTasks, + 100, 100, 100, 100); + JobConf jconf = new JobConf(conf); + + count = 0; + //The last hour and last day are given 60 seconds and 120 seconds + //recreate values rate, replacing one hour and 1 day. Waiting for + //them to be ona just created stage when testacse starts. 
+ while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks") + != 0) { + count++; + UtilsForTests.waitFor(1000); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 180) { + Assert.fail("Since this value has not reached 0" + + "in more than 180 seconds. Failing at this point"); + } + } + + statisticsCollectionHandler = null; + statisticsCollectionHandler = remoteJTClient. + getInfoFromAllClientsForAllTaskType(); + + int totalTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + //Submitting the job + RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf); + + JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID()); + LOG.info("jInfo is :" + jInfo); + + //Assert if jobInfo is null + Assert.assertNotNull("jobInfo is null", jInfo); + + count = 0; + LOG.info("Waiting till the job is completed..."); + while (jInfo != null && !jInfo.getStatus().isJobComplete()) { + UtilsForTests.waitFor(1000); + count++; + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 40) { + Assert.fail("job has not reached completed state for more" + + " than 400 seconds. 
Failing at this point"); + } + } + + //Waiting for 20 seconds to make sure that all the completed tasks + //are reflected in their corresponding Tasktracker boxes. + taskTrackerHeartBeatInterval = remoteJTClient. + getTaskTrackerHeartbeatInterval(); + + //Waiting for 6 times the Task tracker heart beat interval to + //account for network slowness, job tracker processing time + //after receiving the tasktracker updates etc. + UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6); + + statisticsCollectionHandler = null; + statisticsCollectionHandler = + remoteJTClient.getInfoFromAllClientsForAllTaskType(); + int totalTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayAfterJob = statisticsCollectionHandler. 
+ getLastDaySucceededTasks(); + + int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2); + + Assert.assertEquals("The number of total tasks, since start" + + " dont match", + (totalTasksSinceStartBeforeJob + totalTasksForJob), + totalTasksSinceStartAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksSinceStartBeforeJob + totalTasksForJob), + succeededTasksSinceStartAfterJob); + + Assert.assertEquals("The number of total tasks, last hour" + + " dont match", + (totalTasksLastHourBeforeJob + totalTasksForJob), + totalTasksLastHourAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "last hour dont match", + (succeededTasksLastHourBeforeJob + totalTasksForJob), + succeededTasksLastHourAfterJob); + + Assert.assertEquals("The number of total tasks, last day" + + " dont match", + (totalTasksLastDayBeforeJob + totalTasksForJob), + totalTasksLastDayAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksLastDayBeforeJob + totalTasksForJob), + succeededTasksLastDayAfterJob); + } + + @Test + /** + * This tests Task tracker task killed + * summary information for + * since start - total tasks, successful tasks + * Last Hour - total tasks, successful tasks + * Last Day - total tasks, successful tasks + * It is checked for multiple job submissions. + * @param none + * @return void + */ + public void testTaskTrackerInfoKilled() throws Exception { + + //This boolean will decide whether to run job again + boolean continueLoop = true; + + //counter for job Loop + int countLoop = 0; + + TaskInfo taskInfo = null; + + String jobTrackerUserName = remoteJTClient.getDaemonUser(); + + LOG.info("jobTrackerUserName is :" + jobTrackerUserName); + + //This counter will check for count of a loop, + //which might become infinite. 
+ int count = 0; + + SleepJob job = new SleepJob(); + job.setConf(conf); + int totalMapTasks = 5; + int totalReduceTasks = 1; + conf = job.setupJobConf(totalMapTasks, totalReduceTasks, + 100, 100, 100, 100); + JobConf jconf = new JobConf(conf); + + count = 0; + //The last hour and last day are given 60 seconds and 120 seconds + //recreate values rate, replacing one hour and 1 day. Waiting for + //them to be ona just created stage when testacse starts. + while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks") + != 0) { + count++; + UtilsForTests.waitFor(1000); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 140) { + Assert.fail("Since this value has not reached 0" + + "in more than 140 seconds. Failing at this point"); + } + } + + statisticsCollectionHandler = null; + statisticsCollectionHandler = remoteJTClient. + getInfoFromAllClientsForAllTaskType(); + + int totalTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + //Submitting the job + RunningJob rJob = cluster.getJTClient().getClient(). 
+ submitJob(jconf); + + JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID()); + LOG.info("jInfo is :" + jInfo); + + count = 0; + while (count < 60) { + if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { + break; + } else { + UtilsForTests.waitFor(1000); + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + } + count++; + } + Assert.assertTrue("Job has not been started for 1 min.", + count != 60); + + //Assert if jobInfo is null + Assert.assertNotNull("jobInfo is null", jInfo); + + TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(rJob.getID()); + for (TaskInfo taskinfo : taskInfos) { + if (!taskinfo.isSetupOrCleanup()) { + taskInfo = taskinfo; + } + } + + count = 0; + taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + while (count < 60) { + if (taskInfo.getTaskStatus().length > 0) { + if (taskInfo.getTaskStatus()[0].getRunState() + == TaskStatus.State.RUNNING) { + break; + } + } + UtilsForTests.waitFor(1000); + taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + count++; + } + + Assert.assertTrue("Task has not been started for 1 min.", + count != 60); + + NetworkedJob networkJob = (cluster.getJTClient().getClient()).new + NetworkedJob(jInfo.getStatus()); + TaskID tID = TaskID.downgrade(taskInfo.getTaskID()); + TaskAttemptID taskAttID = new TaskAttemptID(tID , 0); + networkJob.killTask(taskAttID, false); + + count = 0; + LOG.info("Waiting till the job is completed..."); + while (!jInfo.getStatus().isJobComplete()) { + UtilsForTests.waitFor(1000); + count++; + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 40) { + Assert.fail("job has not reached completed state for more" + + " than 400 seconds. Failing at this point"); + } + } + + //Waiting for 20 seconds to make sure that all the completed tasks + //are reflected in their corresponding Tasktracker boxes. 
+ taskTrackerHeartBeatInterval = remoteJTClient. + getTaskTrackerHeartbeatInterval(); + + //Waiting for 6 times the Task tracker heart beat interval to + //account for network slowness, job tracker processing time + //after receiving the tasktracker updates etc. + UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6); + + statisticsCollectionHandler = null; + statisticsCollectionHandler = + remoteJTClient.getInfoFromAllClientsForAllTaskType(); + int totalTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + //Total tasks expected is setup, Cleanup + totalMapTasks + //+ totalReduceTasks + int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2); + + //The total tasks will be equal to the totalTasksSinceStartBeforeJob + // + totalTasksFor present Job + 1 more task which was killed. 
+ //This kiled task will be re-attempted by the job tracker and would have + //rerun in another tasktracker and would have completed successfully, + //which is captured in totalTasksForJob + Assert.assertEquals("The number of total tasks, since start" + + " dont match", + (totalTasksSinceStartBeforeJob + totalTasksForJob + 1), + totalTasksSinceStartAfterJob ); + Assert.assertEquals("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksSinceStartBeforeJob + totalTasksForJob), + succeededTasksSinceStartAfterJob); + + Assert.assertEquals("The number of total tasks, last hour" + + " dont match", + (totalTasksLastHourBeforeJob + totalTasksForJob + 1), + totalTasksLastHourAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "last hour dont match", + (succeededTasksLastHourBeforeJob + totalTasksForJob), + succeededTasksLastHourAfterJob); + + Assert.assertEquals("The number of total tasks, last day" + + " dont match", + (totalTasksLastDayBeforeJob + totalTasksForJob + 1), + totalTasksLastDayAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksLastDayBeforeJob + totalTasksForJob), + succeededTasksLastDayAfterJob); + + } + + @Test + /** + * This tests Task tracker task failure + * summary information for + * since start - total tasks, successful tasks + * Last Hour - total tasks, successful tasks + * Last Day - total tasks, successful tasks + * @param none + * @return void + */ + public void testTaskTrackerInfoTaskFailure() throws Exception { + + //This boolean will decide whether to run job again + boolean continueLoop = true; + + //counter for job Loop + int countLoop = 0; + + TaskInfo taskInfo = null; + + String jobTrackerUserName = remoteJTClient.getDaemonUser(); + + LOG.info("jobTrackerUserName is :" + jobTrackerUserName); + + //This counter will check for count of a loop, + //which might become infinite. 
+ int count = 0; + + Configuration conf = new Configuration(cluster.getConf()); + conf.setBoolean("mapreduce.map.output.compress", false); + conf.set("mapred.map.output.compression.codec", + "org.apache.hadoop.io.compress.DefaultCodec"); + JobConf jconf = new JobConf(conf); + Path inputDir = new Path("input"); + Path outputDir = new Path("output"); + cleanup(inputDir, conf); + cleanup(outputDir, conf); + + createInput(inputDir, conf); + jconf.setJobName("Task Failed job"); + jconf.setJarByClass(UtilsForTests.class); + jconf.setMapperClass(GenerateTaskChildProcess.FailedMapper.class); + jconf.setNumMapTasks(1); + jconf.setNumReduceTasks(0); + jconf.setMaxMapAttempts(1); + FileInputFormat.setInputPaths(jconf, inputDir); + FileOutputFormat.setOutputPath(jconf, outputDir); + + count = 0; + //The last hour and last day are given 60 seconds and 120 seconds + //recreate values rate, replacing one hour and 1 day. Waiting for + //them to be ona just created stage when testacse starts. + while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks") + != 0) { + count++; + UtilsForTests.waitFor(1000); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 140) { + Assert.fail("Since this value has not reached 0" + + "in more than 140 seconds. Failing at this point"); + } + } + + statisticsCollectionHandler = null; + statisticsCollectionHandler = remoteJTClient. + getInfoFromAllClientsForAllTaskType(); + + int totalTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayBeforeJob = statisticsCollectionHandler. 
+ getLastDayTotalTasks(); + int succeededTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf); + JobID id = rJob.getID(); + JobInfo jInfo = remoteJTClient.getJobInfo(id); + + LOG.info("jInfo is :" + jInfo); + + count = 0; + while (count < 60) { + if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { + break; + } else { + UtilsForTests.waitFor(1000); + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + } + count++; + } + Assert.assertTrue("Job has not been started for 1 min.", + count != 60); + + //Assert if jobInfo is null + Assert.assertNotNull("jobInfo is null", jInfo); + + count = 0; + LOG.info("Waiting till the job is completed..."); + while ( jInfo != null && (!jInfo.getStatus().isJobComplete())) { + UtilsForTests.waitFor(1000); + count++; + jInfo = remoteJTClient.getJobInfo(id); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 40) { + Assert.fail("job has not reached completed state for more" + + " than 400 seconds. Failing at this point"); + } + } + + //Waiting for 20 seconds to make sure that all the completed tasks + //are reflected in their corresponding Tasktracker boxes. + taskTrackerHeartBeatInterval = remoteJTClient. + getTaskTrackerHeartbeatInterval(); + + //Waiting for 6 times the Task tracker heart beat interval to + //account for network slowness, job tracker processing time + //after receiving the tasktracker updates etc. + UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6); + + statisticsCollectionHandler = null; + statisticsCollectionHandler = + remoteJTClient.getInfoFromAllClientsForAllTaskType(); + int totalTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartAfterJob = statisticsCollectionHandler. 
+ getSinceStartSucceededTasks(); + int totalTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + //1 map running 4 times before failure, plus sometimes two failures + //which are not captured in Job summary, but caught in + //tasktracker summary. + //0 reduces, setup and cleanup + int totalTasksForJob = 4; + + Assert.assertTrue("The number of total tasks, since start" + + " dont match", (totalTasksSinceStartAfterJob >= + totalTasksSinceStartBeforeJob + totalTasksForJob)); + + Assert.assertTrue("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksSinceStartAfterJob >= + succeededTasksSinceStartBeforeJob)); + + Assert.assertTrue("The number of total tasks, last hour" + + " dont match", (totalTasksLastHourAfterJob >= + totalTasksLastHourBeforeJob + totalTasksForJob)); + Assert.assertTrue("The number of succeeded tasks, " + + "last hour dont match", (succeededTasksLastHourAfterJob >= + succeededTasksLastHourBeforeJob)); + + Assert.assertTrue("The number of total tasks, last day" + + " dont match", totalTasksLastDayAfterJob >= + totalTasksLastDayBeforeJob + totalTasksForJob); + Assert.assertTrue("The number of succeeded tasks, " + + "since start dont match", succeededTasksLastDayAfterJob >= + succeededTasksLastDayBeforeJob); + } + + //This creates the input directories in the dfs + private void createInput(Path inDir, Configuration conf) throws + IOException { + String input = "Hadoop is framework for data intensive distributed " + + "applications.\n" + + "Hadoop enables applications to work with thousands of nodes."; + FileSystem fs = inDir.getFileSystem(conf); + if (!fs.mkdirs(inDir)) { + throw new IOException("Failed to create 
the input directory:" + + inDir.toString()); + } + fs.setPermission(inDir, new FsPermission(FsAction.ALL, + FsAction.ALL, FsAction.ALL)); + DataOutputStream file = fs.create(new Path(inDir, "data.txt")); + int i = 0; + while(i < 1000 * 3000) { + file.writeBytes(input); + i++; + } + file.close(); + } + + //This cleans up the specified directories in the dfs + private void cleanup(Path dir, Configuration conf) throws + IOException { + FileSystem fs = dir.getFileSystem(conf); + fs.delete(dir, true); + } +} Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java?rev=1077582&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java Fri Mar 4 04:31:31 2011 @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.List; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; +import org.apache.hadoop.mapreduce.test.system.JTProtocol; +import org.apache.hadoop.mapreduce.test.system.TTClient; +import org.apache.hadoop.mapreduce.test.system.JobInfo; +import org.apache.hadoop.mapreduce.test.system.TaskInfo; +import org.apache.hadoop.mapreduce.test.system.MRCluster; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.UtilsForTests; +import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.mapred.StatisticsCollectionHandler; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +/** + * Verify the Task Tracker Info functionality. + */ + +public class TestTaskTrackerInfoTTProcess { + + private static MRCluster cluster = null; + private static JobClient client = null; + static final Log LOG = LogFactory. 
+ getLog(TestTaskTrackerInfoTTProcess.class); + private static Configuration conf = null; + private static JTProtocol remoteJTClient = null; + private static String confFile = "mapred-site.xml"; + int taskTrackerHeartBeatInterval; + StatisticsCollectionHandler statisticsCollectionHandler = null; + + public TestTaskTrackerInfoTTProcess() throws Exception { + } + + @BeforeClass + public static void setUp() throws Exception { + cluster = MRCluster.createCluster(new Configuration()); + cluster.setUp(); + conf = new Configuration(cluster.getConf()); + conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); + remoteJTClient = cluster.getJTClient().getProxy(); + } + + @AfterClass + public static void tearDown() throws Exception { + cluster.tearDown(); + } + + @Test + /** + * This tests Task tracker info when a job with 0 maps and 0 reduces run + * summary information for + * since start - total tasks, successful tasks + * Last Hour - total tasks, successful tasks + * Last Day - total tasks, successful tasks + * It is checked for multiple job submissions. + * @param none + * @return void + */ + public void testTaskTrackerInfoZeroMapsZeroReduces() throws Exception { + + TaskInfo taskInfo = null; + + String jobTrackerUserName = remoteJTClient.getDaemonUser(); + + LOG.info("jobTrackerUserName is :" + jobTrackerUserName); + + int count = 0; + + SleepJob job = new SleepJob(); + job.setConf(conf); + int totalMapTasks = 0; + int totalReduceTasks = 0; + conf = job.setupJobConf(totalMapTasks, totalReduceTasks, + 100, 100, 100, 100); + JobConf jconf = new JobConf(conf); + + count = 0; + //The last hour and last day are given 60 seconds and 120 seconds + //recreate values rate, replacing one hour and 1 day. Waiting for + //them to be ona just created stage when testacse starts. 
+ while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks") + != 0) { + count++; + UtilsForTests.waitFor(1000); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 140) { + Assert.fail("Since this value has not reached 0" + + "in more than 140 seconds. Failing at this point"); + } + } + + statisticsCollectionHandler = remoteJTClient. + getInfoFromAllClientsForAllTaskType(); + + int totalTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + //Submitting the job + RunningJob rJob = cluster.getJTClient().getClient(). + submitJob(jconf); + + JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID()); + LOG.info("jInfo is :" + jInfo); + + count = 0; + while (count < 60) { + if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { + break; + } else { + UtilsForTests.waitFor(1000); + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + } + count++; + } + Assert.assertTrue("Job has not been started for 1 min.", + count != 60); + + //Assert if jobInfo is null + Assert.assertNotNull("jobInfo is null", jInfo); + + count = 0; + LOG.info("Waiting till the job is completed..."); + while (!jInfo.getStatus().isJobComplete()) { + UtilsForTests.waitFor(1000); + count++; + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. 
Testcase will + //anyway fail later. + if (count > 40) { + Assert.fail("job has not reached completed state for more" + + " than 400 seconds. Failing at this point"); + } + } + + //Waiting for 20 seconds to make sure that all the completed tasks + //are reflected in their corresponding Tasktracker boxes. + taskTrackerHeartBeatInterval = remoteJTClient. + getTaskTrackerHeartbeatInterval(); + + //Waiting for 6 times the Task tracker heart beat interval to + //account for network slowness, job tracker processing time + //after receiving the tasktracker updates etc. + UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6); + + statisticsCollectionHandler = null; + statisticsCollectionHandler = + remoteJTClient.getInfoFromAllClientsForAllTaskType(); + int totalTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayAfterJob = statisticsCollectionHandler. 
+ getLastDaySucceededTasks(); + + int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2); + + Assert.assertEquals("The number of total tasks, since start" + + " dont match", + (totalTasksSinceStartBeforeJob + totalTasksForJob), + totalTasksSinceStartAfterJob ); + Assert.assertEquals("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksSinceStartBeforeJob + totalTasksForJob), + succeededTasksSinceStartAfterJob); + + Assert.assertEquals("The number of total tasks, last hour" + + " dont match", + (totalTasksLastHourBeforeJob + totalTasksForJob), + totalTasksLastHourAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "last hour dont match", + (succeededTasksLastHourBeforeJob + totalTasksForJob), + succeededTasksLastHourAfterJob); + + Assert.assertEquals("The number of total tasks, last day" + + " dont match", + (totalTasksLastDayBeforeJob + totalTasksForJob), + totalTasksLastDayAfterJob); + Assert.assertEquals("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksLastDayBeforeJob + totalTasksForJob), + succeededTasksLastDayAfterJob); + } + + @Test + /** + * This tests Task tracker info when tasktracker is suspended/killed + * and then the process comes alive again + * summary information for + * since start - total tasks, successful tasks + * Last Hour - total tasks, successful tasks + * Last Day - total tasks, successful tasks + * It is checked for multiple job submissions. 
+ * @param none + * @return void + */ + public void testTaskTrackerInfoTaskTrackerSuspend() throws Exception { + + TaskInfo taskInfo = null; + + String jobTrackerUserName = remoteJTClient.getDaemonUser(); + + LOG.info("jobTrackerUserName is :" + jobTrackerUserName); + + int count = 0; + + SleepJob job = new SleepJob(); + job.setConf(conf); + int totalMapTasks = 5; + int totalReduceTasks = 1; + conf = job.setupJobConf(totalMapTasks, totalReduceTasks, + 100, 100, 100, 100); + JobConf jconf = new JobConf(conf); + + //The last hour and last day are given 60 seconds and 120 seconds + //recreate values rate, replacing one hour and 1 day. Waiting for + //them to be ona just created stage when testacse starts. + while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks") + != 0) { + count++; + UtilsForTests.waitFor(1000); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 140) { + Assert.fail("Since this value has not reached 0" + + "in more than 140 seconds. Failing at this point"); + } + } + + statisticsCollectionHandler = remoteJTClient. + getInfoFromAllClientsForAllTaskType(); + + int totalTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourBeforeJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayBeforeJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + //Submitting the job + RunningJob rJob = cluster.getJTClient().getClient(). 
+ submitJob(jconf); + + JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID()); + LOG.info("jInfo is :" + jInfo); + + count = 0; + while (count < 60) { + if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { + break; + } else { + UtilsForTests.waitFor(1000); + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + } + count++; + } + Assert.assertTrue("Job has not been started for 1 min.", + count != 60); + + //Assert if jobInfo is null + Assert.assertNotNull("jobInfo is null", jInfo); + + TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(rJob.getID()); + for (TaskInfo taskinfo : taskInfos) { + if (!taskinfo.isSetupOrCleanup()) { + taskInfo = taskinfo; + break; + } + } + + TTClient ttClient = cluster.getTTClientInstance(taskInfo); + String pid = null; + ttClient.kill(); + ttClient.waitForTTStop(); + ttClient.start(); + ttClient.waitForTTStart(); + + count = 0; + LOG.info("Waiting till the job is completed..."); + while (!jInfo.getStatus().isJobComplete()) { + UtilsForTests.waitFor(1000); + count++; + jInfo = remoteJTClient.getJobInfo(rJob.getID()); + //If the count goes beyond a point, then break; This is to avoid + //infinite loop under unforeseen circumstances. Testcase will + //anyway fail later. + if (count > 40) { + Assert.fail("job has not reached completed state for more" + + " than 400 seconds. Failing at this point"); + } + } + + //Waiting for 20 seconds to make sure that all the completed tasks + //are reflected in their corresponding Tasktracker boxes. + taskTrackerHeartBeatInterval = remoteJTClient. + getTaskTrackerHeartbeatInterval(); + + //Waiting for 6 times the Task tracker heart beat interval to + //account for network slowness, job tracker processing time + //after receiving the tasktracker updates etc. 
+ UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6); + + statisticsCollectionHandler = null; + statisticsCollectionHandler = + remoteJTClient.getInfoFromAllClientsForAllTaskType(); + int totalTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartTotalTasks(); + int succeededTasksSinceStartAfterJob = statisticsCollectionHandler. + getSinceStartSucceededTasks(); + int totalTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourTotalTasks(); + int succeededTasksLastHourAfterJob = statisticsCollectionHandler. + getLastHourSucceededTasks(); + int totalTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDayTotalTasks(); + int succeededTasksLastDayAfterJob = statisticsCollectionHandler. + getLastDaySucceededTasks(); + + int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2); + + Assert.assertTrue("The number of total tasks, since start" + + " dont match", + (totalTasksSinceStartAfterJob >= succeededTasksSinceStartBeforeJob + + totalTasksForJob)); + + Assert.assertTrue("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksSinceStartAfterJob >= succeededTasksSinceStartBeforeJob + + totalTasksForJob)); + + Assert.assertTrue("The number of total tasks, last hour" + + " dont match", + ( totalTasksLastHourAfterJob >= totalTasksLastHourBeforeJob + + totalTasksForJob)); + + Assert.assertTrue("The number of succeeded tasks, " + + "last hour dont match", + (succeededTasksLastHourAfterJob >= succeededTasksLastHourBeforeJob + + totalTasksForJob)); + + Assert.assertTrue("The number of total tasks, last day" + + " dont match", + (totalTasksLastDayAfterJob >= totalTasksLastDayBeforeJob + + totalTasksForJob)); + + Assert.assertTrue("The number of succeeded tasks, " + + "since start dont match", + (succeededTasksLastDayAfterJob >= succeededTasksLastDayBeforeJob + + totalTasksForJob)); + } +} Modified: 
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java?rev=1077582&r1=1077581&r2=1077582&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java Fri Mar 4 04:31:31 2011 @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.test.system.DaemonProtocol; +import org.apache.hadoop.mapred.TaskTrackerStatus; +import org.apache.hadoop.mapred.StatisticsCollectionHandler; /** * Client side API's exposed from JobTracker. @@ -144,4 +146,44 @@ public interface JTProtocol extends Daem * @throws IOException if any I/O error occurs. */ public String getJobSummaryInfo(JobID jobId) throws IOException; + + + /** + * This gets the value of one task tracker window in the tasktracker page. + * + * @param TaskTrackerStatus, + * timePeriod and totalTasksOrSucceededTasks, which are requried to + * identify the window + * @return value of one task in a single Job tracker window + */ + public int getTaskTrackerLevelStatistics(TaskTrackerStatus ttStatus, + String timePeriod, String totalTasksOrSucceededTasks) + throws IOException; + + /** + * This gets the value of all task trackers windows in the tasktracker page. 
+ * + * @param none, + * return a object which returns all the tasktracker info + */ + public StatisticsCollectionHandler getInfoFromAllClientsForAllTaskType() + throws Exception; + + /** + * Get Information for Time Period and TaskType box + * from all tasktrackers + * @param + * timePeriod and totalTasksOrSucceededTasks, which are requried to + * identify the window + * @return The total number of tasks info for a particular column in + * tasktracker page. + */ + public int getInfoFromAllClients(String timePeriod, + String totalTasksOrSucceededTasks) throws IOException; + + /** + * This gets the value of all task trackers windows in the tasktracker page. + */ + public int getTaskTrackerHeartbeatInterval() throws Exception; + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077582&r1=1077581&r2=1077582&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar 4 04:31:31 2011 @@ -37,6 +37,7 @@ import org.apache.hadoop.test.system.pro import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.TaskID; import java.util.Collection; +import org.apache.hadoop.mapred.UtilsForTests; /** * Concrete AbstractDaemonCluster representing a Map-Reduce cluster. @@ -203,4 +204,34 @@ public class MRCluster extends AbstractD super(mrDaemonInfos); } } + + /** + * Get a TTClient Instance from a running task
+ * @param Task Information of the running task + * @return TTClient instance + * @throws IOException + */ + public TTClient getTTClientInstance(TaskInfo taskInfo) + throws IOException { + JTProtocol remoteJTClient = getJTClient().getProxy(); + String [] taskTrackers = taskInfo.getTaskTrackers(); + int counter = 0; + TTClient ttClient = null; + while (counter < 60) { + if (taskTrackers.length != 0) { + break; + } + UtilsForTests.waitFor(100); + taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + taskTrackers = taskInfo.getTaskTrackers(); + counter ++; + } + if ( taskTrackers.length != 0 ) { + String hostName = taskTrackers[0].split("_")[1]; + hostName = hostName.split(":")[0]; + ttClient = getTTClient(hostName); + } + return ttClient; + } + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java?rev=1077582&r1=1077581&r2=1077582&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java Fri Mar 4 04:31:31 2011 @@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobTrack import org.apache.hadoop.mapred.TaskTrackerStatus; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.test.system.process.RemoteProcess; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.UtilsForTests; @@ -40,6 +42,7 @@ import org.apache.hadoop.mapred.UtilsFor public class TTClient extends 
MRDaemonClient { TTProtocol proxy; + static final Log LOG = LogFactory.getLog(TTClient.class); public TTClient(Configuration conf, RemoteProcess daemon) throws IOException { @@ -114,4 +117,45 @@ public class TTClient extends MRDaemonCl return (counter != 60)? true : false; } + /** + * Waits till this Tasktracker daemon process is stopped
+ * + * @return void + * @throws IOException + */ + public void waitForTTStop() throws IOException { + LOG.info("Waiting for Tasktracker:" + getHostName() + + " to stop....."); + while (true) { + try { + ping(); + LOG.debug(getHostName() +" is waiting state to stop."); + UtilsForTests.waitFor(10000); + } catch (Exception exp) { + LOG.info("TaskTracker : " + getHostName() + " is stopped..."); + break; + } + } + } + + /** + * Waits till this Tasktracker daemon process is started
+ * + * @return void + * @throws IOException + */ + public void waitForTTStart() throws + IOException { + LOG.debug("Waiting for Tasktracker:" + getHostName() + " to come up."); + while (true) { + try { + ping(); + LOG.debug("TaskTracker : " + getHostName() + " is pinging..."); + break; + } catch (Exception exp) { + LOG.info(getHostName() + " is waiting to come up."); + UtilsForTests.waitFor(10000); + } + } + } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java?rev=1077582&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java Fri Mar 4 04:31:31 2011 @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/* + * Object which stores all the tasktracker info + */ +public class StatisticsCollectionHandler implements Writable{ + + private int sinceStartTotalTasks = 0; + private int sinceStartSucceededTasks = 0; + private int lastHourTotalTasks = 0; + private int lastHourSucceededTasks = 0; + private int lastDayTotalTasks = 0; + private int lastDaySucceededTasks = 0; + + public int getSinceStartTotalTasks() { + return sinceStartTotalTasks; + } + + public int getSinceStartSucceededTasks() { + return sinceStartSucceededTasks; + } + + public int getLastHourTotalTasks() { + return lastHourTotalTasks; + } + + public int getLastHourSucceededTasks() { + return lastHourSucceededTasks; + } + + public int getLastDayTotalTasks() { + return lastDayTotalTasks; + } + + public int getLastDaySucceededTasks() { + return lastDaySucceededTasks; + } + + public void setSinceStartTotalTasks(int value) { + sinceStartTotalTasks = value; + } + + public void setSinceStartSucceededTasks(int value) { + sinceStartSucceededTasks = value; + } + + public void setLastHourTotalTasks(int value) { + lastHourTotalTasks = value; + } + + public void setLastHourSucceededTasks(int value) { + lastHourSucceededTasks = value; + } + + public void setLastDayTotalTasks(int value) { + lastDayTotalTasks = value; + } + + public void setLastDaySucceededTasks(int value) { + lastDaySucceededTasks = value; + } + + @Override + public void readFields(DataInput in) throws IOException { + sinceStartTotalTasks = WritableUtils.readVInt(in); + sinceStartSucceededTasks = WritableUtils.readVInt(in); + lastHourTotalTasks = WritableUtils.readVInt(in); + lastHourSucceededTasks = WritableUtils.readVInt(in); + lastDayTotalTasks = WritableUtils.readVInt(in); + lastDaySucceededTasks = WritableUtils.readVInt(in); + } + + @Override 
+ public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, sinceStartTotalTasks); + WritableUtils.writeVInt(out, sinceStartSucceededTasks); + WritableUtils.writeVInt(out, lastHourTotalTasks); + WritableUtils.writeVInt(out, lastHourSucceededTasks); + WritableUtils.writeVInt(out, lastDayTotalTasks); + WritableUtils.writeVInt(out, lastDaySucceededTasks); + } + +} +