Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 73563 invoked from network); 4 Mar 2011 04:20:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 04:20:49 -0000 Received: (qmail 76123 invoked by uid 500); 4 Mar 2011 04:20:49 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 76093 invoked by uid 500); 4 Mar 2011 04:20:49 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 76078 invoked by uid 99); 4 Mar 2011 04:20:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:20:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:20:47 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B777723888CE; Fri, 4 Mar 2011 04:20:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077492 - /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java Date: Fri, 04 Mar 2011 04:20:27 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304042027.B777723888CE@eris.apache.org> Author: omalley Date: Fri Mar 4 04:20:27 2011 New Revision: 1077492 URL: http://svn.apache.org/viewvc?rev=1077492&view=rev Log: commit 79c3eabb8f0ea7fc174e814e51cf9663fb2b1a70 Author: Vinay Kumar Thota Date: Fri Jun 4 09:18:17 2010 +0000 MAPREDUCE:1731 from https://issues.apache.org/jira/secure/attachment/12444915/1731-ydist-security.patch Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java?rev=1077492&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java Fri Mar 4 04:20:27 2011 @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.junit.Test; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +import java.util.Collection; +import java.util.Hashtable; + +import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; +import org.apache.hadoop.mapreduce.test.system.JTProtocol; +import org.apache.hadoop.mapreduce.test.system.JobInfo; +import org.apache.hadoop.mapreduce.test.system.MRCluster; +import org.apache.hadoop.mapreduce.test.system.TTClient; +import org.apache.hadoop.mapreduce.test.system.JTClient; +import org.apache.hadoop.mapreduce.test.system.TTProtocol; +import org.apache.hadoop.mapreduce.test.system.TTTaskInfo; +import org.apache.hadoop.mapreduce.test.system.TaskInfo; +import testjar.GenerateTaskChildProcess; + +public class TestChildsKillingOfSuspendTask { + private static final Log LOG = LogFactory + .getLog(TestChildsKillingOfSuspendTask.class); + private static Configuration conf = new Configuration(); + private static MRCluster cluster; + private static Path inputDir = new Path("input"); + private static Path outputDir = new Path("output"); + private static String confFile = "mapred-site.xml"; + + @BeforeClass + public static void before() throws Exception { + Hashtable prop = new Hashtable(); + prop.put("mapred.map.max.attempts",1L); + prop.put("mapred.task.timeout",30000L); + String [] expExcludeList = {"java.net.ConnectException", + "java.io.IOException"}; + cluster = MRCluster.createCluster(conf); + cluster.setExcludeExpList(expExcludeList); + cluster.setUp(); + cluster.restartClusterWithNewConfig(prop, confFile); + UtilsForTests.waitFor(1000); + conf = cluster.getJTClient().getProxy().getDaemonConf(); + createInput(inputDir, conf); + } + @AfterClass + public static void after() throws Exception { + cleanup(inputDir, conf); + cleanup(outputDir, conf); + cluster.tearDown(); + cluster.restart(); + } + + /** + * Verify the process tree clean up of a task after + * task is suspended and wait till the task is + * terminated based on timeout. + */ + @Test + public void testProcessTreeCleanupOfSuspendTask() throws + IOException { + TaskInfo taskInfo = null; + TaskID tID = null; + TTTaskInfo [] ttTaskinfo = null; + String pid = null; + TTProtocol ttIns = null; + TTClient ttClientIns = null; + int counter = 0; + + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("Message Display"); + jobConf.setJarByClass(GenerateTaskChildProcess.class); + jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(0); + jobConf.setMaxMapAttempts(1); + cleanup(outputDir, conf); + FileInputFormat.setInputPaths(jobConf, inputDir); + FileOutputFormat.setOutputPath(jobConf, outputDir); + + JTClient jtClient = cluster.getJTClient(); + JobClient client = jtClient.getClient(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); + RunningJob runJob = client.submitJob(jobConf); + JobID id = runJob.getID(); + JobInfo jInfo = wovenClient.getJobInfo(id); + Assert.assertNotNull("Job information is null",jInfo); + + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(id)); + + TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); + for (TaskInfo taskinfo : taskInfos) { + if (!taskinfo.isSetupOrCleanup()) { + taskInfo = taskinfo; + break; + } + } + + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); + + tID = TaskID.downgrade(taskInfo.getTaskID()); + TaskAttemptID tAttID = new TaskAttemptID(tID,0); + FinishTaskControlAction action = new FinishTaskControlAction(tID); + + Collection ttClients = cluster.getTTClients(); + for (TTClient ttClient : ttClients) { + TTProtocol tt = ttClient.getProxy(); + tt.sendAction(action); + ttTaskinfo = tt.getTasks(); + for (TTTaskInfo tttInfo : ttTaskinfo) { + if (!tttInfo.isTaskCleanupTask()) { + pid = tttInfo.getPid(); + ttClientIns = ttClient; + ttIns = tt; + break; + } + } + if (ttClientIns != null) { + break; + } + } + Assert.assertTrue("Map process tree is not alive before task suspend.", + ttIns.isProcessTreeAlive(pid)); + LOG.info("Suspend the task of process id " + pid); + boolean exitCode = ttIns.suspendProcess(pid); + Assert.assertTrue("Process(" + pid + ") has not been suspended", + exitCode); + + LOG.info("Waiting till the task is failed..."); + taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID()); + counter = 0; + while (counter < 60) { + if (taskInfo.getTaskStatus().length > 0) { + if (taskInfo.getTaskStatus()[0].getRunState() == + TaskStatus.State.FAILED) { + break; + } + } + UtilsForTests.waitFor(1000); + taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID()); + counter ++; + } + Assert.assertTrue("Suspended task is failed " + + "before the timeout interval.", counter > 30 && + taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.FAILED); + + LOG.info("Waiting till the job is completed..."); + counter = 0; + while (counter < 60) { + if (jInfo.getStatus().isJobComplete()) { + break; + } + UtilsForTests.waitFor(1000); + jInfo = wovenClient.getJobInfo(id); + counter ++; + } + Assert.assertTrue("Job has not been completed for 1 min.", + counter != 60); + ttIns = ttClientIns.getProxy(); + UtilsForTests.waitFor(1000); + Assert.assertTrue("Map process is still alive after task has been failed.", + !ttIns.isProcessTreeAlive(pid)); + } + + /** + * Verify the process tree cleanup of task after task + * is suspended and resumed the task before the timeout. + */ + @Test + public void testProcessTreeCleanupOfSuspendAndResumeTask() throws + IOException { + TaskInfo taskInfo = null; + TaskID tID = null; + TTTaskInfo [] ttTaskinfo = null; + String pid = null; + TTProtocol ttIns = null; + TTClient ttClientIns = null; + int counter = 0; + + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("Message Display"); + jobConf.setJarByClass(GenerateTaskChildProcess.class); + jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(0); + jobConf.setMaxMapAttempts(1); + cleanup(outputDir, conf); + FileInputFormat.setInputPaths(jobConf, inputDir); + FileOutputFormat.setOutputPath(jobConf, outputDir); + + JTClient jtClient = cluster.getJTClient(); + JobClient client = jtClient.getClient(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); + RunningJob runJob = client.submitJob(jobConf); + JobID id = runJob.getID(); + JobInfo jInfo = wovenClient.getJobInfo(id); + Assert.assertNotNull("Job information is null",jInfo); + + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(id)); + + TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); + for (TaskInfo taskinfo : taskInfos) { + if (!taskinfo.isSetupOrCleanup()) { + taskInfo = taskinfo; + break; + } + } + + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); + + tID = TaskID.downgrade(taskInfo.getTaskID()); + TaskAttemptID tAttID = new TaskAttemptID(tID,0); + FinishTaskControlAction action = new FinishTaskControlAction(tID); + + Collection ttClients = cluster.getTTClients(); + for (TTClient ttClient : ttClients) { + TTProtocol tt = ttClient.getProxy(); + tt.sendAction(action); + ttTaskinfo = tt.getTasks(); + for (TTTaskInfo tttInfo : ttTaskinfo) { + if (!tttInfo.isTaskCleanupTask()) { + pid = tttInfo.getPid(); + ttClientIns = ttClient; + ttIns = tt; + break; + } + } + if (ttClientIns != null) { + break; + } + } + Assert.assertTrue("Map process tree is not alive before task suspend.", + ttIns.isProcessTreeAlive(pid)); + LOG.info("Suspend the task of process id " + pid); + boolean exitCode = ttIns.suspendProcess(pid); + Assert.assertTrue("Process(" + pid + ") has not been suspended", + exitCode); + Assert.assertTrue("Map process is not alive after task " + + "has been suspended.", ttIns.isProcessTreeAlive(pid)); + UtilsForTests.waitFor(5000); + exitCode = ttIns.resumeProcess(pid); + Assert.assertTrue("Suspended process(" + pid + ") has not been resumed", + exitCode); + UtilsForTests.waitFor(35000); + taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID()); + Assert.assertTrue("Suspended task has not been resumed", + taskInfo.getTaskStatus()[0].getRunState() == + TaskStatus.State.RUNNING); + UtilsForTests.waitFor(1000); + Assert.assertTrue("Map process tree is not alive after task is resumed.", + ttIns.isProcessTreeAlive(pid)); + } + + private static void cleanup(Path dir, Configuration conf) throws + IOException { + FileSystem fs = dir.getFileSystem(conf); + fs.delete(dir, true); + } + + private static void createInput(Path inDir, Configuration conf) throws + IOException { + String input = "Hadoop is framework for data intensive distributed " + + "applications.\n Hadoop enables applications " + + "to work with thousands of nodes."; + FileSystem fs = inDir.getFileSystem(conf); + if (!fs.mkdirs(inDir)) { + throw new IOException("Failed to create the input directory:" + + inDir.toString()); + } + fs.setPermission(inDir, new FsPermission(FsAction.ALL, + FsAction.ALL, FsAction.ALL)); + DataOutputStream file = fs.create(new Path(inDir, "data.txt")); + int i = 0; + while(i < 10) { + file.writeBytes(input); + i++; + } + file.close(); + } + +}